Henry Jen wrote:
Hi,

Attached please find the patch for thread pool implementation, looking forward to see it get committed.


I just realized that I sent the wrong patch, which did not drop the copyright notice. Attached is the correct patch. :-)

Just want to make sure the consensus is the code is ready for commit and is now simply waiting some committer's love.

Cheers,
Henry



Index: aprutil.dsp
===================================================================
--- aprutil.dsp	(revision 453014)
+++ aprutil.dsp	(working copy)
@@ -240,6 +240,10 @@
 # PROP Default_Filter ""
 # Begin Source File
 
+SOURCE=.\misc\apr_thread_pool.c
+# End Source File
+# Begin Source File
+
 SOURCE=.\misc\apr_date.c
 # End Source File
 # Begin Source File
@@ -512,6 +516,10 @@
 # End Source File
 # Begin Source File
 
+SOURCE=.\include\apr_thread_pool.h
+# End Source File
+# Begin Source File
+
 SOURCE=.\include\apr_date.h
 # End Source File
 # Begin Source File
Index: include/apr_thread_pool.h
===================================================================
--- include/apr_thread_pool.h	(revision 0)
+++ include/apr_thread_pool.h	(revision 0)
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership.  The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License.  You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#ifndef APR_THREAD_POOL_H
+#define APR_THREAD_POOL_H
+
+#include "apr.h"
+#include "apr_thread_proc.h"
+
+/**
+ * @file apr_thread_pool.h
+ * @brief APR Thread Pool Library
+
+ * @remarks This library implements a thread pool using apr_thread_t. A thread
+ * pool is a set of threads that can be created in advance or on demand until a
+ * maximum number. When a task is scheduled, the thread pool will find an idle
+ * thread to handle the task. In case all existing threads are busy and the
+ * number of tasks in the queue is higher than the adjustable threshold, the
+ * pool will try to create a new thread to serve the task if the maximum number
+ * has not been reached. Otherwise, the task will be put into a queue based on
+ * priority, which can be valued from 0 to 255, with higher value been served
+ * first. In case there are tasks with the same priority, the new task is put at
+ * the top or the bottom depeneds on which function is used to put the task.
+ *
+ * @remarks There may be the case that a thread pool can use up the maximum
+ * number of threads at peak load, but having those threads idle afterwards. A
+ * maximum number of idle threads can be set so that extra idling threads will
+ * be terminated to save system resrouces. 
+ */
+#if APR_HAS_THREADS
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0
+};
+#endif
+#endif /* __cplusplus */
+
+/** Opaque Thread Pool structure. */
+typedef struct apr_thread_pool apr_thread_pool_t;
+
+#define APR_THREAD_TASK_PRIORITY_LOWEST 0
+#define APR_THREAD_TASK_PRIORITY_LOW 63
+#define APR_THREAD_TASK_PRIORITY_NORMAL 127
+#define APR_THREAD_TASK_PRIORITY_HIGH 191
+#define APR_THREAD_TASK_PRIORITY_HIGHEST 255
+
+/**
+ * Create a thread pool
+ * @param me A pointer points to the pointer receives the created
+ * apr_thread_pool object. The returned value will be NULL if failed to create
+ * the thread pool.
+ * @param init_threads The number of threads to be created initially, the number
+ * will also be used as the initial value for maximum number of idle threads. 
+ * @param max_threads The maximum number of threads that can be created
+ * @param pool The pool to use
+ * @return APR_SUCCESS if the thread pool was created successfully. Otherwise,
+ * the error code.
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
+                                                 apr_size_t init_threads,
+                                                 apr_size_t max_threads,
+                                                 apr_pool_t * pool);
+
+/**
+ * Destroy the thread pool and stop all the threads
+ * @return APR_SUCCESS if all threads are stopped.
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me);
+
+/**
+ * Schedule a task to the bottom of the tasks of same priority.
+ * @param me The thread pool
+ * @param func The task function
+ * @param param The parameter for the task function
+ * @param priority The priority of the task.
+ * @param owner Owner of this task.
+ * @return APR_SUCCESS if the task had been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
+                                               apr_thread_start_t func,
+                                               void *param,
+                                               apr_byte_t priority,
+                                               void *owner);
+/**
+ * Schedule a task to be run after a delay
+ * @param me The thread pool
+ * @param func The task function
+ * @param param The parameter for the task function
+ * @param time Time in microseconds
+ * @param owner Owner of this task.
+ * @return APR_SUCCESS if the task had been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me,
+                                                   apr_thread_start_t func,
+                                                   void *param,
+                                                   apr_interval_time_t time,
+                                                   void *owner);
+
+/**
+ * Schedule a task to the top of the tasks of same priority.
+ * @param me The thread pool
+ * @param func The task function
+ * @param param The parameter for the task function
+ * @param priority The priority of the task.
+ * @param owner Owner of this task.
+ * @return APR_SUCCESS if the task had been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
+                                              apr_thread_start_t func,
+                                              void *param,
+                                              apr_byte_t priority,
+                                              void *owner);
+
+/**
+ * Cancel tasks submitted by the owner. If there is any task from the owner is
+ * currently under process, the function will spin until the task finished.
+ * @param me The thread pool
+ * @param owner Owner of the task
+ * @return APR_SUCCESS if the task has been cancelled successfully
+ * @note The task function should not be calling cancel, otherwise the function
+ * may get stuck forever. The function assert if it detect such a case.
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,
+                                                       void *owner);
+
+/**
+ * Get current number of tasks waiting in the queue
+ * @param me The thread pool
+ * @return Number of tasks in the queue
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me);
+
+/**
+ * Get current number of scheduled tasks waiting in the queue
+ * @param me The thread pool
+ * @return Number of scheduled tasks in the queue
+ */
+APR_DECLARE(apr_size_t)
+    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me);
+
+/**
+ * Get current number of threads
+ * @param me The thread pool
+ * @return Number of total threads
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me);
+
+/**
+ * Get current number of busy threads
+ * @param me The thread pool
+ * @return Number of busy threads
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me);
+
+/**
+ * Get current number of idling thread
+ * @param me The thread pool
+ * @return Number of idling threads
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me);
+
+/**
+ * Access function for the maximum number of idling thread. Number of current
+ * idle threads will be reduced to the new limit.
+ * @param me The thread pool
+ * @param cnt The number
+ * @return The number of threads were stopped.
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
+                                                     apr_size_t cnt);
+
+/**
+ * Access function for the maximum number of idling thread
+ * @param me The thread pool
+ * @return The current maximum number
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me);
+
+/**
+ * Access function for the maximum number of thread. 
+ * @param me The thread pool
+ * @param cnt The number
+ * @return The original maximum number of threads
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me,
+                                                       apr_size_t cnt);
+
+/**
+ * Access function for the maximum number of threads
+ * @param me The thread pool
+ * @return The current maximum number
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t *
+                                                       me);
+
+/**
+ * Access function for the threshold of tasks in queue to trigger a new thread. 
+ * @param me The thread pool
+ * @param cnt The new threshold
+ * @return The original threshold
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me,
+                                                      apr_size_t val);
+
+/**
+ * Access function for the threshold of tasks in queue to trigger a new thread. 
+ * @param me The thread pool
+ * @return The current threshold
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me);
+
+#ifdef __cplusplus
+#if 0
+{
+#endif
+}
+#endif
+
+#endif /* APR_HAS_THREADS */
+
+#endif /* APR_THREAD_POOL_H */
+
+/* vim: set ts=4 sw=4 et cin tw=80: */
Index: misc/apr_thread_pool.c
===================================================================
--- misc/apr_thread_pool.c	(revision 0)
+++ misc/apr_thread_pool.c	(revision 0)
@@ -0,0 +1,817 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed
+ * with this work for additional information regarding copyright
+ * ownership.  The ASF licenses this file to you under the Apache
+ * License, Version 2.0 (the "License"); you may not use this file
+ * except in compliance with the License.  You may obtain a copy of
+ * the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+#include <assert.h>
+#include "apr_thread_pool.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+#include "apr_portable.h"
+
+#if APR_HAS_THREADS
+
+#define TASK_PRIORITY_SEGS 4
+#define TASK_PRIORITY_SEG(x) (((x)->dispatch.priority & 0xFF) / 64)
+
+typedef struct apr_thread_pool_task
+{
+    APR_RING_ENTRY(apr_thread_pool_task) link;
+    apr_thread_start_t func;
+    void *param;
+    void *owner;
+    union
+    {
+        apr_byte_t priority;
+        apr_time_t time;
+    } dispatch;
+} apr_thread_pool_task_t;
+
+APR_RING_HEAD(apr_thread_pool_tasks, apr_thread_pool_task);
+
+struct apr_thread_list_elt
+{
+    APR_RING_ENTRY(apr_thread_list_elt) link;
+    apr_thread_t *thd;
+    volatile void *current_owner;
+    volatile int stop;
+};
+
+APR_RING_HEAD(apr_thread_list, apr_thread_list_elt);
+
+struct apr_thread_pool
+{
+    apr_pool_t *pool;
+    volatile apr_size_t thd_max;
+    volatile apr_size_t idle_max;
+    volatile apr_size_t thd_cnt;
+    volatile apr_size_t idle_cnt;
+    volatile apr_size_t task_cnt;
+    volatile apr_size_t scheduled_task_cnt;
+    volatile apr_size_t threshold;
+    struct apr_thread_pool_tasks *tasks;
+    struct apr_thread_pool_tasks *scheduled_tasks;
+    struct apr_thread_list *busy_thds;
+    struct apr_thread_list *idle_thds;
+    apr_thread_mutex_t *lock;
+    apr_thread_mutex_t *cond_lock;
+    apr_thread_cond_t *cond;
+    volatile int terminated;
+    struct apr_thread_pool_tasks *recycled_tasks;
+    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
+};
+
+static apr_status_t thread_pool_construct(apr_thread_pool_t * me,
+                                          apr_size_t init_threads,
+                                          apr_size_t max_threads)
+{
+    apr_status_t rv;
+    int i;
+
+    me->thd_max = max_threads;
+    me->idle_max = init_threads;
+    me->threshold = init_threads / 2;
+    rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
+                                 me->pool);
+    if (APR_SUCCESS != rv) {
+        return rv;
+    }
+    rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_UNNESTED,
+                                 me->pool);
+    if (APR_SUCCESS != rv) {
+        apr_thread_mutex_destroy(me->lock);
+        return rv;
+    }
+    rv = apr_thread_cond_create(&me->cond, me->pool);
+    if (APR_SUCCESS != rv) {
+        apr_thread_mutex_destroy(me->lock);
+        apr_thread_mutex_destroy(me->cond_lock);
+        return rv;
+    }
+    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
+    if (!me->tasks) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->tasks, apr_thread_pool_task, link);
+    me->scheduled_tasks = apr_palloc(me->pool, sizeof(*me->scheduled_tasks));
+    if (!me->scheduled_tasks) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->scheduled_tasks, apr_thread_pool_task, link);
+    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
+    if (!me->recycled_tasks) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->recycled_tasks, apr_thread_pool_task, link);
+    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
+    if (!me->busy_thds) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->busy_thds, apr_thread_list_elt, link);
+    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
+    if (!me->idle_thds) {
+        goto CATCH_ENOMEM;
+    }
+    APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
+    me->thd_cnt = me->idle_cnt = me->task_cnt = me->scheduled_task_cnt = 0;
+    me->terminated = 0;
+    for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
+        me->task_idx[i] = NULL;
+    }
+    goto FINAL_EXIT;
+  CATCH_ENOMEM:
+    rv = APR_ENOMEM;
+    apr_thread_mutex_destroy(me->lock);
+    apr_thread_mutex_destroy(me->cond_lock);
+    apr_thread_cond_destroy(me->cond);
+  FINAL_EXIT:
+    return rv;
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *pop_task(apr_thread_pool_t * me)
+{
+    apr_thread_pool_task_t *task = NULL;
+    int seg;
+
+    /* check for scheduled tasks */
+    if (me->scheduled_task_cnt > 0) {
+        task = APR_RING_FIRST(me->scheduled_tasks);
+        assert(task != NULL);
+        assert(task !=
+               APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                                 link));
+        /* if it's time */
+        if (task->dispatch.time <= apr_time_now()) {
+            --me->scheduled_task_cnt;
+            APR_RING_REMOVE(task, link);
+            return task;
+        }
+    }
+    /* check for normal tasks if we're not returning a scheduled task */
+    if (me->task_cnt == 0) {
+        return NULL;
+    }
+
+    task = APR_RING_FIRST(me->tasks);
+    assert(task != NULL);
+    assert(task != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link));
+    --me->task_cnt;
+    seg = TASK_PRIORITY_SEG(task);
+    if (task == me->task_idx[seg]) {
+        me->task_idx[seg] = APR_RING_NEXT(task, link);
+        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+                                                   apr_thread_pool_task, link)
+            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+            me->task_idx[seg] = NULL;
+        }
+    }
+    APR_RING_REMOVE(task, link);
+    return task;
+}
+
+static apr_interval_time_t waiting_time(apr_thread_pool_t * me)
+{
+    apr_thread_pool_task_t *task = NULL;
+
+    task = APR_RING_FIRST(me->scheduled_tasks);
+    assert(task != NULL);
+    assert(task !=
+           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                             link));
+    return task->dispatch.time - apr_time_now();
+}
+
+/*
+ * The worker thread function. Take a task from the queue and perform it if
+ * there is any. Otherwise, put itself into the idle thread list and waiting
+ * for signal to wake up.
+ * The thread terminate directly by detach and exit when it is asked to stop
+ * after finishing a task. Otherwise, the thread should be in idle thread list
+ * and should be joined.
+ */
+static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
+{
+    apr_status_t rv = APR_SUCCESS;
+    apr_thread_pool_t *me = param;
+    apr_thread_pool_task_t *task = NULL;
+    apr_interval_time_t wait;
+    struct apr_thread_list_elt *elt;
+
+    elt = apr_pcalloc(me->pool, sizeof(*elt));
+    if (!elt) {
+        apr_thread_exit(t, APR_ENOMEM);
+    }
+    APR_RING_ELEM_INIT(elt, link);
+    elt->thd = t;
+    elt->stop = 0;
+
+    apr_thread_mutex_lock(me->lock);
+    while (!me->terminated && !elt->stop) {
+        /* if not new element, it is awakened from idle */
+        if (APR_RING_NEXT(elt, link) != elt) {
+            --me->idle_cnt;
+            APR_RING_REMOVE(elt, link);
+        }
+        APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
+        task = pop_task(me);
+        while (NULL != task && !me->terminated) {
+            elt->current_owner = task->owner;
+            apr_thread_mutex_unlock(me->lock);
+            task->func(t, task->param);
+            apr_thread_mutex_lock(me->lock);
+            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
+                                 apr_thread_pool_task, link);
+            elt->current_owner = NULL;
+            if (elt->stop) {
+                break;
+            }
+            task = pop_task(me);
+        }
+        assert(NULL == elt->current_owner);
+        APR_RING_REMOVE(elt, link);
+
+        /* busy thread been asked to stop, not joinable */
+        if ((me->idle_cnt >= me->idle_max
+             && !(me->scheduled_task_cnt && 0 >= me->idle_max))
+            || me->terminated || elt->stop) {
+            --me->thd_cnt;
+            apr_thread_mutex_unlock(me->lock);
+            apr_thread_detach(t);
+            apr_thread_exit(t, APR_SUCCESS);
+            return NULL;        /* should not be here, safe net */
+        }
+
+        ++me->idle_cnt;
+        APR_RING_INSERT_TAIL(me->idle_thds, elt, apr_thread_list_elt, link);
+        wait = (me->scheduled_task_cnt) ? waiting_time(me) : -1;
+        apr_thread_mutex_unlock(me->lock);
+        apr_thread_mutex_lock(me->cond_lock);
+        if (wait >= 0) {
+            rv = apr_thread_cond_timedwait(me->cond, me->cond_lock, wait);
+        }
+        else {
+            rv = apr_thread_cond_wait(me->cond, me->cond_lock);
+        }
+        apr_thread_mutex_unlock(me->cond_lock);
+        apr_thread_mutex_lock(me->lock);
+    }
+
+    /* idle thread been asked to stop, will be joined */
+    --me->thd_cnt;
+    apr_thread_mutex_unlock(me->lock);
+    apr_thread_exit(t, APR_SUCCESS);
+    return NULL;                /* should not be here, safe net */
+}
+
+static apr_status_t thread_pool_cleanup(void *me)
+{
+    apr_thread_pool_t *_self = me;
+
+    _self->terminated = 1;
+    apr_thread_pool_idle_max_set(_self, 0);
+    while (_self->thd_cnt) {
+        apr_sleep(20 * 1000);   /* spin lock with 20 ms */
+    }
+    apr_thread_mutex_destroy(_self->lock);
+    apr_thread_mutex_destroy(_self->cond_lock);
+    apr_thread_cond_destroy(_self->cond);
+    return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
+                                                 apr_size_t init_threads,
+                                                 apr_size_t max_threads,
+                                                 apr_pool_t * pool)
+{
+    apr_thread_t *t;
+    apr_status_t rv = APR_SUCCESS;
+
+    *me = apr_pcalloc(pool, sizeof(**me));
+    if (!*me) {
+        return APR_ENOMEM;
+    }
+
+    (*me)->pool = pool;
+
+    rv = thread_pool_construct(*me, init_threads, max_threads);
+    if (APR_SUCCESS != rv) {
+        *me = NULL;
+        return rv;
+    }
+    apr_pool_cleanup_register(pool, *me, thread_pool_cleanup,
+                              apr_pool_cleanup_null);
+
+    while (init_threads) {
+        rv = apr_thread_create(&t, NULL, thread_pool_func, *me, (*me)->pool);
+        if (APR_SUCCESS != rv) {
+            break;
+        }
+        ++(*me)->thd_cnt;
+        --init_threads;
+    }
+
+    return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
+{
+    return apr_pool_cleanup_run(me->pool, me, thread_pool_cleanup);
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
+                                        apr_thread_start_t func,
+                                        void *param, apr_byte_t priority,
+                                        void *owner, apr_time_t time)
+{
+    apr_thread_pool_task_t *t;
+
+    if (APR_RING_EMPTY(me->recycled_tasks, apr_thread_pool_task, link)) {
+        t = apr_pcalloc(me->pool, sizeof(*t));
+        if (NULL == t) {
+            return NULL;
+        }
+    }
+    else {
+        t = APR_RING_FIRST(me->recycled_tasks);
+        APR_RING_REMOVE(t, link);
+    }
+
+    APR_RING_ELEM_INIT(t, link);
+    t->func = func;
+    t->param = param;
+    t->owner = owner;
+    if (time > 0) {
+        t->dispatch.time = apr_time_now() + time;
+    }
+    else {
+        t->dispatch.priority = priority;
+    }
+    return t;
+}
+
+/*
+ * Test it the task is the only one within the priority segment. 
+ * If it is not, return the first element with same or lower priority. 
+ * Otherwise, add the task into the queue and return NULL.
+ *
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
+                                            apr_thread_pool_task_t * const t)
+{
+    int seg;
+    int next;
+    apr_thread_pool_task_t *t_next;
+
+    seg = TASK_PRIORITY_SEG(t);
+    if (me->task_idx[seg]) {
+        assert(APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+               me->task_idx[seg]);
+        t_next = me->task_idx[seg];
+        while (t_next->dispatch.priority > t->dispatch.priority) {
+            t_next = APR_RING_NEXT(t_next, link);
+            if (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) ==
+                t_next) {
+                return t_next;
+            }
+        }
+        return t_next;
+    }
+
+    for (next = seg - 1; next >= 0; next--) {
+        if (me->task_idx[next]) {
+            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
+            break;
+        }
+    }
+    if (0 > next) {
+        APR_RING_INSERT_TAIL(me->tasks, t, apr_thread_pool_task, link);
+    }
+    me->task_idx[seg] = t;
+    return NULL;
+}
+
+/*
+*   schedule a task to run in "time" milliseconds. Find the spot in the ring where
+*   the time fits. Adjust the short_time so the thread wakes up when the time is reached.
+*/
+static apr_status_t schedule_task(apr_thread_pool_t * me,
+                                  apr_thread_start_t func, void *param,
+                                  void *owner, apr_interval_time_t time)
+{
+    apr_thread_pool_task_t *t;
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_t *thd;
+    apr_status_t rv = APR_SUCCESS;
+    apr_thread_mutex_lock(me->lock);
+
+    t = task_new(me, func, param, 0, owner, time);
+    if (NULL == t) {
+        apr_thread_mutex_unlock(me->lock);
+        return APR_ENOMEM;
+    }
+    t_loc = APR_RING_FIRST(me->scheduled_tasks);
+    while (NULL != t_loc) {
+        /* if the time is less than the entry insert ahead of it */
+        if (t->dispatch.time < t_loc->dispatch.time) {
+            ++me->scheduled_task_cnt;
+            APR_RING_INSERT_BEFORE(t_loc, t, link);
+            break;
+        }
+        else {
+            t_loc = APR_RING_NEXT(t_loc, link);
+            if (t_loc ==
+                APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                                  link)) {
+                ++me->scheduled_task_cnt;
+                APR_RING_INSERT_TAIL(me->scheduled_tasks, t,
+                                     apr_thread_pool_task, link);
+                break;
+            }
+        }
+    }
+    /* there should be at least one thread for scheduled tasks */
+    if (0 == me->thd_cnt) {
+        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+        if (APR_SUCCESS == rv) {
+            ++me->thd_cnt;
+        }
+    }
+    apr_thread_mutex_unlock(me->lock);
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+    return rv;
+}
+
+static apr_status_t add_task(apr_thread_pool_t * me, apr_thread_start_t func,
+                             void *param, apr_byte_t priority, int push,
+                             void *owner)
+{
+    apr_thread_pool_task_t *t;
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_t *thd;
+    apr_status_t rv = APR_SUCCESS;
+
+    apr_thread_mutex_lock(me->lock);
+
+    t = task_new(me, func, param, priority, owner, 0);
+    if (NULL == t) {
+        apr_thread_mutex_unlock(me->lock);
+        return APR_ENOMEM;
+    }
+
+    t_loc = add_if_empty(me, t);
+    if (NULL == t_loc) {
+        goto FINAL_EXIT;
+    }
+
+    if (push) {
+        while (APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link) !=
+               t_loc && t_loc->dispatch.priority >= t->dispatch.priority) {
+            t_loc = APR_RING_NEXT(t_loc, link);
+        }
+    }
+    APR_RING_INSERT_BEFORE(t_loc, t, link);
+    if (!push) {
+        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
+            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
+        }
+    }
+
+  FINAL_EXIT:
+    me->task_cnt++;
+    if (0 == me->thd_cnt || (0 == me->idle_cnt && me->thd_cnt < me->thd_max &&
+                             me->task_cnt > me->threshold)) {
+        rv = apr_thread_create(&thd, NULL, thread_pool_func, me, me->pool);
+        if (APR_SUCCESS == rv) {
+            ++me->thd_cnt;
+        }
+    }
+    apr_thread_mutex_unlock(me->lock);
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+
+    return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
+                                               apr_thread_start_t func,
+                                               void *param,
+                                               apr_byte_t priority,
+                                               void *owner)
+{
+    return add_task(me, func, param, priority, 1, owner);
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_schedule(apr_thread_pool_t * me,
+                                                   apr_thread_start_t func,
+                                                   void *param,
+                                                   apr_interval_time_t time,
+                                                   void *owner)
+{
+    return schedule_task(me, func, param, owner, time);
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
+                                              apr_thread_start_t func,
+                                              void *param,
+                                              apr_byte_t priority,
+                                              void *owner)
+{
+    return add_task(me, func, param, priority, 0, owner);
+}
+
+static apr_status_t remove_scheduled_tasks(apr_thread_pool_t * me,
+                                           void *owner)
+{
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_pool_task_t *next;
+
+    t_loc = APR_RING_FIRST(me->scheduled_tasks);
+    while (t_loc !=
+           APR_RING_SENTINEL(me->scheduled_tasks, apr_thread_pool_task,
+                             link)) {
+        next = APR_RING_NEXT(t_loc, link);
+        /* if this is the owner remove it */
+        if (t_loc->owner == owner) {
+            --me->scheduled_task_cnt;
+            APR_RING_REMOVE(t_loc, link);
+        }
+        t_loc = next;
+    }
+    return APR_SUCCESS;
+}
+
+static apr_status_t remove_tasks(apr_thread_pool_t * me, void *owner)
+{
+    apr_thread_pool_task_t *t_loc;
+    apr_thread_pool_task_t *next;
+    int seg;
+
+    t_loc = APR_RING_FIRST(me->tasks);
+    while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
+        next = APR_RING_NEXT(t_loc, link);
+        if (t_loc->owner == owner) {
+            --me->task_cnt;
+            seg = TASK_PRIORITY_SEG(t_loc);
+            if (t_loc == me->task_idx[seg]) {
+                me->task_idx[seg] = APR_RING_NEXT(t_loc, link);
+                if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+                                                           apr_thread_pool_task,
+                                                           link)
+                    || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+                    me->task_idx[seg] = NULL;
+                }
+            }
+            APR_RING_REMOVE(t_loc, link);
+        }
+        t_loc = next;
+    }
+    return APR_SUCCESS;
+}
+
+static void wait_on_busy_threads(apr_thread_pool_t * me, void *owner)
+{
+    struct apr_thread_list_elt *elt;
+    apr_thread_mutex_lock(me->lock);
+    elt = APR_RING_FIRST(me->busy_thds);
+    while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
+#ifndef NDEBUG
+        /* make sure the thread is not the one calling tasks_cancel */
+        apr_os_thread_t *os_thread;
+        apr_os_thread_get(&os_thread, elt->thd);
+#ifdef WIN32
+        /* hack for apr win32 bug */
+        assert(!apr_os_thread_equal(apr_os_thread_current(), os_thread));
+#else
+        assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
+#endif
+#endif
+        if (elt->current_owner != owner) {
+            elt = APR_RING_NEXT(elt, link);
+            continue;
+        }
+        while (elt->current_owner == owner) {
+            apr_thread_mutex_unlock(me->lock);
+            apr_sleep(200 * 1000);
+            apr_thread_mutex_lock(me->lock);
+        }
+        elt = APR_RING_FIRST(me->busy_thds);
+    }
+    apr_thread_mutex_unlock(me->lock);
+    return;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_tasks_cancel(apr_thread_pool_t * me,
+                                                       void *owner)
+{
+    apr_status_t rv = APR_SUCCESS;
+
+    apr_thread_mutex_lock(me->lock);
+    if (me->task_cnt > 0) {
+        rv = remove_tasks(me, owner);
+    }
+    if (me->scheduled_task_cnt > 0) {
+        rv = remove_scheduled_tasks(me, owner);
+    }
+    apr_thread_mutex_unlock(me->lock);
+    wait_on_busy_threads(me, owner);
+
+    return rv;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me)
+{
+    return me->task_cnt;
+}
+
+APR_DECLARE(apr_size_t)
+    apr_thread_pool_scheduled_tasks_count(apr_thread_pool_t * me)
+{
+    return me->scheduled_task_cnt;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_threads_count(apr_thread_pool_t * me)
+{
+    return me->thd_cnt;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_busy_count(apr_thread_pool_t * me)
+{
+    return me->thd_cnt - me->idle_cnt;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me)
+{
+    return me->idle_cnt;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me)
+{
+    return me->idle_max;
+}
+
+/*
+ * This function stop extra idle threads to the cnt.
+ * @return the number of threads stopped
+ * NOTE: There could be busy threads become idle during this function
+ */
+static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t * me,
+                                                apr_size_t * cnt, int idle)
+{
+    struct apr_thread_list *thds;
+    apr_size_t n, n_dbg, i;
+    struct apr_thread_list_elt *head, *tail, *elt;
+
+    apr_thread_mutex_lock(me->lock);
+    if (idle) {
+        thds = me->idle_thds;
+        n = me->idle_cnt;
+    }
+    else {
+        thds = me->busy_thds;
+        n = me->thd_cnt - me->idle_cnt;
+    }
+    if (n <= *cnt) {
+        apr_thread_mutex_unlock(me->lock);
+        *cnt = 0;
+        return NULL;
+    }
+    n -= *cnt;
+
+    head = APR_RING_FIRST(thds);
+    for (i = 0; i < *cnt; i++) {
+        head = APR_RING_NEXT(head, link);
+    }
+    tail = APR_RING_LAST(thds);
+    APR_RING_UNSPLICE(head, tail, link);
+    if (idle) {
+        me->idle_cnt = *cnt;
+    }
+    apr_thread_mutex_unlock(me->lock);
+
+    n_dbg = 0;
+    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+        elt->stop = 1;
+        n_dbg++;
+    }
+    elt->stop = 1;
+    n_dbg++;
+    assert(n == n_dbg);
+    *cnt = n;
+
+    APR_RING_PREV(head, link) = NULL;
+    APR_RING_NEXT(tail, link) = NULL;
+    return head;
+}
+
+static apr_size_t trim_idle_threads(apr_thread_pool_t * me, apr_size_t cnt)
+{
+    apr_size_t n_dbg;
+    struct apr_thread_list_elt *elt;
+    apr_status_t rv;
+
+    elt = trim_threads(me, &cnt, 1);
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_broadcast(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+
+    n_dbg = 0;
+    while (elt) {
+        apr_thread_join(&rv, elt->thd);
+        elt = APR_RING_NEXT(elt, link);
+        ++n_dbg;
+    }
+    assert(cnt == n_dbg);
+
+    return cnt;
+}
+
+/* don't join on busy threads for performance reasons, who knows how long will
+ * the task takes to perform
+ */
+static apr_size_t trim_busy_threads(apr_thread_pool_t * me, apr_size_t cnt)
+{
+    trim_threads(me, &cnt, 0);
+    return cnt;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
+                                                     apr_size_t cnt)
+{
+    me->idle_max = cnt;
+    cnt = trim_idle_threads(me, cnt);
+    return cnt;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_get(apr_thread_pool_t * me)
+{
+    return me->thd_max;
+}
+
+/*
+ * This function stop extra working threads to the new limit.
+ * NOTE: There could be busy threads become idle during this function
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t * me,
+                                                       apr_size_t cnt)
+{
+    unsigned int n;
+
+    me->thd_max = cnt;
+    if (0 == cnt || me->thd_cnt <= cnt) {
+        return 0;
+    }
+
+    n = me->thd_cnt - cnt;
+    if (n >= me->idle_cnt) {
+        trim_busy_threads(me, n - me->idle_cnt);
+        trim_idle_threads(me, 0);
+    }
+    else {
+        trim_idle_threads(me, me->idle_cnt - n);
+    }
+    return n;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_threshold_get(apr_thread_pool_t * me)
+{
+    return me->threshold;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_threshold_set(apr_thread_pool_t * me,
+                                                      apr_size_t val)
+{
+    apr_size_t ov;
+
+    ov = me->threshold;
+    me->threshold = val;
+    return ov;
+}
+
+#endif /* APR_HAS_THREADS */
+
+/* vim: set ts=4 sw=4 et cin tw=80: */

Reply via email to