Repository: celix
Updated Branches:
  refs/heads/develop b56c47d14 -> 1e73e4d1c


CELIX-77: add threadpool support


Project: http://git-wip-us.apache.org/repos/asf/celix/repo
Commit: http://git-wip-us.apache.org/repos/asf/celix/commit/1e73e4d1
Tree: http://git-wip-us.apache.org/repos/asf/celix/tree/1e73e4d1
Diff: http://git-wip-us.apache.org/repos/asf/celix/diff/1e73e4d1

Branch: refs/heads/develop
Commit: 1e73e4d1cad60bb1af4cd9f31382af2cd22687db
Parents: b56c47d
Author: Bjoern Petri <[email protected]>
Authored: Fri Dec 11 11:56:34 2015 +0100
Committer: Bjoern Petri <[email protected]>
Committed: Fri Dec 11 11:56:34 2015 +0100

----------------------------------------------------------------------
 utils/CMakeLists.txt                    |  96 ++---
 utils/private/src/thpool.c              | 562 +++++++++++++++++++++++++++
 utils/private/test/thread_pool_test.cpp | 118 ++++++
 utils/public/docs/Design.md             |  52 +++
 utils/public/docs/FAQ.md                |  33 ++
 utils/public/docs/README.md             |  62 +++
 utils/public/include/thpool.h           | 164 ++++++++
 7 files changed, 1043 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt
index 93ff353..62a953f 100644
--- a/utils/CMakeLists.txt
+++ b/utils/CMakeLists.txt
@@ -24,28 +24,31 @@ if (UTILS)
     include_directories("private/include")
     include_directories("public/include")
     add_library(celix_utils SHARED 
-               private/src/array_list.c
-               public/include/array_list.h 
-               private/include/array_list_private.h 
+                private/src/array_list.c
+                public/include/array_list.h 
+                private/include/array_list_private.h 
 
-               private/src/hash_map.c
-               public/include/hash_map.h 
-               private/include/hash_map_private.h
+                private/src/hash_map.c
+                public/include/hash_map.h 
+                private/include/hash_map_private.h
 
-               private/src/linked_list.c
-               private/src/linked_list_iterator.c
-               public/include/linked_list.h
-               public/include/linked_list_iterator.h 
-               private/include/linked_list_private.h
+                private/src/linked_list.c
+                private/src/linked_list_iterator.c
+                public/include/linked_list.h
+                public/include/linked_list_iterator.h 
+                private/include/linked_list_private.h
 
-               public/include/exports.h
-               
-               private/src/celix_threads.c
-               public/include/celix_threads.h
-       )
+                public/include/exports.h
+                
+                private/src/celix_threads.c
+                public/include/celix_threads.h
+
+                private/src/thpool.c
+                public/include/thpool.h
+        )
     
     IF(UNIX)
-       target_link_libraries(celix_utils m pthread)
+        target_link_libraries(celix_utils m pthread)
     ENDIF(UNIX)
     
     install(TARGETS celix_utils DESTINATION lib COMPONENT framework)
@@ -54,34 +57,39 @@ if (UTILS)
     
     celix_subproject(UTILS-TESTS "Option to build the utilities library tests" 
"OFF")
 
-       if (ENABLE_TESTING AND UTILS-TESTS)
-               find_package(CppUTest REQUIRED)
+        if (ENABLE_TESTING AND UTILS-TESTS)
+            find_package(CppUTest REQUIRED)
+
+            include_directories(${CUNIT_INCLUDE_DIRS})
+            include_directories(${CPPUTEST_INCLUDE_DIR})
+            include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
+            include_directories("${PROJECT_SOURCE_DIR}/utils/private/include")
+            
+            add_executable(hash_map_test private/test/hash_map_test.cpp)
+            target_link_libraries(hash_map_test celix_utils 
${CPPUTEST_LIBRARY} pthread)
+            
+            add_executable(array_list_test private/test/array_list_test.cpp)
+            target_link_libraries(array_list_test celix_utils 
${CPPUTEST_LIBRARY} pthread)
+            
+            add_executable(celix_threads_test 
private/test/celix_threads_test.cpp)
+            target_link_libraries(celix_threads_test celix_utils 
${CPPUTEST_LIBRARY} ${CPPUTEST_EXT_LIBRARY} pthread)
+            add_executable(linked_list_test private/test/linked_list_test.cpp)
+            target_link_libraries(linked_list_test celix_utils 
${CPPUTEST_LIBRARY} pthread)
+            
+            add_executable(thread_pool_test private/test/thread_pool_test.cpp)
+            target_link_libraries(thread_pool_test celix_utils 
${CPPUTEST_LIBRARY} pthread) 
 
