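This is a preparation for a dynamic worker thread pool.

The on-demand short threads (run_short_thread/create_short_thread) are removed. Every work queue now queues work on its pending list and is served by a fixed set of worker threads created in init_work_queue(): one thread for an ordered queue, NR_WORKER_THREADS (4) threads otherwise. struct worker_info stores the thread ids in a trailing worker_thread[] array together with nr_threads. Ordered workers pass -1 as their thread index, so the logger now prints the index only when worker_idx >= 0.

Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
 include/util.h | 2 +-
 lib/logger.c | 4 +-
 sheep/work.c | 126 +++++++++++++++++-------------------------------------
 sheep/work.h | 3 +-
 4 files changed, 44 insertions(+), 91 deletions(-)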
diff --git a/include/util.h b/include/util.h index d0bf659..9197c76 100644 --- a/include/util.h +++ b/include/util.h @@ -38,7 +38,7 @@ #define notrace __attribute__((no_instrument_function)) -#define uninitialized_var(x) (x = x) +#define uninitialized_var(x) x = x static inline int before(uint32_t seq1, uint32_t seq2) { diff --git a/lib/logger.c b/lib/logger.c index aa1c012..7158b65 100644 --- a/lib/logger.c +++ b/lib/logger.c @@ -156,7 +156,7 @@ static notrace int log_vsnprintf(char *buff, size_t size, int prio, { char *p = buff; - if (worker_name && worker_idx) + if (worker_name && worker_idx >= 0) snprintf(p, size, "[%s %d] ", worker_name, worker_idx); else if (worker_name) snprintf(p, size, "[%s] ", worker_name); @@ -466,7 +466,7 @@ notrace void set_thread_name(const char *name, int idx) notrace void get_thread_name(char *name) { - if (worker_name && worker_idx) + if (worker_name && worker_idx >= 0) sprintf(name, "%s %d", worker_name, worker_idx); else if (worker_name) sprintf(name, "%s", worker_name); diff --git a/sheep/work.c b/sheep/work.c index 0ef169d..5678223 100644 --- a/sheep/work.c +++ b/sheep/work.c @@ -25,7 +25,6 @@ #include <sys/types.h> #include <sys/eventfd.h> #include <linux/types.h> -#include <urcu/uatomic.h> #include "list.h" #include "util.h" @@ -34,6 +33,8 @@ #include "event.h" #include "trace/trace.h" +#define NR_WORKER_THREADS 4 + static int efd; int total_ordered_workers; LIST_HEAD(worker_info_list); @@ -42,75 +43,15 @@ enum wq_state { WQ_DEAD = (1U << 1), }; -/* - * Short thread is created on demand and destroyed after serving the work for - * gateway or io requests, aiming to solve two problems: - * - * 1. timeout of IO requests from guests. With on-demand short threads, we - * guarantee that there is always one thread available to execute the - * request as soon as possible. - * 2. 
sheep halt for corner case that all gateway and io threads are executing - * local requests that ask for creation of another thread to execute the - * requests and sleep-wait for responses. - */ -struct short_work { - struct work *work; - struct worker_info *wi; -}; - -static void *run_short_thread(void *arg) -{ - struct short_work *sw = arg; - eventfd_t value = 1; - static unsigned long idx; - int err; - - /* Tell runtime to release resources after termination */ - err = pthread_detach(pthread_self()); - if (err) - panic("%s\n", strerror(err)); - - set_thread_name(sw->wi->name, uatomic_add_return(&idx, 1)); - - sw->work->fn(sw->work); - - pthread_mutex_lock(&sw->wi->finished_lock); - list_add_tail(&sw->work->w_list, &sw->wi->finished_list); - pthread_mutex_unlock(&sw->wi->finished_lock); - - eventfd_write(efd, value); - free(sw); - pthread_exit(NULL); -} - -static inline void create_short_thread(struct worker_info *wi, - struct work *work) -{ - pthread_t thread; - struct short_work *sw = xmalloc(sizeof *sw); - int err; - - sw->work = work; - sw->wi = wi; - - err = pthread_create(&thread, NULL, run_short_thread, sw); - if (err) - panic("%s\n", strerror(err)); - short_thread_begin(); -} - void queue_work(struct work_queue *q, struct work *work) { struct worker_info *wi = container_of(q, struct worker_info, q); - if (wi->ordered) { - pthread_mutex_lock(&wi->pending_lock); - list_add_tail(&work->w_list, &wi->q.pending_list); - pthread_mutex_unlock(&wi->pending_lock); + pthread_mutex_lock(&wi->pending_lock); + list_add_tail(&work->w_list, &wi->q.pending_list); + pthread_mutex_unlock(&wi->pending_lock); - pthread_cond_signal(&wi->pending_cond); - } else - create_short_thread(wi, work); + pthread_cond_signal(&wi->pending_cond); } static void bs_thread_request_done(int fd, int events, void *data) @@ -135,8 +76,6 @@ static void bs_thread_request_done(int fd, int events, void *data) list_del(&work->w_list); work->done(work); - if (!wi->ordered) - short_thread_end(); } } } 
@@ -146,8 +85,19 @@ static void *worker_routine(void *arg) struct worker_info *wi = arg; struct work *work; eventfd_t value = 1; + int i, uninitialized_var(idx); + + for (i = 0; i < wi->nr_threads; i++) { + if (wi->worker_thread[i] == pthread_self()) { + idx = i; + break; + } + } - set_thread_name(wi->name, 0); + if (wi->ordered) + set_thread_name(wi->name, -1); + else + set_thread_name(wi->name, idx); pthread_mutex_lock(&wi->startup_lock); /* started this thread */ @@ -212,44 +162,42 @@ static int init_eventfd(void) struct work_queue *init_work_queue(const char *name, bool ordered) { - int ret; + int i, ret, nr; struct worker_info *wi; ret = init_eventfd(); if (ret) return NULL; - wi = zalloc(sizeof(*wi)); + nr = ordered ? 1 : NR_WORKER_THREADS; + wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t)); if (!wi) return NULL; wi->name = name; wi->ordered = ordered; + wi->nr_threads = nr; + INIT_LIST_HEAD(&wi->q.pending_list); INIT_LIST_HEAD(&wi->finished_list); - pthread_mutex_init(&wi->finished_lock, NULL); - - if (ordered) { - INIT_LIST_HEAD(&wi->q.pending_list); + pthread_cond_init(&wi->pending_cond, NULL); - pthread_cond_init(&wi->pending_cond, NULL); - pthread_mutex_init(&wi->pending_lock, NULL); - pthread_mutex_init(&wi->startup_lock, NULL); - - pthread_mutex_lock(&wi->startup_lock); + pthread_mutex_init(&wi->finished_lock, NULL); + pthread_mutex_init(&wi->pending_lock, NULL); + pthread_mutex_init(&wi->startup_lock, NULL); - ret = pthread_create(&wi->worker_thread, NULL, worker_routine, - wi); + pthread_mutex_lock(&wi->startup_lock); + for (i = 0; i < wi->nr_threads; i++) { + ret = pthread_create(&wi->worker_thread[i], NULL, + worker_routine, wi); if (ret) { eprintf("failed to create worker thread: %s\n", strerror(ret)); goto destroy_threads; } - - pthread_mutex_unlock(&wi->startup_lock); - total_ordered_workers++; } + pthread_mutex_unlock(&wi->startup_lock); list_add(&wi->worker_info_siblings, &worker_info_list); @@ -258,8 +206,10 @@ destroy_threads: 
wi->q.wq_state |= WQ_DEAD; pthread_mutex_unlock(&wi->startup_lock); - pthread_join(wi->worker_thread, NULL); - eprintf("stopped worker thread\n"); + for (; i > 0; i--) { + pthread_join(wi->worker_thread[i - 1], NULL); + eprintf("stopped worker thread #%d\n", i - 1); + } /* destroy_cond_mutex: */ pthread_cond_destroy(&wi->pending_cond); @@ -273,12 +223,14 @@ destroy_threads: #ifdef COMPILE_UNUSED_CODE static void exit_work_queue(struct work_queue *q) { + int i; struct worker_info *wi = container_of(q, struct worker_info, q); q->wq_state |= WQ_DEAD; pthread_cond_broadcast(&wi->pending_cond); - pthread_join(wi->worker_thread, NULL); + for (i = 0; wi->worker_thread[i] && i < wi->nr_threads; i++) + pthread_join(wi->worker_thread[i], NULL); pthread_cond_destroy(&wi->pending_cond); pthread_mutex_destroy(&wi->pending_lock); diff --git a/sheep/work.h b/sheep/work.h index 4d45dd6..e57c6d0 100644 --- a/sheep/work.h +++ b/sheep/work.h @@ -38,7 +38,8 @@ struct worker_info { pthread_mutex_t startup_lock; - pthread_t worker_thread; /* used for an ordered work queue */ + size_t nr_threads; + pthread_t worker_thread[0]; }; extern struct list_head worker_info_list; -- 1.7.2.5 -- sheepdog mailing list sheepdog@lists.wpkg.org http://lists.wpkg.org/mailman/listinfo/sheepdog