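Replace the on-demand short threads with a fixed pool of worker threads:
an ordered work queue keeps its single worker thread, and every other
queue now gets NR_WORKER_THREADS (4) worker threads.  This is a
preparation for a dynamic worker thread pool.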

Signed-off-by: MORITA Kazutaka <morita.kazut...@lab.ntt.co.jp>
---
 include/util.h |    2 +-
 lib/logger.c   |    4 +-
 sheep/work.c   |  126 +++++++++++++++++--------------------------------------
 sheep/work.h   |    3 +-
 4 files changed, 44 insertions(+), 91 deletions(-)

diff --git a/include/util.h b/include/util.h
index d0bf659..9197c76 100644
--- a/include/util.h
+++ b/include/util.h
@@ -38,7 +38,7 @@
 
 #define notrace __attribute__((no_instrument_function))
 
-#define uninitialized_var(x) (x = x)
+#define uninitialized_var(x) x = x
 
 static inline int before(uint32_t seq1, uint32_t seq2)
 {
diff --git a/lib/logger.c b/lib/logger.c
index aa1c012..7158b65 100644
--- a/lib/logger.c
+++ b/lib/logger.c
@@ -156,7 +156,7 @@ static notrace int log_vsnprintf(char *buff, size_t size, int prio,
 {
        char *p = buff;
 
-       if (worker_name && worker_idx)
+       if (worker_name && worker_idx >= 0)
                snprintf(p, size, "[%s %d] ", worker_name, worker_idx);
        else if (worker_name)
                snprintf(p, size, "[%s] ", worker_name);
@@ -466,7 +466,7 @@ notrace void set_thread_name(const char *name, int idx)
 
 notrace void get_thread_name(char *name)
 {
-       if (worker_name && worker_idx)
+       if (worker_name && worker_idx >= 0)
                sprintf(name, "%s %d", worker_name, worker_idx);
        else if (worker_name)
                sprintf(name, "%s", worker_name);
diff --git a/sheep/work.c b/sheep/work.c
index 0ef169d..5678223 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -25,7 +25,6 @@
 #include <sys/types.h>
 #include <sys/eventfd.h>
 #include <linux/types.h>
-#include <urcu/uatomic.h>
 
 #include "list.h"
 #include "util.h"
@@ -34,6 +33,8 @@
 #include "event.h"
 #include "trace/trace.h"
 
+#define NR_WORKER_THREADS 4
+
 static int efd;
 int total_ordered_workers;
 LIST_HEAD(worker_info_list);
@@ -42,75 +43,15 @@ enum wq_state {
        WQ_DEAD = (1U << 1),
 };
 
-/*
- * Short thread is created on demand and destroyed after serving the work for
- * gateway or io requests, aiming to solve two problems:
- *
- *  1. timeout of IO requests from guests. With on-demand short threads, we
- *     guarantee that there is always one thread available to execute the
- *     request as soon as possible.
- *  2. sheep halt for corner case that all gateway and io threads are executing
- *     local requests that ask for creation of another thread to execute the
- *     requests and sleep-wait for responses.
- */
-struct short_work {
-       struct work *work;
-       struct worker_info *wi;
-};
-
-static void *run_short_thread(void *arg)
-{
-       struct short_work *sw = arg;
-       eventfd_t value = 1;
-       static unsigned long idx;
-       int err;
-
-       /* Tell runtime to release resources after termination */
-       err = pthread_detach(pthread_self());
-       if (err)
-               panic("%s\n", strerror(err));
-
-       set_thread_name(sw->wi->name, uatomic_add_return(&idx, 1));
-
-       sw->work->fn(sw->work);
-
-       pthread_mutex_lock(&sw->wi->finished_lock);
-       list_add_tail(&sw->work->w_list, &sw->wi->finished_list);
-       pthread_mutex_unlock(&sw->wi->finished_lock);
-
-       eventfd_write(efd, value);
-       free(sw);
-       pthread_exit(NULL);
-}
-
-static inline void create_short_thread(struct worker_info *wi,
-                                      struct work *work)
-{
-       pthread_t thread;
-       struct short_work *sw = xmalloc(sizeof *sw);
-       int err;
-
-       sw->work = work;
-       sw->wi = wi;
-
-       err = pthread_create(&thread, NULL, run_short_thread, sw);
-       if (err)
-               panic("%s\n", strerror(err));
-       short_thread_begin();
-}
-
 void queue_work(struct work_queue *q, struct work *work)
 {
        struct worker_info *wi = container_of(q, struct worker_info, q);
 
-       if (wi->ordered) {
-               pthread_mutex_lock(&wi->pending_lock);
-               list_add_tail(&work->w_list, &wi->q.pending_list);
-               pthread_mutex_unlock(&wi->pending_lock);
+       pthread_mutex_lock(&wi->pending_lock);
+       list_add_tail(&work->w_list, &wi->q.pending_list);
+       pthread_mutex_unlock(&wi->pending_lock);
 
-               pthread_cond_signal(&wi->pending_cond);
-       } else
-               create_short_thread(wi, work);
+       pthread_cond_signal(&wi->pending_cond);
 }
 
 static void bs_thread_request_done(int fd, int events, void *data)
@@ -135,8 +76,6 @@ static void bs_thread_request_done(int fd, int events, void *data)
                        list_del(&work->w_list);
 
                        work->done(work);
-                       if (!wi->ordered)
-                               short_thread_end();
                }
        }
 }
@@ -146,8 +85,19 @@ static void *worker_routine(void *arg)
        struct worker_info *wi = arg;
        struct work *work;
        eventfd_t value = 1;
+       int i, uninitialized_var(idx);
+
+       for (i = 0; i < wi->nr_threads; i++) {
+               if (wi->worker_thread[i] == pthread_self()) {
+                       idx = i;
+                       break;
+               }
+       }
 
-       set_thread_name(wi->name, 0);
+       if (wi->ordered)
+               set_thread_name(wi->name, -1);
+       else
+               set_thread_name(wi->name, idx);
 
        pthread_mutex_lock(&wi->startup_lock);
        /* started this thread */
@@ -212,44 +162,42 @@ static int init_eventfd(void)
 
 struct work_queue *init_work_queue(const char *name, bool ordered)
 {
-       int ret;
+       int i, ret, nr;
        struct worker_info *wi;
 
        ret = init_eventfd();
        if (ret)
                return NULL;
 
-       wi = zalloc(sizeof(*wi));
+       nr = ordered ? 1 : NR_WORKER_THREADS;
+       wi = zalloc(sizeof(*wi) + nr * sizeof(pthread_t));
        if (!wi)
                return NULL;
 
        wi->name = name;
        wi->ordered = ordered;
+       wi->nr_threads = nr;
 
+       INIT_LIST_HEAD(&wi->q.pending_list);
        INIT_LIST_HEAD(&wi->finished_list);
 
-       pthread_mutex_init(&wi->finished_lock, NULL);
-
-       if (ordered) {
-               INIT_LIST_HEAD(&wi->q.pending_list);
+       pthread_cond_init(&wi->pending_cond, NULL);
 
-               pthread_cond_init(&wi->pending_cond, NULL);
-               pthread_mutex_init(&wi->pending_lock, NULL);
-               pthread_mutex_init(&wi->startup_lock, NULL);
-
-               pthread_mutex_lock(&wi->startup_lock);
+       pthread_mutex_init(&wi->finished_lock, NULL);
+       pthread_mutex_init(&wi->pending_lock, NULL);
+       pthread_mutex_init(&wi->startup_lock, NULL);
 
-               ret = pthread_create(&wi->worker_thread, NULL, worker_routine,
-                                    wi);
+       pthread_mutex_lock(&wi->startup_lock);
+       for (i = 0; i < wi->nr_threads; i++) {
+               ret = pthread_create(&wi->worker_thread[i], NULL,
+                                    worker_routine, wi);
                if (ret) {
                        eprintf("failed to create worker thread: %s\n",
                                strerror(ret));
                        goto destroy_threads;
                }
-
-               pthread_mutex_unlock(&wi->startup_lock);
-               total_ordered_workers++;
        }
+       pthread_mutex_unlock(&wi->startup_lock);
 
        list_add(&wi->worker_info_siblings, &worker_info_list);
 
@@ -258,8 +206,10 @@ destroy_threads:
 
        wi->q.wq_state |= WQ_DEAD;
        pthread_mutex_unlock(&wi->startup_lock);
-       pthread_join(wi->worker_thread, NULL);
-       eprintf("stopped worker thread\n");
+       for (; i > 0; i--) {
+               pthread_join(wi->worker_thread[i - 1], NULL);
+               eprintf("stopped worker thread #%d\n", i - 1);
+       }
 
 /* destroy_cond_mutex: */
        pthread_cond_destroy(&wi->pending_cond);
@@ -273,12 +223,14 @@ destroy_threads:
 #ifdef COMPILE_UNUSED_CODE
 static void exit_work_queue(struct work_queue *q)
 {
+       int i;
        struct worker_info *wi = container_of(q, struct worker_info, q);
 
        q->wq_state |= WQ_DEAD;
        pthread_cond_broadcast(&wi->pending_cond);
 
-       pthread_join(wi->worker_thread, NULL);
+       for (i = 0; wi->worker_thread[i] && i < wi->nr_threads; i++)
+               pthread_join(wi->worker_thread[i], NULL);
 
        pthread_cond_destroy(&wi->pending_cond);
        pthread_mutex_destroy(&wi->pending_lock);
diff --git a/sheep/work.h b/sheep/work.h
index 4d45dd6..e57c6d0 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -38,7 +38,8 @@ struct worker_info {
 
        pthread_mutex_t startup_lock;
 
-       pthread_t worker_thread; /* used for an ordered work queue */
+       size_t nr_threads;
+       pthread_t worker_thread[0];
 };
 
 extern struct list_head worker_info_list;
-- 
1.7.2.5

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to