-           include_directories(${CUNIT_INCLUDE_DIRS})
-           include_directories(${CPPUTEST_INCLUDE_DIR})
-           include_directories("${PROJECT_SOURCE_DIR}/utils/public/include")
-           include_directories("${PROJECT_SOURCE_DIR}/utils/private/include")
-           
-           add_executable(hash_map_test private/test/hash_map_test.cpp)
-           target_link_libraries(hash_map_test celix_utils ${CPPUTEST_LIBRARY} 
pthread)
-           
-           add_executable(array_list_test private/test/array_list_test.cpp)
-           target_link_libraries(array_list_test celix_utils 
${CPPUTEST_LIBRARY} pthread)
-           
-               add_executable(celix_threads_test 
private/test/celix_threads_test.cpp)
-           target_link_libraries(celix_threads_test celix_utils 
${CPPUTEST_LIBRARY} ${CPPUTEST_EXT_LIBRARY} pthread)
-           
-           add_executable(linked_list_test private/test/linked_list_test.cpp)
-           target_link_libraries(linked_list_test celix_utils 
${CPPUTEST_LIBRARY} pthread)
-           
-               add_test(NAME run_array_list_test COMMAND array_list_test)
-               add_test(NAME run_hash_map_test COMMAND hash_map_test)
-               add_test(NAME run_celix_threads_test COMMAND celix_threads_test)
-               add_test(NAME run_linked_list_test COMMAND linked_list_test)
-               SETUP_TARGET_FOR_COVERAGE(array_list_test array_list_test 
${CMAKE_BINARY_DIR}/coverage/array_list_test/array_list_test)
-               SETUP_TARGET_FOR_COVERAGE(hash_map hash_map_test 
${CMAKE_BINARY_DIR}/coverage/hash_map_test/hash_map_test)
-               SETUP_TARGET_FOR_COVERAGE(celix_threads_test celix_threads_test 
${CMAKE_BINARY_DIR}/coverage/celix_threads_test/celix_threads_test)
-               SETUP_TARGET_FOR_COVERAGE(linked_list_test linked_list_test 
${CMAKE_BINARY_DIR}/coverage/linked_list_test/linked_list_test)
+            add_test(NAME run_array_list_test COMMAND array_list_test)
+            add_test(NAME run_hash_map_test COMMAND hash_map_test)
+            add_test(NAME run_celix_threads_test COMMAND celix_threads_test)
+            add_test(NAME run_thread_pool_test COMMAND thread_pool_test)
+            add_test(NAME run_linked_list_test COMMAND linked_list_test)
+        
+            SETUP_TARGET_FOR_COVERAGE(array_list_test array_list_test 
${CMAKE_BINARY_DIR}/coverage/array_list_test/array_list_test)
+            SETUP_TARGET_FOR_COVERAGE(hash_map hash_map_test 
${CMAKE_BINARY_DIR}/coverage/hash_map_test/hash_map_test)
+            SETUP_TARGET_FOR_COVERAGE(celix_threads_test celix_threads_test 
${CMAKE_BINARY_DIR}/coverage/celix_threads_test/celix_threads_test)
+            SETUP_TARGET_FOR_COVERAGE(thread_pool_test thread_pool_test 
${CMAKE_BINARY_DIR}/coverage/thread_pool_test/thread_pool_test)
+            SETUP_TARGET_FOR_COVERAGE(linked_list_test linked_list_test 
${CMAKE_BINARY_DIR}/coverage/linked_list_test/linked_list_test)
 
    endif(ENABLE_TESTING AND UTILS-TESTS)
 endif (UTILS)

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/private/src/thpool.c
----------------------------------------------------------------------
diff --git a/utils/private/src/thpool.c b/utils/private/src/thpool.c
new file mode 100644
index 0000000..f81350e
--- /dev/null
+++ b/utils/private/src/thpool.c
@@ -0,0 +1,562 @@
+/* ********************************
+ * Author:       Johan Hanssen Seferidis
+ * License:         MIT
+ * Description:  Library providing a threading pool where you can add
+ *               work. For usage, check the thpool.h file or README.md
+ *
+ *//** @file thpool.h *//*
+ * 
+ ********************************/
+
+
+#include <unistd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <pthread.h>
+#include <errno.h>
+#include <time.h> 
+#include <sys/prctl.h>
+
+#include "thpool.h"
+
+#ifdef THPOOL_DEBUG
+#define THPOOL_DEBUG 1
+#else
+#define THPOOL_DEBUG 0
+#endif
+
+#define MAX_NANOSEC 999999999
+#define CEIL(X) ((X-(int)(X)) > 0 ? (int)(X+1) : (int)(X))
+
+static volatile int threads_keepalive;
+static volatile int threads_on_hold;
+
+
+
+
+
+/* ========================== STRUCTURES ============================ */
+
+
+/* Binary semaphore */
+typedef struct bsem {
+       pthread_mutex_t mutex;
+       pthread_cond_t   cond;
+       int v;
+} bsem;
+
+
+/* Job */
+typedef struct job{
+       struct job*  prev;                   /* pointer to previous job   */
+       void*  (*function)(void* arg);       /* function pointer          */
+       void*  arg;                          /* function's argument       */
+} job;
+
+
+/* Job queue */
+typedef struct jobqueue{
+       pthread_mutex_t rwmutex;             /* used for queue r/w access */
+       job  *front;                         /* pointer to front of queue */
+       job  *rear;                          /* pointer to rear  of queue */
+       bsem *has_jobs;                      /* flag as binary semaphore  */
+       int   len;                           /* number of jobs in queue   */
+} jobqueue;
+
+
+/* Thread */
+typedef struct thread{
+       int       id;                        /* friendly id               */
+       pthread_t pthread;                   /* pointer to actual thread  */
+       struct thpool_* thpool_p;            /* access to thpool          */
+} thread;
+
+
+/* Threadpool */
+typedef struct thpool_{
+       thread**   threads;                  /* pointer to threads        */
+       volatile int num_threads_alive;      /* threads currently alive   */
+       volatile int num_threads_working;    /* threads currently working */
+       pthread_mutex_t  thcount_lock;       /* used for thread count etc */
+       jobqueue*  jobqueue_p;               /* pointer to the job queue  */    
+} thpool_;
+
+
+
+
+
+/* ========================== PROTOTYPES ============================ */
+
+
+static void  thread_init(thpool_* thpool_p, struct thread** thread_p, int id);
+static void* thread_do(struct thread* thread_p);
+static void  thread_hold();
+static void  thread_destroy(struct thread* thread_p);
+
+static int   jobqueue_init(thpool_* thpool_p);
+static void  jobqueue_clear(thpool_* thpool_p);
+static void  jobqueue_push(thpool_* thpool_p, struct job* newjob_p);
+static struct job* jobqueue_pull(thpool_* thpool_p);
+static void  jobqueue_destroy(thpool_* thpool_p);
+
+static void  bsem_init(struct bsem *bsem_p, int value);
+static void  bsem_reset(struct bsem *bsem_p);
+static void  bsem_post(struct bsem *bsem_p);
+static void  bsem_post_all(struct bsem *bsem_p);
+static void  bsem_wait(struct bsem *bsem_p);
+
+
+
+
+
+/* ========================== THREADPOOL ============================ */
+
+
+/* Initialise thread pool */
+struct thpool_* thpool_init(int num_threads){
+
+       threads_on_hold   = 0;
+       threads_keepalive = 1;
+
+       if ( num_threads < 0){
+               num_threads = 0;
+       }
+
+       /* Make new thread pool */
+       thpool_* thpool_p;
+       thpool_p = (struct thpool_*)malloc(sizeof(struct thpool_));
+       if (thpool_p == NULL){
+               fprintf(stderr, "thpool_init(): Could not allocate memory for 
thread pool\n");
+               return NULL;
+       }
+       thpool_p->num_threads_alive   = 0;
+       thpool_p->num_threads_working = 0;
+
+       /* Initialise the job queue */
+       if (jobqueue_init(thpool_p) == -1){
+               fprintf(stderr, "thpool_init(): Could not allocate memory for 
job queue\n");
+               free(thpool_p);
+               return NULL;
+       }
+
+       /* Make threads in pool */
+       thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct 
thread));
+       if (thpool_p->threads == NULL){
+               fprintf(stderr, "thpool_init(): Could not allocate memory for 
threads\n");
+               jobqueue_destroy(thpool_p);
+               free(thpool_p->jobqueue_p);
+               free(thpool_p);
+               return NULL;
+       }
+
+       pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
+       
+       /* Thread init */
+       int n;
+       for (n=0; n<num_threads; n++){
+               thread_init(thpool_p, &thpool_p->threads[n], n);
+               if (THPOOL_DEBUG)
+                       printf("THPOOL_DEBUG: Created thread %d in pool \n", n);
+       }
+       
+       /* Wait for threads to initialize */
+       while (thpool_p->num_threads_alive != num_threads) {}
+
+       return thpool_p;
+}
+
+
+/* Add work to the thread pool */
+int thpool_add_work(thpool_* thpool_p, void *(*function_p)(void*), void* 
arg_p){
+       job* newjob;
+
+       newjob=(struct job*)malloc(sizeof(struct job));
+       if (newjob==NULL){
+               fprintf(stderr, "thpool_add_work(): Could not allocate memory 
for new job\n");
+               return -1;
+       }
+
+       /* add function and argument */
+       newjob->function=function_p;
+       newjob->arg=arg_p;
+
+       /* add job to queue */
+       pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
+       jobqueue_push(thpool_p, newjob);
+       pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
+
+       return 0;
+}
+
+
+/* Wait until all jobs have finished */
+void thpool_wait(thpool_* thpool_p){
+
+       /* Continuous polling */
+       double timeout = 1.0;
+       time_t start, end;
+       double tpassed = 0.0;
+       time (&start);
+       while (tpassed < timeout && 
+                       (thpool_p->jobqueue_p->len || 
thpool_p->num_threads_working))
+       {
+               time (&end);
+               tpassed = difftime(end,start);
+       }
+
+       /* Exponential polling */
+       long init_nano =  1; /* MUST be above 0 */
+       long new_nano;
+       double multiplier =  1.01;
+       int  max_secs   = 20;
+       
+       struct timespec polling_interval;
+       polling_interval.tv_sec  = 0;
+       polling_interval.tv_nsec = init_nano;
+       
+       while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working)
+       {
+               nanosleep(&polling_interval, NULL);
+               if ( polling_interval.tv_sec < max_secs ){
+                       new_nano = CEIL(polling_interval.tv_nsec * multiplier);
+                       polling_interval.tv_nsec = new_nano % MAX_NANOSEC;
+                       if ( new_nano > MAX_NANOSEC ) {
+                               polling_interval.tv_sec ++;
+                       }
+               }
+               else break;
+       }
+       
+       /* Fall back to max polling */
+       while (thpool_p->jobqueue_p->len || thpool_p->num_threads_working){
+               sleep(max_secs);
+       }
+}
+
+
+/* Destroy the threadpool */
+void thpool_destroy(thpool_* thpool_p){
+       
+       volatile int threads_total = thpool_p->num_threads_alive;
+
+       /* End each thread 's infinite loop */
+       threads_keepalive = 0;
+       
+       /* Give one second to kill idle threads */
+       double TIMEOUT = 1.0;
+       time_t start, end;
+       double tpassed = 0.0;
+       time (&start);
+       while (tpassed < TIMEOUT && thpool_p->num_threads_alive){
+               bsem_post_all(thpool_p->jobqueue_p->has_jobs);
+               time (&end);
+               tpassed = difftime(end,start);
+       }
+       
+       /* Poll remaining threads */
+       while (thpool_p->num_threads_alive){
+               bsem_post_all(thpool_p->jobqueue_p->has_jobs);
+               sleep(1);
+       }
+
+       /* Job queue cleanup */
+       jobqueue_destroy(thpool_p);
+       free(thpool_p->jobqueue_p);
+       
+       /* Deallocs */
+       int n;
+       for (n=0; n < threads_total; n++){
+               thread_destroy(thpool_p->threads[n]);
+       }
+       free(thpool_p->threads);
+       free(thpool_p);
+}
+
+
+/* Pause all threads in threadpool */
+void thpool_pause(thpool_* thpool_p) {
+       int n;
+       for (n=0; n < thpool_p->num_threads_alive; n++){
+               pthread_kill(thpool_p->threads[n]->pthread, SIGUSR1);
+       }
+}
+
+
+/* Resume all threads in threadpool */
+void thpool_resume(thpool_* thpool_p) {
+       threads_on_hold = 0;
+}
+
+
+
+
+
+/* ============================ THREAD ============================== */
+
+
+/* Initialize a thread in the thread pool
+ * 
+ * @param thread        address to the pointer of the thread to be created
+ * @param id            id to be given to the thread
+ * 
+ */
+static void thread_init (thpool_* thpool_p, struct thread** thread_p, int id){
+       
+       *thread_p = (struct thread*)malloc(sizeof(struct thread));
+       if (thread_p == NULL){
+               fprintf(stderr, "thpool_init(): Could not allocate memory for 
thread\n");
+               exit(1);
+       }
+
+       (*thread_p)->thpool_p = thpool_p;
+       (*thread_p)->id       = id;
+
+       pthread_create(&(*thread_p)->pthread, NULL, (void *)thread_do, 
(*thread_p));
+       pthread_detach((*thread_p)->pthread);
+       
+}
+
+
+/* Sets the calling thread on hold */
+static void thread_hold () {
+       threads_on_hold = 1;
+       while (threads_on_hold){
+               sleep(1);
+       }
+}
+
+
+/* What each thread is doing
+* 
+* In principle this is an endless loop. The only time this loop gets 
interuppted is once
+* thpool_destroy() is invoked or the program exits.
+* 
+* @param  thread        thread that will run this function
+* @return nothing
+*/
+static void* thread_do(struct thread* thread_p){
+       /* Set thread name for profiling and debuging */
+       char thread_name[128] = {0};
+       sprintf(thread_name, "thread-pool-%d", thread_p->id);
+       prctl(PR_SET_NAME, thread_name);
+
+       /* Assure all threads have been created before starting serving */
+       thpool_* thpool_p = thread_p->thpool_p;
+       
+       /* Register signal handler */
+       struct sigaction act;
+       act.sa_handler = thread_hold;
+       if (sigaction(SIGUSR1, &act, NULL) == -1) {
+               fprintf(stderr, "thread_do(): cannot handle SIGUSR1");
+       }
+       
+       /* Mark thread as alive (initialized) */
+       pthread_mutex_lock(&thpool_p->thcount_lock);
+       thpool_p->num_threads_alive += 1;
+       pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+       while(threads_keepalive){
+
+               bsem_wait(thpool_p->jobqueue_p->has_jobs);
+
+               if (threads_keepalive){
+                       
+                       pthread_mutex_lock(&thpool_p->thcount_lock);
+                       thpool_p->num_threads_working++;
+                       pthread_mutex_unlock(&thpool_p->thcount_lock);
+                       
+                       /* Read job from queue and execute it */
+                       void*(*func_buff)(void* arg);
+                       void*  arg_buff;
+                       job* job_p;
+                       pthread_mutex_lock(&thpool_p->jobqueue_p->rwmutex);
+                       job_p = jobqueue_pull(thpool_p);
+                       pthread_mutex_unlock(&thpool_p->jobqueue_p->rwmutex);
+                       if (job_p) {
+                               func_buff = job_p->function;
+                               arg_buff  = job_p->arg;
+                               func_buff(arg_buff);
+                               free(job_p);
+                       }
+                       
+                       pthread_mutex_lock(&thpool_p->thcount_lock);
+                       thpool_p->num_threads_working--;
+                       pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+               }
+       }
+       pthread_mutex_lock(&thpool_p->thcount_lock);
+       thpool_p->num_threads_alive --;
+       pthread_mutex_unlock(&thpool_p->thcount_lock);
+
+       return NULL;
+}
+
+
+/* Frees a thread  */
+static void thread_destroy (thread* thread_p){
+       free(thread_p);
+}
+
+
+
+
+
+/* ============================ JOB QUEUE =========================== */
+
+
+/* Initialize queue */
+static int jobqueue_init(thpool_* thpool_p){
+       
+       thpool_p->jobqueue_p = (struct jobqueue*)malloc(sizeof(struct 
jobqueue));
+       if (thpool_p->jobqueue_p == NULL){
+               return -1;
+       }
+       thpool_p->jobqueue_p->len = 0;
+       thpool_p->jobqueue_p->front = NULL;
+       thpool_p->jobqueue_p->rear  = NULL;
+
+       thpool_p->jobqueue_p->has_jobs = (struct bsem*)malloc(sizeof(struct 
bsem));
+       if (thpool_p->jobqueue_p->has_jobs == NULL){
+               return -1;
+       }
+
+       pthread_mutex_init(&(thpool_p->jobqueue_p->rwmutex), NULL);
+       bsem_init(thpool_p->jobqueue_p->has_jobs, 0);
+
+       return 0;
+}
+
+
+/* Clear the queue */
+static void jobqueue_clear(thpool_* thpool_p){
+
+       while(thpool_p->jobqueue_p->len){
+               free(jobqueue_pull(thpool_p));
+       }
+
+       thpool_p->jobqueue_p->front = NULL;
+       thpool_p->jobqueue_p->rear  = NULL;
+       bsem_reset(thpool_p->jobqueue_p->has_jobs);
+       thpool_p->jobqueue_p->len = 0;
+
+}
+
+
+/* Add (allocated) job to queue
+ *
+ * Notice: Caller MUST hold a mutex
+ */
+static void jobqueue_push(thpool_* thpool_p, struct job* newjob){
+
+       newjob->prev = NULL;
+
+       switch(thpool_p->jobqueue_p->len){
+
+               case 0:  /* if no jobs in queue */
+                                       thpool_p->jobqueue_p->front = newjob;
+                                       thpool_p->jobqueue_p->rear  = newjob;
+                                       break;
+
+               default: /* if jobs in queue */
+                                       thpool_p->jobqueue_p->rear->prev = 
newjob;
+                                       thpool_p->jobqueue_p->rear = newjob;
+                                       
+       }
+       thpool_p->jobqueue_p->len++;
+       
+       bsem_post(thpool_p->jobqueue_p->has_jobs);
+}
+
+
+/* Get first job from queue(removes it from queue)
+ * 
+ * Notice: Caller MUST hold a mutex
+ */
+static struct job* jobqueue_pull(thpool_* thpool_p){
+
+       job* job_p;
+       job_p = thpool_p->jobqueue_p->front;
+
+       switch(thpool_p->jobqueue_p->len){
+               
+               case 0:  /* if no jobs in queue */
+                                       break;
+               
+               case 1:  /* if one job in queue */
+                                       thpool_p->jobqueue_p->front = NULL;
+                                       thpool_p->jobqueue_p->rear  = NULL;
+                                       thpool_p->jobqueue_p->len = 0;
+                                       break;
+               
+               default: /* if >1 jobs in queue */
+                                       thpool_p->jobqueue_p->front = 
job_p->prev;
+                                       thpool_p->jobqueue_p->len--;
+                                       /* more than one job in queue -> post 
it */
+                                       
bsem_post(thpool_p->jobqueue_p->has_jobs);
+                                       
+       }
+       
+       return job_p;
+}
+
+
+/* Free all queue resources back to the system */
+static void jobqueue_destroy(thpool_* thpool_p){
+       jobqueue_clear(thpool_p);
+       free(thpool_p->jobqueue_p->has_jobs);
+}
+
+
+
+
+
+/* ======================== SYNCHRONISATION ========================= */
+
+
+/* Init semaphore to 1 or 0 */
+static void bsem_init(bsem *bsem_p, int value) {
+       if (value < 0 || value > 1) {
+               fprintf(stderr, "bsem_init(): Binary semaphore can take only 
values 1 or 0");
+               exit(1);
+       }
+       pthread_mutex_init(&(bsem_p->mutex), NULL);
+       pthread_cond_init(&(bsem_p->cond), NULL);
+       bsem_p->v = value;
+}
+
+
+/* Reset semaphore to 0 */
+static void bsem_reset(bsem *bsem_p) {
+       bsem_init(bsem_p, 0);
+}
+
+
+/* Post to at least one thread */
+static void bsem_post(bsem *bsem_p) {
+       pthread_mutex_lock(&bsem_p->mutex);
+       bsem_p->v = 1;
+       pthread_cond_signal(&bsem_p->cond);
+       pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Post to all threads */
+static void bsem_post_all(bsem *bsem_p) {
+       pthread_mutex_lock(&bsem_p->mutex);
+       bsem_p->v = 1;
+       pthread_cond_broadcast(&bsem_p->cond);
+       pthread_mutex_unlock(&bsem_p->mutex);
+}
+
+
+/* Wait on semaphore until semaphore has value 0 */
+static void bsem_wait(bsem* bsem_p) {
+       pthread_mutex_lock(&bsem_p->mutex);
+       while (bsem_p->v != 1) {
+               pthread_cond_wait(&bsem_p->cond, &bsem_p->mutex);
+       }
+       bsem_p->v = 0;
+       pthread_mutex_unlock(&bsem_p->mutex);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/private/test/thread_pool_test.cpp
----------------------------------------------------------------------
diff --git a/utils/private/test/thread_pool_test.cpp 
b/utils/private/test/thread_pool_test.cpp
new file mode 100644
index 0000000..5dae4c8
--- /dev/null
+++ b/utils/private/test/thread_pool_test.cpp
@@ -0,0 +1,118 @@
+/**
+ *Licensed to the Apache Software Foundation (ASF) under one
+ *or more contributor license agreements.  See the NOTICE file
+ *distributed with this work for additional information
+ *regarding copyright ownership.  The ASF licenses this file
+ *to you under the Apache License, Version 2.0 (the
+ *"License"); you may not use this file except in compliance
+ *with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *Unless required by applicable law or agreed to in writing,
+ *software distributed under the License is distributed on an
+ *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ *specific language governing permissions and limitations
+ *under the License.
+ */
+/*
+ * array_list_test.cpp
+ *
+ *     \date       Sep 15, 2015
+ *  \author            Menno van der Graaf & Alexander
+ *  \copyright Apache License, Version 2.0
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+#include "CppUTest/TestHarness.h"
+#include "CppUTest/TestHarness_c.h"
+#include "CppUTest/CommandLineTestRunner.h"
+
+extern "C" {
+#include "celix_threads.h"
+#include "thpool.h"
+}
+
+celix_thread_mutex_t mutex;
+int sum=0;
+
+
+void * increment(void *) {
+       celixThreadMutex_lock(&mutex);
+       sum ++;
+       celixThreadMutex_unlock(&mutex);
+       return NULL;
+}
+
+int main(int argc, char** argv) {
+       return RUN_ALL_TESTS(argc, argv);
+}
+
+
+//----------------------TEST THREAD FUNCTION DECLARATIONS----------------------
+
+//----------------------TESTGROUP DEFINES----------------------
+
+TEST_GROUP(thread_pool) {
+       threadpool      myPool;
+
+       void setup(void) {
+       }
+
+       void teardown(void) {
+       }
+};
+
+
+//----------------------THREAD_POOL TESTS----------------------
+
+TEST(thread_pool, create) {
+
+       myPool = thpool_init(5);        // pool of 5 threads
+       CHECK((myPool != NULL));
+       thpool_destroy(myPool);
+}
+
+TEST(thread_pool, do_work) {
+
+       myPool = thpool_init(5);        // pool of 5 threads
+       celixThreadMutex_create(&mutex, NULL);
+       CHECK((myPool != NULL));
+       int n;
+       sum = 0;
+       int num_jobs = 1000;
+       for (n = 0; n < num_jobs; n++){
+               thpool_add_work(myPool, increment, NULL);
+       }
+       thpool_wait(myPool);
+       thpool_destroy(myPool);
+       CHECK_EQUAL(1000, sum);
+       celixThreadMutex_destroy(&mutex);
+}
+
+TEST(thread_pool, do_work_with_pause) {
+
+       myPool = thpool_init(5);        // pool of 5 threads
+       celixThreadMutex_create(&mutex, NULL);
+       CHECK((myPool != NULL));
+       int n;
+       sum = 0;
+       int num_jobs = 500000;
+       for (n = 0; n < num_jobs; n++){
+               thpool_add_work(myPool, increment, NULL);
+       }
+       sleep(1);
+       thpool_pause(myPool);
+       for (n = 0; n < num_jobs; n++){
+               thpool_add_work(myPool, increment, NULL);
+       }
+       thpool_resume(myPool);
+       thpool_wait(myPool);
+       thpool_destroy(myPool);
+       CHECK_EQUAL(1000000, sum);
+       celixThreadMutex_destroy(&mutex);
+}

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/Design.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/Design.md b/utils/public/docs/Design.md
new file mode 100644
index 0000000..00fe1b4
--- /dev/null
+++ b/utils/public/docs/Design.md
@@ -0,0 +1,52 @@
+## High level
+       
+       Description: Library providing a threading pool where you can add work 
on the fly. The number
+                    of threads in the pool is adjustable when creating the 
pool. In most cases
+                    this should equal the number of threads supported by your 
cpu.
+                
+                    For an example on how to use the threadpool, check the 
main.c file or just read
+                    the documentation found in the README.md file.
+       
+                    In this header file a detailed overview of the functions 
and the threadpool's logical
+                    scheme is presented in case you wish to tweak or alter 
something. 
+       
+       
+       
+                     _______________________________________________________   
     
+                   /                                                       \
+                   |   JOB QUEUE        | job1 | job2 | job3 | job4 | ..   |
+                   |                                                       |
+                   |   threadpool      | thread1 | thread2 | ..            |
+                   \_______________________________________________________/
+       
+       
+          Description:       Jobs are added to the job queue. Once a thread in 
the pool
+                             is idle, it is assigned with the first job from 
the queue(and
+                             erased from the queue). It's each thread's job to 
read from 
+                             the queue serially(using lock) and executing each 
job
+                             until the queue is empty.
+       
+       
+          Scheme:
+       
+          thpool______                jobqueue____                      ______ 
+          |           |               |           |       
.----------->|_job0_| Newly added job
+          |           |               |  rear  ----------'             |_job1_|
+          | jobqueue----------------->|           |                    |_job2_|
+          |           |               |  front ----------.             
|__..__| 
+          |___________|               |___________|       
'----------->|_jobn_| Job for thread to take
+       
+       
+          job0________ 
+          |           |
+          | function---->
+          |           |
+          |   arg------->
+          |           |         job1________ 
+          |  next-------------->|           |
+          |___________|         |           |..
+
+
+## Synchronisation
+
+*Mutexes* and *binary semaphores* are the main tools to achieve 
synchronisation between threads.

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/FAQ.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/FAQ.md b/utils/public/docs/FAQ.md
new file mode 100644
index 0000000..584a699
--- /dev/null
+++ b/utils/public/docs/FAQ.md
@@ -0,0 +1,33 @@
+
+###Why isn't pthread_exit() used to exit a thread?
+`thread_do` used to use pthread_exit(). However that resulted in
+hard times of testing for memory leaks. The reason is that on pthread_exit()
+not all memory is freed bt pthread (probably for future threads or false
+belief that the application is terminating). For these reasons a simple return
+is used.
+
+Interestingly using `pthread_exit()` results in much more memory being 
allocated.
+
+
+###Why do you use sleep() after calling thpool_destroy()?
+This is needed only in the tests. The reason is that if you call thpool_destroy
+and then exit immedietely, maybe the program will exit before all the threads
+had the time to deallocate. In that way it is impossible to check for memory
+leaks.
+
+In production you don't have to worry about this since if you call exit,
+immedietely after you destroyied the pool, the threads will be freed
+anyway by the OS. If you eitherway destroy the pool in the middle of your
+program it doesn't matter again since the program will not exit immediately
+and thus threads will have more than enough time to terminate.
+
+
+
+###Why does wait() use all my CPU?
+Normally `wait()` will spike CPU usage to full when called. This is normal as 
long as it doesn't last for more than 1 second. The reason this happens is that 
`wait()` goes through various phases of polling (what is called smart polling).
+
+ * Initially there is no interval between polling and hence the 100% use of 
your CPU.
+ * After that the polling interval grows exponentially.
+ * Finally after x seconds, if there is still work, polling falls back to a 
very big interval.
+ 
+The reason `wait()` works in this way, is that the function is mostly used 
when someone wants to wait for some calculation to finish. So if the 
calculation is assumed to take a long time then we don't want to poll too 
often. Still we want to poll fast in case the calculation is a simple one. To 
solve these two problems, this seemingly awkward behaviour is present.

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/docs/README.md
----------------------------------------------------------------------
diff --git a/utils/public/docs/README.md b/utils/public/docs/README.md
new file mode 100644
index 0000000..0a07ebc
--- /dev/null
+++ b/utils/public/docs/README.md
@@ -0,0 +1,62 @@
+![Build 
status](http://178.62.170.124:3000/pithikos/c-thread-pool/badge/?branch=master)
+
+# C Thread Pool
+
+This is a minimal but fully functional threadpool implementation.
+
+  * ANCI C and POSIX compliant
+  * Number of threads can be chosen on initialization
+  * Minimal but powerful interface
+  * Full documentation
+
+The threadpool is under MIT license. Notice that this project took a 
considerable amount of work and sacrifice of my free time and the reason I give 
it for free (even for commercial use) is so when you become rich and wealthy 
you don't forget about us open-source creatures of the night. Cheers!
+
+
+## v2 Changes
+
+This is an updated and heavily refactored version of my original threadpool. 
The main things taken into consideration in this new version are:
+
+  * Synchronisation control from the user (pause/resume/wait)
+  * Thorough testing for memory leaks and race conditions
+  * Cleaner and more opaque API
+  * Smart polling - polling interval changes on-the-fly
+
+
+## Run an example
+
+The library is not precompiled so you have to compile it with your project. 
The thread pool
+uses POSIX threads so if you compile with gcc on Linux you have to use the 
flag `-pthread` like this:
+
+    gcc example.c thpool.c -D THPOOL_DEBUG -pthread -o example
+
+
+Then run the executable like this:
+
+    ./example
+
+
+## Basic usage
+
+1. Include the header in your source file: `#include "thpool.h"`
+2. Create a thread pool with number of threads you want: `threadpool thpool = 
thpool_init(4);`
+3. Add work to the pool: `thpool_add_work(thpool, (void*)function_p, 
(void*)arg_p);`
+
+The workers(threads) will start their work automatically as fast as there is 
new work
+in the pool. If you want to wait for all added work to be finished before 
continuing
+you can use `thpool_wait(thpool);`. If you want to destroy the pool you can use
+`thpool_destroy(thpool);`.
+
+
+
+## API
+
+For a deeper look into the documentation check in the 
[thpool.h](https://github.com/Pithikos/C-Thread-Pool/blob/master/thpool.h) 
file. Below is a fast practical overview.
+
+| Function example                | Description                                
                         |
+|---------------------------------|---------------------------------------------------------------------|
+| ***thpool_init(4)***            | Will return a new threadpool with `4` 
threads.                        |
+| ***thpool_add_work(thpool, (void&#42;)function_p, (void&#42;)arg_p)*** | 
Will add new work to the pool. Work is simply a function. You can pass a single 
argument to the function if you wish. If not, `NULL` should be passed. |
+| ***thpool_wait(thpool)***       | Will wait for all jobs (both in queue and 
currently running) to finish. |
+| ***thpool_destroy(thpool)***    | This will destroy the threadpool. If jobs 
are currently being executed, then it will wait for them to finish. |
+| ***thpool_pause(thpool)***      | All threads in the threadpool will pause 
no matter if they are idle or executing work. |
+| ***thpool_resume(thpool)***      | If the threadpool is paused, then all 
threads will resume from where they were.   |

http://git-wip-us.apache.org/repos/asf/celix/blob/1e73e4d1/utils/public/include/thpool.h
----------------------------------------------------------------------
diff --git a/utils/public/include/thpool.h b/utils/public/include/thpool.h
new file mode 100644
index 0000000..ab3063b
--- /dev/null
+++ b/utils/public/include/thpool.h
@@ -0,0 +1,164 @@
+/********************************** 
+ * @author      Johan Hanssen Seferidis
+ * License:     MIT
+ * 
+ **********************************/
+
+#ifndef _THPOOL_
+#define _THPOOL_
+
+
+
+
+
+/* =================================== API 
======================================= */
+
+
+typedef struct thpool_* threadpool;
+
+
+/**
+ * @brief  Initialize threadpool
+ * 
+ * Initializes a threadpool. This function will not return untill all
+ * threads have initialized successfully.
+ * 
+ * @example
+ * 
+ *    ..
+ *    threadpool thpool;                     //First we declare a threadpool
+ *    thpool = thpool_init(4);               //then we initialize it to 4 
threads
+ *    ..
+ * 
+ * @param  num_threads   number of threads to be created in the threadpool
+ * @return threadpool    created threadpool on success,
+ *                       NULL on error
+ */
+threadpool thpool_init(int num_threads);
+
+
+/**
+ * @brief Add work to the job queue
+ * 
+ * Takes an action and its argument and adds it to the threadpool's job queue.
+ * If you want to add to work a function with more than one arguments then
+ * a way to implement this is by passing a pointer to a structure.
+ * 
+ * NOTICE: You have to cast both the function and argument to not get warnings.
+ * 
+ * @example
+ * 
+ *    void print_num(int num){
+ *       printf("%d\n", num);
+ *    }
+ * 
+ *    int main() {
+ *       ..
+ *       int a = 10;
+ *       thpool_add_work(thpool, (void*)print_num, (void*)a);
+ *       ..
+ *    }
+ * 
+ * @param  threadpool    threadpool to which the work will be added
+ * @param  function_p    pointer to function to add as work
+ * @param  arg_p         pointer to an argument
+ * @return nothing
+ */
+int thpool_add_work(threadpool, void *(*function_p)(void*), void* arg_p);
+
+
+/**
+ * @brief Wait for all queued jobs to finish
+ * 
+ * Will wait for all jobs - both queued and currently running to finish.
+ * Once the queue is empty and all work has completed, the calling thread
+ * (probably the main program) will continue.
+ * 
+ * Smart polling is used in wait. The polling is initially 0 - meaning that
+ * there is virtually no polling at all. If after 1 seconds the threads
+ * haven't finished, the polling interval starts growing exponentially 
+ * untill it reaches max_secs seconds. Then it jumps down to a maximum polling
+ * interval assuming that heavy processing is being used in the threadpool.
+ *
+ * @example
+ * 
+ *    ..
+ *    threadpool thpool = thpool_init(4);
+ *    ..
+ *    // Add a bunch of work
+ *    ..
+ *    thpool_wait(thpool);
+ *    puts("All added work has finished");
+ *    ..
+ * 
+ * @param threadpool     the threadpool to wait for
+ * @return nothing
+ */
+void thpool_wait(threadpool);
+
+
+/**
+ * @brief Pauses all threads immediately
+ * 
+ * The threads will be paused no matter if they are idle or working.
+ * The threads return to their previous states once thpool_resume
+ * is called.
+ * 
+ * While the thread is being paused, new work can be added.
+ * 
+ * @example
+ * 
+ *    threadpool thpool = thpool_init(4);
+ *    thpool_pause(thpool);
+ *    ..
+ *    // Add a bunch of work
+ *    ..
+ *    thpool_resume(thpool); // Let the threads start their magic
+ * 
+ * @param threadpool    the threadpool where the threads should be paused
+ * @return nothing
+ */
+void thpool_pause(threadpool);
+
+
+/**
+ * @brief Unpauses all threads if they are paused
+ * 
+ * @example
+ *    ..
+ *    thpool_pause(thpool);
+ *    sleep(10);              // Delay execution 10 seconds
+ *    thpool_resume(thpool);
+ *    ..
+ * 
+ * @param threadpool     the threadpool where the threads should be unpaused
+ * @return nothing
+ */
+void thpool_resume(threadpool);
+
+
+/**
+ * @brief Destroy the threadpool
+ * 
+ * This will wait for the currently active threads to finish and then 'kill'
+ * the whole threadpool to free up memory.
+ * 
+ * @example
+ * int main() {
+ *    threadpool thpool1 = thpool_init(2);
+ *    threadpool thpool2 = thpool_init(2);
+ *    ..
+ *    thpool_destroy(thpool1);
+ *    ..
+ *    return 0;
+ * }
+ * 
+ * @param threadpool     the threadpool to destroy
+ * @return nothing
+ */
+void thpool_destroy(threadpool);
+
+
+
+
+#endif

Reply via email to