Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
 sheep/work.c |   65 ++++++++++++++++++++++++++++++++++++++++++++-------------
 sheep/work.h |    2 +
 2 files changed, 52 insertions(+), 15 deletions(-)

diff --git a/sheep/work.c b/sheep/work.c
index 5678223..b197bb2 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -33,7 +33,8 @@
 #include "event.h"
 #include "trace/trace.h"
 
-#define NR_WORKER_THREADS 4
+#define NR_MIN_WORKER_THREADS 4
+#define NR_MAX_WORKER_THREADS 16384
 
 static int efd;
 int total_ordered_workers;
@@ -43,11 +44,46 @@ enum wq_state {
        WQ_DEAD = (1U << 1),
 };
 
+static void *worker_routine(void *arg);
+
+/*
+ * Grow wi's pool to nr_threads workers, capped at NR_MAX_WORKER_THREADS, under
+ * startup_lock.  Returns 0 if the pool is (or already was) that large, else -1.
+ */
+static int create_worker_threads(struct worker_info *wi, size_t nr_threads)
+{
+       int ret;
+
+       nr_threads = min(nr_threads, (size_t)NR_MAX_WORKER_THREADS);
+       pthread_mutex_lock(&wi->startup_lock);
+       while (wi->nr_threads < nr_threads) {
+               wi->nr_threads++;
+               ret = pthread_create(&wi->worker_thread[wi->nr_threads - 1],
+                                    NULL, worker_routine, wi);
+               if (ret != 0) { /* ret is the error number; errno is not set */
+                       eprintf("failed to create worker thread: %s\n",
+                               strerror(ret));
+                       wi->nr_threads--;
+                       pthread_mutex_unlock(&wi->startup_lock);
+                       return -1;
+               }
+               dprintf("create thread %s %zd\n", wi->name, wi->nr_threads);
+       }
+       pthread_mutex_unlock(&wi->startup_lock);
+
+       return 0;
+}
+
 void queue_work(struct work_queue *q, struct work *work)
 {
        struct worker_info *wi = container_of(q, struct worker_info, q);
 
        pthread_mutex_lock(&wi->pending_lock);
+       wi->nr_pending++;
+
+       if (!wi->ordered && wi->nr_threads < wi->nr_pending + wi->nr_running)
+               create_worker_threads(wi, wi->nr_threads * 2);
+
        list_add_tail(&work->w_list, &wi->q.pending_list);
        pthread_mutex_unlock(&wi->pending_lock);
 
@@ -103,19 +139,26 @@ static void *worker_routine(void *arg)
        /* started this thread */
        pthread_mutex_unlock(&wi->startup_lock);
 
+       pthread_mutex_lock(&wi->pending_lock);
+       wi->nr_running++;
+       pthread_mutex_unlock(&wi->pending_lock);
+
        while (!(wi->q.wq_state & WQ_DEAD)) {
 
                pthread_mutex_lock(&wi->pending_lock);
 retest:
                if (list_empty(&wi->q.pending_list)) {
+                       wi->nr_running--;
                        pthread_cond_wait(&wi->pending_cond, &wi->pending_lock);
                        if (wi->q.wq_state & WQ_DEAD) {
                                pthread_mutex_unlock(&wi->pending_lock);
                                pthread_exit(NULL);
                        }
+                       wi->nr_running++;
                        goto retest;
                }
 
+               wi->nr_pending--;
                work = list_first_entry(&wi->q.pending_list,
                                       struct work, w_list);
 
@@ -169,14 +212,13 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
        if (ret)
                return NULL;
 
-       nr = ordered ? 1 : NR_WORKER_THREADS;
+       nr = ordered ? 1 : NR_MAX_WORKER_THREADS;
        wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
        if (!wi)
                return NULL;
 
        wi->name = name;
        wi->ordered = ordered;
-       wi->nr_threads = nr;
 
        INIT_LIST_HEAD(&wi->q.pending_list);
        INIT_LIST_HEAD(&wi->finished_list);
@@ -187,17 +229,10 @@ struct work_queue *init_work_queue(const char *name, bool ordered)
        pthread_mutex_init(&wi->pending_lock, NULL);
        pthread_mutex_init(&wi->startup_lock, NULL);
 
-       pthread_mutex_lock(&wi->startup_lock);
-       for (i = 0; i < wi->nr_threads; i++) {
-               ret = pthread_create(&wi->worker_thread[i], NULL,
-                                    worker_routine, wi);
-               if (ret) {
-                       eprintf("failed to create worker thread: %s\n",
-                               strerror(ret));
-                       goto destroy_threads;
-               }
-       }
-       pthread_mutex_unlock(&wi->startup_lock);
+       nr = ordered ? 1 : NR_MIN_WORKER_THREADS;
+       ret = create_worker_threads(wi, nr);
+       if (ret < 0)
+               goto destroy_threads;
 
        list_add(&wi->worker_info_siblings, &worker_info_list);
 
@@ -206,7 +241,7 @@ destroy_threads:
 
        wi->q.wq_state |= WQ_DEAD;
        pthread_mutex_unlock(&wi->startup_lock);
-       for (; i > 0; i--) {
+       for (i = wi->nr_threads; i > 0; i--) {
                pthread_join(wi->worker_thread[i - 1], NULL);
                eprintf("stopped worker thread #%d\n", i - 1);
        }
diff --git a/sheep/work.h b/sheep/work.h
index e57c6d0..e31b3a3 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -35,6 +35,8 @@ struct worker_info {
        pthread_mutex_t pending_lock;
        /* protected by pending_lock */
        struct work_queue q;
+       size_t nr_pending;
+       size_t nr_running;
 
        pthread_mutex_t startup_lock;
 
-- 
1.7.2.5

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to