20180312:TP:C:Threads:ProducerConsumer
Introduction
This week we'll implement a shared queue using the producer/consumer model, backed by a simple queue (a circular linked list) and two semaphores.
The first part is the queue implementation with some tests. The second part is a basic echo server (using code from previous sessions) that uses the queue to distribute incoming connections to a set of pre-launched workers (threads).
Submission
The folder name for this session is 20180312_threads_prod_cons. The expected architecture is as follows:
- 20180312_threads_prod_cons
  - AUTHORS
  - queue: directory for the first part (A Shared Queue).
  - server: directory for the second part (Server).
Provide only source files and Makefiles.
You should submit your work before Monday 19 March 2pm.
A Shared Queue
Our goal is to implement a classical producer/consumer queue (also called a blocking queue).
You must provide an implementation for the following header (calm down, we'll do that step by step):
/* shared_queue.h : a producer/consumer queue */

# ifndef EPITA_IP_SHARED_QUEUE_H_
# define EPITA_IP_SHARED_QUEUE_H_

# include <semaphore.h>
# include <stdlib.h>

struct shared_queue {
  sem_t lock, size;
  struct queue *store;
};

/* new_shared_queue() allocate and initialize a new queue */
struct shared_queue* new_shared_queue(void);

/* shared_queue_push(queue, val) add val to the queue
 *
 * notify waiting threads when done */
void shared_queue_push(struct shared_queue *queue, int val);

/* shared_queue_pop(queue) pop a new value from the queue
 *
 * blocks while the queue is empty */
int shared_queue_pop(struct shared_queue *queue);

/* shared_queue_destroy(queue) destroy the queue
 *
 * free any remaining memory */
void shared_queue_destroy(struct shared_queue *queue);

# endif /* EPITA_IP_SHARED_QUEUE_H_ */
Queue implementation
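The first step is to implement a traditional queue. We'll use the classical singly linked circular list: the queue pointer designates the most recently pushed node, and that node's next field points to the oldest one. It works like this: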
You need a push operation:
- Push (queue, x):
  - create a new node q
  - q->val = x
  - if queue is empty (NULL):
    - q->next = q
  - else:
    - q->next = queue->next
    - queue->next = q
  - queue = q
… and a pop operation:
- Pop (queue):
  - DEBUG: verify that queue is not empty
  - node q = queue->next
  - result x = q->val
  - if q->next == q (only one element):
    - queue = NULL
  - else:
    - queue->next = q->next
  - delete node q (free)
  - return x
You'll probably need a function that clears a queue (removing all elements properly).
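The push, pop and clear operations above could be sketched in C as follows. This is only one possible reading of the pseudocode, not the expected solution: the layout of struct queue and the names queue_push, queue_pop and queue_clear are assumptions, since shared_queue.h only mentions struct queue without defining it.

```c
#include <assert.h>
#include <stdlib.h>

/* Assumed node layout: shared_queue.h does not define struct queue. */
struct queue {
    int val;
    struct queue *next;
};

/* queue_push(queue, x): hypothetical helper.
 * Adds x after the current tail and returns the new tail.
 * The returned pointer designates the newest node; its next is the oldest. */
struct queue *queue_push(struct queue *queue, int x)
{
    struct queue *q = malloc(sizeof (struct queue));
    if (q == NULL)
        abort();
    q->val = x;
    if (queue == NULL) {
        q->next = q;              /* single node: points to itself */
    } else {
        q->next = queue->next;    /* new tail points to the oldest node */
        queue->next = q;          /* old tail points to the new tail */
    }
    return q;
}

/* queue_pop(&queue): hypothetical helper.
 * Removes the oldest node and returns its value. */
int queue_pop(struct queue **queue)
{
    assert(*queue != NULL);       /* DEBUG: the queue must not be empty */
    struct queue *q = (*queue)->next;
    int x = q->val;
    if (q->next == q)
        *queue = NULL;            /* that was the only element */
    else
        (*queue)->next = q->next;
    free(q);
    return x;
}

/* queue_clear(&queue): hypothetical helper, frees every remaining node. */
void queue_clear(struct queue **queue)
{
    while (*queue != NULL)
        queue_pop(queue);
}
```

Starting from an empty queue (NULL), pushing 1, 2 and 3 and then popping three times gives the values back in insertion order. Returning the new tail from queue_push is a design choice of this sketch; passing a struct queue ** as in queue_pop would work just as well.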
Once you have a working queue, you need to apply the producer/consumer model. For that we use two semaphores: one for locking and one for the size of the queue (which blocks consumers while the queue is empty).
You need the following functions (see the corresponding man pages):
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destroy(sem_t *sem);
The locking semaphore will be initialized with a value of 1 and the size semaphore will be initialized with a value of 0. Consider the following example of initialization:
sem_t lock;
// Init with a value of 1
// the first 0 (pshared) means the semaphore is only shared between
// the threads of this process, keep it this way
if (sem_init(&lock, 0, 1) == -1)
  err(1, "Failed to initialize semaphore");
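To see why these two initial values matter, here is a small single-threaded sketch of how the counters evolve. The function names sem_value and semaphore_demo are made up for this illustration, and the code assumes a Linux/glibc system where unnamed semaphores (sem_init) are available.

```c
#include <assert.h>
#include <errno.h>
#include <semaphore.h>

/* Hypothetical helper: read the current counter of a semaphore. */
static int sem_value(sem_t *sem)
{
    int v = -1;
    int r = sem_getvalue(sem, &v);
    assert(r == 0);
    (void)r;
    return v;
}

/* Hypothetical demo: returns 1 if the semaphores behave as described. */
int semaphore_demo(void)
{
    sem_t lock, size;
    if (sem_init(&lock, 0, 1) == -1)
        return 0;
    if (sem_init(&size, 0, 0) == -1) {
        sem_destroy(&lock);
        return 0;
    }

    /* lock starts at 1: the first wait passes and brings it down to 0,
     * so any other thread waiting on it would now block. */
    sem_wait(&lock);
    int locked = sem_value(&lock);
    sem_post(&lock);

    /* size starts at 0: a consumer calling sem_wait would block here.
     * We use the non-blocking variant to observe that without hanging. */
    int empty = (sem_trywait(&size) == -1 && errno == EAGAIN);

    /* A producer posts once: exactly one consumer may now pass. */
    sem_post(&size);
    int one_item = (sem_trywait(&size) == 0);

    sem_destroy(&lock);
    sem_destroy(&size);
    return locked == 0 && empty && one_item;
}
```

The lock semaphore behaves like a mutex, while the size semaphore counts the elements available to consumers.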
The producer works as follows:
- Produce(queue, val):
  - wait on lock semaphore
  - push val in the queue
  - post on size semaphore
  - post on lock semaphore
… and the consumer:
- Consume(queue):
  - wait on size semaphore
  - wait on lock semaphore
  - pop val from queue
  - post on lock semaphore
  - return val
We can assume that the destroy operation always runs alone (no other thread accesses the queue at that point).
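Put together, the producer and consumer steps above might look like the sketch below. It is a minimal illustration, not a reference solution: the node layout of struct queue is an assumption (the header leaves it undefined), the list handling is inlined in compact form so that the block compiles on its own, and the return values of sem_wait and sem_post are not checked.

```c
#include <assert.h>
#include <err.h>
#include <semaphore.h>
#include <stdlib.h>

/* Assumed node layout: shared_queue.h does not define struct queue. */
struct queue { int val; struct queue *next; };

/* Copied from shared_queue.h */
struct shared_queue {
    sem_t lock, size;
    struct queue *store;
};

struct shared_queue *new_shared_queue(void)
{
    struct shared_queue *queue = malloc(sizeof (struct shared_queue));
    if (queue == NULL)
        err(1, "malloc failed");
    if (sem_init(&queue->lock, 0, 1) == -1)   /* lock starts free */
        err(1, "sem_init(lock) failed");
    if (sem_init(&queue->size, 0, 0) == -1)   /* queue starts empty */
        err(1, "sem_init(size) failed");
    queue->store = NULL;
    return queue;
}

void shared_queue_push(struct shared_queue *queue, int val)
{
    /* Allocate before taking the lock to keep the critical section short. */
    struct queue *q = malloc(sizeof (struct queue));
    if (q == NULL)
        err(1, "malloc failed");
    q->val = val;
    sem_wait(&queue->lock);
    if (queue->store == NULL) {
        q->next = q;
    } else {
        q->next = queue->store->next;
        queue->store->next = q;
    }
    queue->store = q;
    sem_post(&queue->size);   /* one more element: wake a consumer */
    sem_post(&queue->lock);
}

int shared_queue_pop(struct shared_queue *queue)
{
    sem_wait(&queue->size);   /* blocks while the queue is empty */
    sem_wait(&queue->lock);
    assert(queue->store != NULL);
    struct queue *q = queue->store->next;
    int val = q->val;
    if (q->next == q)
        queue->store = NULL;
    else
        queue->store->next = q->next;
    sem_post(&queue->lock);
    free(q);                  /* q is private now, free outside the lock */
    return val;
}

/* Assumed to run alone: no locking needed. */
void shared_queue_destroy(struct shared_queue *queue)
{
    struct queue *last = queue->store;
    if (last != NULL) {
        struct queue *cur = last->next;
        while (cur != last) {
            struct queue *next = cur->next;
            free(cur);
            cur = next;
        }
        free(last);
    }
    sem_destroy(&queue->lock);
    sem_destroy(&queue->size);
    free(queue);
}
```

Note the order in the consumer: waiting on size before lock matters. A consumer that took the lock first and then blocked on an empty queue would prevent any producer from ever pushing.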
- Implement all operations from shared_queue.h
Testing the queue
The following code provides a stress test for our shared queue. All the queue code is supposed to be implemented in shared_queue.c; here is the main file for testing:
/* testing shared queues */

# define _GNU_SOURCE

# include <assert.h>
# include <err.h>
# include <errno.h>
# include <pthread.h>
# include <stdio.h>
# include <stdlib.h>
# include <time.h>

# include "shared_queue.h"

# define ITER 500

const struct timespec sleeptime = {0, 10000};

struct tharg {
  unsigned id;
  struct shared_queue *queue;
  pthread_barrier_t *sync;
};

void* worker(void *arg_) {
  struct tharg *arg = arg_;
  struct shared_queue *queue = arg->queue;
  unsigned id = arg->id;
  pthread_barrier_wait(arg->sync);
  for (size_t i = 0; i < ITER; i++) {
    int x = random() % 100;
    printf("<%u> Pushed %d\n", id, x);
    shared_queue_push(queue, x);
    nanosleep(&sleeptime, NULL);
    x = shared_queue_pop(queue);
    printf("<%u> Poped %d\n", id, x);
  }
  pthread_exit(NULL);
}

int main(int argc, char *argv[]) {
  size_t nb_workers = 8;
  if (argc > 1)
    nb_workers = strtoul(argv[1], NULL, 10);
  pthread_barrier_t sync;
  pthread_barrier_init(&sync, NULL, nb_workers);
  // VLA are cool !
  pthread_t workers[nb_workers];
  struct tharg tharg[nb_workers];
  struct shared_queue *queue = new_shared_queue();
  for (size_t i = 0; i < nb_workers; i++) {
    tharg[i].sync = &sync;
    tharg[i].id = i;
    tharg[i].queue = queue;
    int thret = pthread_create(workers + i, NULL, worker, tharg + i);
    if (thret != 0) {
      errno = thret;
      err(1, "creating thread %zu failed", i);
    }
  }
  for (size_t i = 0; i < nb_workers; i++) {
    int joinret = pthread_join(workers[i], NULL);
    if (joinret != 0) {
      errno = joinret;
      err(1, "joining thread %zu failed", i);
    }
  }
  pthread_barrier_destroy(&sync);
  shared_queue_destroy(queue);
  return 0;
}
And a Makefile to compile it correctly:
# Makefile

CC=gcc -pthread -fsanitize=address
CPPFLAGS=
CFLAGS= -Wall -Wextra -std=c99 -Og
LDFLAGS=
LDLIBS=

SRC= shared_queue.c test.c
OBJ= ${SRC:.c=.o}
DEP= ${SRC:.c=.d}

all: test

test: ${OBJ}

clean:
	rm -f ${OBJ}
	rm -f ${DEP}
	rm -f test

-include ${DEP}

# END
Server
The next step is to implement an echo server using our queue. You can take the server code from 20160125:TP:C:TCP:SimpleServer.
The server now works as follows:
- Create a shared queue
- Launch a bunch of workers (at least 2, and probably no more than 16 … )
- Initialize and set up the server-side sockets
- loop
  - accept a new connection fdcnx
  - push fdcnx in the shared queue
Workers (threads) run something like this:
- get the queue from arguments
- loop
  - pop fd from the queue
  - handle the connection (echo … )
  - close fd
The test code from the previous section can be a helpful starting point.
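For the "handle the connection" and "close fd" steps of the worker, one possible shape is sketched below. The names send_all and echo_connection are hypothetical, and the code from the previous session may well use read/write instead. The sketch uses send with MSG_NOSIGNAL, which assumes Linux (or another POSIX.1-2008 system), so that a client closing early does not kill the whole server with SIGPIPE.

```c
#define _POSIX_C_SOURCE 200809L

#include <assert.h>
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical helper: send exactly len bytes, retrying on partial sends.
 * Returns 0 on success, -1 on error. */
static int send_all(int fd, const char *buf, size_t len)
{
    while (len > 0) {
        ssize_t w = send(fd, buf, len, MSG_NOSIGNAL);
        if (w == -1) {
            if (errno == EINTR)
                continue;
            return -1;
        }
        buf += w;
        len -= (size_t)w;
    }
    return 0;
}

/* Hypothetical handler: echo everything received on fd until the peer
 * closes its side, then close fd.
 * Returns the number of bytes echoed, or -1 on error. */
long echo_connection(int fd)
{
    assert(fd >= 0);
    char buf[4096];
    long total = 0;
    for (;;) {
        ssize_t r = recv(fd, buf, sizeof (buf), 0);
        if (r == 0)               /* peer closed the connection */
            break;
        if (r == -1) {
            if (errno == EINTR)
                continue;
            total = -1;
            break;
        }
        if (send_all(fd, buf, (size_t)r) == -1) {
            total = -1;
            break;
        }
        total += r;
    }
    close(fd);
    return total;
}
```

A worker thread would then simply loop on something like echo_connection(shared_queue_pop(queue)), with queue taken from its argument as in the test program of the previous section.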