Design and implement three solutions to the Producer-Consumer problem: the first using NO synchronization at all, the second using semaphores to synchronize the critical sections between the producer and consumer, and the third adding a mutex for mutual exclusion on shared data among threads.

The final program will create N producer threads and N consumer threads. The job of the producer will be to generate a random number and place it in a bounded buffer. The role of the consumer will be to remove items from the bounded buffer and print them to the screen. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer. Sounds simple, right? Well, yes and no. We must address two primary issues here: one is concurrency between the producer and the consumer, and the other is synchronization among the threads when accessing shared data. We'll address each of these issues independently by writing our code incrementally.

Here’s how the story unfolds...

The problem describes two processes, the producer and the consumer, that share a common, fixed-size buffer. The producer's job is to generate a piece of data, put it into the buffer and start again. At the same time the consumer is consuming the data (i.e. removing it from the buffer) one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.

Solution 1

Our first solution will be simple and use a spin lock, which simply checks a condition over and over until it is satisfied. The Producer thread should generate a random number and check whether there is room in the buffer. If so, it should store the number in the buffer, display it on standard output, then increment the buffer count. The Consumer should check whether the buffer is empty. If not, it should remove the next item from the buffer, print the value on standard output, then decrement the count.

If the buffer is full, the producer will “spin” waiting for the consumer to remove an item. In the same way, the consumer will spin if it finds the buffer to be empty.

int itemCount

procedure producer() {
    while (true) {
        item = produceItem()                 //generate the random number
        while (itemCount == BUFFER_SIZE);    //spin if buffer is full
        putItemIntoBuffer(item)              //once buffer has room, add the item to the buffer
        itemCount = itemCount + 1
    }
}

procedure consumer() {
    while (true) {
        while (itemCount == 0);              //spin if nothing in buffer
        item = removeItemFromBuffer()        //once buffer has an item, remove the item from buffer
        itemCount = itemCount - 1
        consumeItem(item)
    }
}
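The pseudocode leaves putItemIntoBuffer() and removeItemFromBuffer() undefined. One possible way to build them is a fixed array used as a ring, sketched below. The type BoundedBuffer, the buffer_* function names, and the capacity of 5 are assumptions made for illustration; the assignment does not prescribe them. Nothing here is thread-safe, which is exactly the weakness Solution 1 is meant to expose.

```c
#include <stdbool.h>
#include <stddef.h>

#define BUFFER_SIZE 5   /* assumed capacity, not fixed by the assignment */

/* Hypothetical bounded buffer: a fixed array used as a ring. */
typedef struct {
    int    items[BUFFER_SIZE];
    size_t in;     /* index of the next free slot (producer side) */
    size_t out;    /* index of the oldest item (consumer side)    */
    int    count;  /* plays the role of itemCount in the pseudocode */
} BoundedBuffer;

void buffer_init(BoundedBuffer *b) {
    b->in = 0;
    b->out = 0;
    b->count = 0;
}

bool buffer_is_full(const BoundedBuffer *b)  { return b->count == BUFFER_SIZE; }
bool buffer_is_empty(const BoundedBuffer *b) { return b->count == 0; }

/* Store one item; the caller must have checked that the buffer is not full. */
void buffer_put(BoundedBuffer *b, int item) {
    b->items[b->in] = item;
    b->in = (b->in + 1) % BUFFER_SIZE;   /* wrap around at the end of the array */
    b->count++;
}

/* Remove the oldest item; the caller must have checked the buffer is not empty. */
int buffer_remove(BoundedBuffer *b) {
    int item = b->items[b->out];
    b->out = (b->out + 1) % BUFFER_SIZE;
    b->count--;
    return item;
}
```

Used from a single thread, items come out in the order they went in. Once a producer and a consumer call these functions concurrently, the unprotected updates to count can interleave.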

What are the issues with this solution? Write them up as part of Solution 1.

Solution 2

We need a more efficient way of synchronizing the Producer and Consumer processes, so we will introduce a semaphore. A semaphore lets the processes communicate so that one of them can simply wait (sleep) until some condition is met, instead of spinning.

We will use two semaphores, fillCount and emptyCount, to solve the problem. fillCount is incremented and emptyCount decremented when a new item has been put into the buffer. If the producer tries to decrement emptyCount while its value is zero, the producer is put to sleep. The next time an item is consumed, emptyCount is incremented and the producer wakes up. The consumer works analogously.

semaphore fillCount = 0
semaphore emptyCount = BUFFER_SIZE

procedure producer() {
    while (true) {
        item = produceItem()             //generate the random number
        down(emptyCount)                 //wait for a free slot, then decrement the empty counter
        putItemIntoBuffer(item)          //add item to buffer
        up(fillCount)                    //increment the fill counter and wake a waiting consumer
    }
}

procedure consumer() {
    while (true) {
        down(fillCount)                  //wait for an item, then decrement the fill counter
        item = removeItemFromBuffer()    //remove item from buffer
        up(emptyCount)                   //increment the empty counter and wake a waiting producer
        consumeItem(item)                //print the item
    }
}

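The operation down (also known as wait) decrements the semaphore and puts the caller to sleep while the value is zero, so it is called before entering the critical section. The operation up (also known as signal) increments the semaphore and wakes a sleeping thread, if there is one, so it is called after leaving the critical section.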
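A minimal C sketch of Solution 2 with one producer and one consumer might look as follows. It assumes a Linux-style POSIX environment with unnamed semaphores. The bounded run of NUM_ITEMS, the use of i * i as a stand-in for the random number, the consumed[] record, and the function name run_semaphore_demo are all illustrative assumptions, added so that the run terminates and its result can be checked.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define BUFFER_SIZE 5    /* assumed capacity */
#define NUM_ITEMS   20   /* assumed: bounded run so the demo terminates */

static int   buffer[BUFFER_SIZE];
static int   consumed[NUM_ITEMS];   /* what the consumer saw, in order */
static sem_t fillCount;             /* number of items in the buffer */
static sem_t emptyCount;            /* number of free slots in the buffer */

static void *producer(void *arg) {
    (void)arg;
    int in = 0;                               /* only the producer touches this index */
    for (int i = 0; i < NUM_ITEMS; i++) {
        int item = i * i;                     /* stand-in for the random number */
        sem_wait(&emptyCount);                /* down: sleep while the buffer is full */
        buffer[in] = item;
        in = (in + 1) % BUFFER_SIZE;
        sem_post(&fillCount);                 /* up: one more item is available */
    }
    return NULL;
}

static void *consumer(void *arg) {
    (void)arg;
    int out = 0;                              /* only the consumer touches this index */
    for (int i = 0; i < NUM_ITEMS; i++) {
        sem_wait(&fillCount);                 /* down: sleep while the buffer is empty */
        int item = buffer[out];
        out = (out + 1) % BUFFER_SIZE;
        sem_post(&emptyCount);                /* up: one more slot is free */
        consumed[i] = item;                   /* "consume" the item by recording it */
    }
    return NULL;
}

/* Runs one producer and one consumer; returns 1 if every item arrived in order. */
int run_semaphore_demo(void) {
    pthread_t p, c;
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    pthread_create(&p, NULL, producer, NULL);
    pthread_create(&c, NULL, consumer, NULL);
    pthread_join(p, NULL);
    pthread_join(c, NULL);
    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    for (int i = 0; i < NUM_ITEMS; i++)
        if (consumed[i] != i * i) return 0;
    return 1;
}
```

With a single producer and a single consumer each index has exactly one owner, which is why this sketch gets away without a lock. That stops being true as soon as a second producer or consumer is added.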

What are the issues with this solution? Write them up as part of Solution 2.

Solution 3

What if we have multiple producers and consumers? We need a way to make sure that only one producer is executing putItemIntoBuffer() at a time, and likewise that only one consumer is executing removeItemFromBuffer() at a time. In other words, we need a way to execute a critical section with mutual exclusion. To accomplish this we use a mutex lock, which behaves like a binary semaphore. Since a binary semaphore can only have the value one or zero, only one thread can be executing between pthread_mutex_lock and pthread_mutex_unlock. The solution for multiple producers and consumers is shown below.

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
semaphore fillCount = 0
semaphore emptyCount = BUFFER_SIZE

procedure producer() {
    while (true) {
        item = produceItem()
        down(emptyCount)
        pthread_mutex_lock(&mutex)
        putItemIntoBuffer(item)
        pthread_mutex_unlock(&mutex)
        up(fillCount)
    }
    up(fillCount)    //the consumer may not finish before the producer
                     //(reached only if the loop above is bounded)
}

procedure consumer() {
    while (true) {
        down(fillCount)
        pthread_mutex_lock(&mutex)
        item = removeItemFromBuffer()
        pthread_mutex_unlock(&mutex)
        up(emptyCount)
        consumeItem(item)
    }
}
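The same structure can be sketched in C for several producers and consumers. This is one possible arrangement, not a reference solution. The thread counts, ITEMS_PER_THREAD, the deterministic item values, the running consumed_sum, and the name run_mutex_demo are assumptions chosen so that the run terminates and the total can be verified.

```c
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

#define BUFFER_SIZE      5     /* assumed capacity */
#define NUM_PRODUCERS    3     /* assumed thread counts (kept equal so */
#define NUM_CONSUMERS    3     /* produced and consumed totals match)  */
#define ITEMS_PER_THREAD 100

static int  buffer[BUFFER_SIZE];
static int  in_idx, out_idx;        /* shared by all threads: need the mutex */
static long consumed_sum;           /* shared by all consumers: needs the mutex */
static sem_t fillCount, emptyCount;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

static void *producer(void *arg) {
    int id = *(int *)arg;
    for (int i = 0; i < ITEMS_PER_THREAD; i++) {
        int item = id * ITEMS_PER_THREAD + i;   /* stand-in for the random number */
        sem_wait(&emptyCount);                  /* wait for a free slot */
        pthread_mutex_lock(&mutex);             /* one thread in the buffer at a time */
        buffer[in_idx] = item;
        in_idx = (in_idx + 1) % BUFFER_SIZE;
        pthread_mutex_unlock(&mutex);
        sem_post(&fillCount);                   /* announce the new item */
    }
    return NULL;
}

static void *consumer(void *arg) {
    (void)arg;
    for (int i = 0; i < ITEMS_PER_THREAD; i++) {
        sem_wait(&fillCount);                   /* wait for an item */
        pthread_mutex_lock(&mutex);
        int item = buffer[out_idx];
        out_idx = (out_idx + 1) % BUFFER_SIZE;
        consumed_sum += item;                   /* updated while holding the mutex */
        pthread_mutex_unlock(&mutex);
        sem_post(&emptyCount);                  /* announce the free slot */
    }
    return NULL;
}

/* Runs all threads to completion and returns the sum of everything consumed. */
long run_mutex_demo(void) {
    pthread_t prod[NUM_PRODUCERS], cons[NUM_CONSUMERS];
    int ids[NUM_PRODUCERS];
    in_idx = out_idx = 0;
    consumed_sum = 0;
    sem_init(&fillCount, 0, 0);
    sem_init(&emptyCount, 0, BUFFER_SIZE);
    for (int i = 0; i < NUM_PRODUCERS; i++) {
        ids[i] = i;
        pthread_create(&prod[i], NULL, producer, &ids[i]);
    }
    for (int i = 0; i < NUM_CONSUMERS; i++)
        pthread_create(&cons[i], NULL, consumer, NULL);
    for (int i = 0; i < NUM_PRODUCERS; i++) pthread_join(prod[i], NULL);
    for (int i = 0; i < NUM_CONSUMERS; i++) pthread_join(cons[i], NULL);
    sem_destroy(&fillCount);
    sem_destroy(&emptyCount);
    return consumed_sum;
}
```

The producers here generate the values 0 through 299 exactly once each, so a correct run consumes a total of 44850 regardless of how the threads interleave. Note the ordering: the semaphore is taken before the mutex, so a thread never sleeps on a full or empty buffer while holding the lock.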

Mutexes

Mutex – For mutual exclusion while performing actual operation on shared memory.

Semaphores

Empty – For counting the number of empty slots and blocking producers when there are 0 empty slots (the buffer is full).

Full – For counting the number of full slots and blocking consumers when there are 0 full slots (the buffer is empty).

Semaphores in the pthread package

The code below illustrates how a semaphore is created:

#include <semaphore.h>

sem_t sem;

/* create the semaphore and initialize it to BUFFER_SIZE (5 in this example) */
sem_init(&sem, 0, BUFFER_SIZE);

The sem_init(…) function creates a semaphore and initializes it. This function is passed three parameters:

  • A pointer to the semaphore
  • A flag indicating the level of sharing
  • The semaphore’s initial value

In this example, by passing the flag 0, we are indicating that this semaphore can only be shared by threads belonging to the same process that created the semaphore. A nonzero value would allow other processes to access the semaphore as well. In this example, we initialize the semaphore to the value 5.
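To see the effect of the initial value, the sketch below initializes a semaphore to 5 and reads its value back with sem_getvalue(…), before and after one sem_wait(…). The function name demo_sem_init and its two output parameters are hypothetical, introduced only for this illustration. It assumes a platform that supports unnamed POSIX semaphores, such as Linux.

```c
#include <semaphore.h>

#define BUFFER_SIZE 5   /* matches the value 5 used in the text */

/* Hypothetical helper: reports the semaphore's value right after sem_init
 * and again after one sem_wait. Returns 0 on success, -1 on failure. */
int demo_sem_init(int *initial, int *after_wait) {
    sem_t sem;

    /* second argument 0: shared only among threads of this process */
    if (sem_init(&sem, 0, BUFFER_SIZE) != 0)
        return -1;

    sem_getvalue(&sem, initial);      /* value as initialized */

    sem_wait(&sem);                   /* down: decrements the value by one */
    sem_getvalue(&sem, after_wait);

    sem_destroy(&sem);                /* release the semaphore's resources */
    return 0;
}
```

Since no other thread touches this semaphore, the two values read back are 5 and 4.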

For the semaphore operations wait (or down) and signal (or up), the pthread package names them sem_wait(…) and sem_post(…), respectively. The code example below creates a binary semaphore with an initial value of 1 and illustrates its use in protecting a critical section. (Note: The code below is only for illustration purposes. Do not use this binary semaphore for protecting critical sections. Instead, you are required to use the mutex locks provided by the pthread package for protecting critical sections.)

#include <semaphore.h>

sem_t sem_mutex;

/* create the semaphore */
sem_init(&sem_mutex, 0, 1);

/* acquire the semaphore */
sem_wait(&sem_mutex);

/*** critical section ***/

/* release the semaphore */
sem_post(&sem_mutex);
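For comparison, here is a sketch of the same pattern using a pthread mutex lock, which is what the assignment requires for critical sections. The shared counter, the thread and iteration counts, and the names worker and run_counter_demo are assumptions made for the example.

```c
#include <pthread.h>
#include <stddef.h>

#define NUM_THREADS 4       /* assumed values for the illustration */
#define INCREMENTS  10000

static long counter;        /* shared data protected by counter_lock */
static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;

static void *worker(void *arg) {
    (void)arg;
    for (int i = 0; i < INCREMENTS; i++) {
        pthread_mutex_lock(&counter_lock);     /* acquire the lock */
        counter++;                             /*** critical section ***/
        pthread_mutex_unlock(&counter_lock);   /* release the lock */
    }
    return NULL;
}

/* Starts NUM_THREADS workers, waits for them, and returns the final count. */
long run_counter_demo(void) {
    pthread_t threads[NUM_THREADS];
    counter = 0;
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_create(&threads[i], NULL, worker, NULL);
    for (int i = 0; i < NUM_THREADS; i++)
        pthread_join(threads[i], NULL);
    return counter;
}
```

Because every increment happens while the lock is held, no update is lost and the final count equals NUM_THREADS times INCREMENTS.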

Compilation

You need to link two special libraries to provide multithreaded and semaphore support: the pthread library for any program that uses pthreads and the realtime library rt for any program that uses semaphores. So the compile command for a program (call it prodcon.c) would be

gcc -o prodcon prodcon.c -lpthread -lrt

Test

You can start with one producer thread and one consumer thread for testing, and gradually use more producer and consumer threads. For each test case, you need to make sure that the random numbers generated by the producer threads exactly match the random numbers consumed by the consumer threads (both their order and their values).
