20170206:TP:C:Threads:ProducerConsumer

De wiki-prog
Révision de 18 janvier 2018 à 14:18 par Slashvar (discuter | contributions) (Shared queue)

(diff) ← Version précédente | Voir la version courante (diff) | Version suivante → (diff)
Aller à : navigation, rechercher


Introduction

This week we'll implement a shared queue using the producer/consumer model backed with a simple queue (circular linked list) and two semaphore.

The first part is the queue implementation with some tests and the second part is a basic echo server (using code from previous sessions) that use the queue to distribute incoming connexion to a set of pre-launched workers (threads).

A Shared Queue

Our goal is to implement a classical producer/consumer queue (or blocking queue.)

You must provide implementation for the following header (calm down, we'll do that step by step):

/* shared_queue.h : a producer/consumer queue */
 
# ifndef EPITA_IP_SHARED_QUEUE_H_
# define EPITA_IP_SHARED_QUEUE_H_
 
# include <semaphore.h>
# include <stdlib.h>
 
struct shared_queue {
  sem_t lock, size;
  struct queue *store;
};
 
/* new_shared_queue() allocate and initialize a new queue             */
struct shared_queue* new_shared_queue(void);
 
/* shared_queue_push(queue, val) add val to the queue                 *
 * notify waiting threads when done                                   */
void shared_queue_push(struct shared_queue *queue, int val);
 
/* shared_queue_pop(queue) pop a new value from the queue             *
 * blocks while the queue is empty                                    */
int shared_queue_pop(struct shared_queue *queue);
 
/* shared_queue_destroy(queue) destroy the queue                      *
 * free any remaining memory                                          */
void shared_queue_destroy(struct shared_queue *queue);
 
# endif /* EPITA_IP_SHARED_QUEUE_H_ */

Queue implementation

The first step is to implement a traditional queue. We'll use the classical simply linked circular list that worked like this:

You need a push operation:

  • Push (queue, x):
    • create node new node q
    • q->val = x
    • if queue is empty (NULL):
      • q->next = q
    • else
      • q->next = queue->next
      • queue->next = q
    • queue = q

… and a pop operation:

  • Pop (queue):
    • DEBUG:verify that queue is not empty
    • node q = queue->next
    • result x = q->val
    • if q->next == q (only one element)
      • queue = NULL
    • else
      • queue->next = q->next
    • delete node q (free)
    • return x

You'll probably need a function that clear a queue (remove all element properly.)

Shared queue

Once you have a working queue, you need to apply the producer/consumer model. For that we use two semaphore: one for locking and one for the size of the queue (and thus blocking consumer on empty queue.)

You need the following functions (see the corresponding man pages):

int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_post(sem_t *sem);
int sem_destroy(sem_t *sem);

The locking semaphore will be initialized with a value of 1 and the size semaphore will be initialized with a value of 0. Consider the following example of initialization:

sem_t lock;
// Init with a value of 1
// first 0 is for unused option, keep it this way
if ( sem_init(&lock, 0, 1) == -1)
  err(1, "Fail to initialized semaphore");

The producer works as follow:

  • Produce(queue, val):
    • wait on lock semaphore
    • push val in the queue
    • post on size semaphore
    • post on lock semaphore

… and the consumer:

  • Consume(queue, val):
    • wait on size semaphore
    • wait on lock semaphore
    • pop val from queue
    • post on lock semaphore
    • return val

We can consider that the destroy operation is always alone (no other thread will access the queue.)

  • Implement all operations from shared_queue.h

Testing the queue

The following code provides a stress test for our shared queue. All code for the queue is supposed to be implemented in shared_queue.c, here is the main file for testing:

/* testing shared queues */
 
# define _GNU_SOURCE
 
# include <assert.h>
# include <err.h>
# include <errno.h>
# include <pthread.h>
# include <stdio.h>
# include <stdlib.h>
# include <time.h>
 
# include "shared_queue.h"
 
# define ITER 500
 
const struct timespec sleeptime = {0, 10000};
 
struct tharg {
  unsigned id;
  struct shared_queue *queue;
  pthread_barrier_t *sync;
};
 
void* worker(void *arg_) {
  struct tharg *arg = arg_;
  struct shared_queue *queue = arg->queue;
  unsigned id = arg->id;
  pthread_barrier_wait(arg->sync);
  for (size_t i = 0; i < ITER; i++) {
    int x = random() % 100;
    printf("<%u> Pushed %d\n", id, x);
    shared_queue_push(queue, x);
    nanosleep(&sleeptime, NULL);
    x = shared_queue_pop(queue);
    printf("<%u> Poped %d\n", id, x);
  }
  pthread_exit(NULL);
}
 
int main(int argc, char *argv[]) {
  size_t nb_workers = 8;
  if (argc > 1)
    nb_workers = strtoul(argv[1], NULL, 10);
  pthread_barrier_t sync;
  pthread_barrier_init(&sync, NULL, nb_workers);
  // VLA are cool !
  pthread_t workers[nb_workers];
  struct tharg tharg[nb_workers];
  struct shared_queue *queue = new_shared_queue();
  for (size_t i = 0; i < nb_workers; i++) {
    tharg[i].sync = &sync;
    tharg[i].id = i;
    tharg[i].queue = queue;
    int thret = pthread_create(workers + i, NULL, worker, tharg + i);
    if (thret != 0) {
      errno = thret;
      err(1, "creating thread %zu failed", i);
    }
  }
  for (size_t i = 0; i < nb_workers; i++) {
    int joinret = pthread_join(workers[i], NULL);
    if (joinret != 0) {
      errno = joinret;
      err(1, "joining thread %zu failed", i);
    }
  }
  pthread_barrier_destroy(&sync);
  shared_queue_destroy(queue);
  return 0;
}

And Makefile to compile this correctly:

# Makefile
 
CC=gcc -pthread -fsanitize=address
CPPFLAGS=
CFLAGS= -Wall -Wextra -std=c99 -Og
LDFLAGS=
LDLIBS=
 
SRC= shared_queue.c test.c
OBJ= ${SRC:.c=.o}
DEP= ${SRC:.c=.d}
 
all: test
 
test: ${OBJ}
 
clean:
	rm -f ${OBJ}
	rm -f ${DEP}
	rm -f test
 
-include ${DEP}
 
# END

Server

The next step is to implement an echo server using our queue. You can take code for the server from 20160125:TP:C:TCP:SimpleServer.

The mode of the server is now as follow:

  • Create a shared queue
  • launch a bunch of workers (at least 2, and probably no more than 16 … )
  • Initialize and set-up server-side sockets
  • loop
    • accept new connexion fdcnx
    • push fdcnx in the share queue

Workers (threads) are running something like this:

  • get the queue from arguments
  • loop
    • pop fd from the queue
    • handle connexion (echo … )
    • close fd

Test code in previous section used for testing queue can be a helpful starting point.