Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
Review fixes folded in:
 - report pthread_create() failures with strerror(ret) instead of %m
 - do not unlock startup_lock a second time under destroy_threads
(the work.c post-image blob id below is still the one from the original
posting)

 sheep/work.c |   82 +++++++++++++++++++++++++++++++++++++++++++-----------
 sheep/work.h |    2 +
 2 files changed, 68 insertions(+), 16 deletions(-)

diff --git a/sheep/work.c b/sheep/work.c
index 5678223..b197bb2 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -33,7 +33,8 @@
 #include "event.h"
 #include "trace/trace.h"
 
-#define NR_WORKER_THREADS 4
+#define NR_MIN_WORKER_THREADS 4
+#define NR_MAX_WORKER_THREADS 16384
 
 static int efd;
 int total_ordered_workers;
@@ -43,11 +44,56 @@ enum wq_state {
 	WQ_DEAD = (1U << 1),
 };
 
+static void *worker_routine(void *arg);
+
+/*
+ * Grow the worker pool of 'wi' to 'nr_threads' threads, capped at
+ * NR_MAX_WORKER_THREADS.  Does nothing when the pool is already that big.
+ *
+ * Returns 0 on success, or -1 if pthread_create() fails; workers started
+ * before the failure stay counted in wi->nr_threads.
+ */
+static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
+{
+	int ret;
+
+	nr_threads = min(nr_threads, (size_t)NR_MAX_WORKER_THREADS);
+
+	pthread_mutex_lock(&wi->startup_lock);
+	if (nr_threads <= wi->nr_threads) {
+		pthread_mutex_unlock(&wi->startup_lock);
+		return 0;
+	}
+	while (wi->nr_threads < nr_threads) {
+		wi->nr_threads++;
+		ret = pthread_create(&wi->worker_thread[wi->nr_threads - 1],
+				     NULL, worker_routine, wi);
+		if (ret != 0) {
+			/* the error is the return value, not errno */
+			eprintf("failed to create worker thread: %s\n",
+				strerror(ret));
+			wi->nr_threads--;
+			pthread_mutex_unlock(&wi->startup_lock);
+			return -1;
+		}
+		dprintf("create thread %s %zd\n", wi->name, wi->nr_threads);
+	}
+	pthread_mutex_unlock(&wi->startup_lock);
+
+	return 0;
+}
+
 void queue_work(struct work_queue *q, struct work *work)
 {
 	struct worker_info *wi = container_of(q, struct worker_info, q);
 
 	pthread_mutex_lock(&wi->pending_lock);
+	wi->nr_pending++;
+
+	/* double the pool once queued work plus busy workers outgrow it */
+	if (!wi->ordered && wi->nr_threads < wi->nr_pending + wi->nr_running)
+		create_worker_threads(wi, wi->nr_threads * 2);
+
 	list_add_tail(&work->w_list, &wi->q.pending_list);
 	pthread_mutex_unlock(&wi->pending_lock);
 
@@ -103,19 +149,26 @@ static void *worker_routine(void *arg)
 	/* started this thread */
 	pthread_mutex_unlock(&wi->startup_lock);
 
+	pthread_mutex_lock(&wi->pending_lock);
+	wi->nr_running++;
+	pthread_mutex_unlock(&wi->pending_lock);
+
 	while (!(wi->q.wq_state & WQ_DEAD)) {
 
 		pthread_mutex_lock(&wi->pending_lock);
 retest:
 		if (list_empty(&wi->q.pending_list)) {
+			wi->nr_running--;
 			pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
 			if (wi->q.wq_state & WQ_DEAD) {
 				pthread_mutex_unlock(&wi->pending_lock);
 				pthread_exit(NULL);
 			}
+			wi->nr_running++;
 			goto retest;
 		}
 
+		wi->nr_pending--;
 		work = list_first_entry(&wi->q.pending_list,
 				       struct work, w_list);
 
@@ -169,14 +222,13 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
 	if (ret)
 		return NULL;
 
-	nr = ordered ? 1 : NR_WORKER_THREADS;
+	nr = ordered ? 1 : NR_MAX_WORKER_THREADS;
 	wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
 	if (!wi)
 		return NULL;
 
 	wi->name = name;
 	wi->ordered = ordered;
-	wi->nr_threads = nr;
 
 	INIT_LIST_HEAD(&wi->q.pending_list);
 	INIT_LIST_HEAD(&wi->finished_list);
@@ -187,17 +239,10 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
 	pthread_mutex_init(&wi->pending_lock, NULL);
 	pthread_mutex_init(&wi->startup_lock, NULL);
 
-	pthread_mutex_lock(&wi->startup_lock);
-	for (i = 0; i < wi->nr_threads; i++) {
-		ret = pthread_create(&wi->worker_thread[i], NULL,
-				     worker_routine, wi);
-		if (ret) {
-			eprintf("failed to create worker thread: %s\n",
-				strerror(ret));
-			goto destroy_threads;
-		}
-	}
-	pthread_mutex_unlock(&wi->startup_lock);
+	nr = ordered ? 1 : NR_MIN_WORKER_THREADS;
+	ret = create_worker_threads(wi, nr);
+	if (ret < 0)
+		goto destroy_threads;
 
 	list_add(&wi->worker_info_siblings, &worker_info_list);
 
@@ -206,7 +251,12 @@ destroy_threads:
 
 	wi->q.wq_state |= WQ_DEAD;
-	pthread_mutex_unlock(&wi->startup_lock);
-	for (; i > 0; i--) {
+	/*
+	 * create_worker_threads() has already dropped startup_lock, so it
+	 * must not be unlocked again here.
+	 * NOTE(review): nothing above wakes workers that already sleep in
+	 * pthread_cond_wait(); confirm the joins below cannot block forever.
+	 */
+	for (i = wi->nr_threads; i > 0; i--) {
 		pthread_join(wi->worker_thread[i - 1], NULL);
 		eprintf("stopped worker thread #%d\n", i - 1);
 	}
diff --git a/sheep/work.h b/sheep/work.h
index e57c6d0..e31b3a3 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -35,6 +35,8 @@ struct worker_info {
 	pthread_mutex_t pending_lock;
 	/* protected by pending_lock */
 	struct work_queue q;
+	size_t nr_pending;
+	size_t nr_running;
 
 	pthread_mutex_t startup_lock;
 
-- 
1.7.2.5

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog