On Thu, Nov 26, 2020 at 2:35 PM Yann Ylavic <[email protected]> wrote:
>
> So now I've changed apr_thread_pool so that it does not detach threads
> and instead joins all of them on cleanup; see the attached
> "threadpool_no_detach.diff".
Sorry, that was an older patch; the latest one is below.
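
Before the diff, here is a minimal usage sketch (illustration only, not part
of the patch; error checking omitted) of the path this changes. With the
patch, apr_thread_pool_destroy() (or cleanup of the owning apr_pool_t)
cancels whatever is still queued and joins every worker before returning,
instead of detaching the workers and spin-waiting on thd_cnt:

#include <apr_general.h>
#include <apr_thread_pool.h>

/* Trivial task run on a worker thread via apr_thread_pool_push(). */
static void *APR_THREAD_FUNC my_task(apr_thread_t *thd, void *param)
{
    (void)thd;
    (void)param;
    /* ... real work here ... */
    return NULL;
}

int main(void)
{
    apr_pool_t *pool;
    apr_thread_pool_t *tp;
    apr_size_t i;

    apr_initialize();
    apr_pool_create(&pool, NULL);

    /* 2 initial worker threads, at most 8. */
    apr_thread_pool_create(&tp, 2, 8, pool);

    for (i = 0; i < 100; i++) {
        apr_thread_pool_push(tp, my_task, NULL,
                             APR_THREAD_TASK_PRIORITY_NORMAL, NULL);
    }

    /* With the patch this cancels the tasks still queued and joins every
     * worker before returning; previously the workers were detached and
     * the pool cleanup spin-waited on thd_cnt with apr_sleep().
     */
    apr_thread_pool_destroy(tp);

    apr_pool_destroy(pool);
    apr_terminate();
    return 0;
}

The patch itself: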
Index: util-misc/apr_thread_pool.c
===================================================================
--- util-misc/apr_thread_pool.c (revision 1883836)
+++ util-misc/apr_thread_pool.c (working copy)
@@ -71,9 +71,13 @@ struct apr_thread_pool
struct apr_thread_pool_tasks *scheduled_tasks;
struct apr_thread_list *busy_thds;
struct apr_thread_list *idle_thds;
+ struct apr_thread_list *dead_thds;
+ apr_thread_cond_t *more_work;
+ apr_thread_cond_t *work_done;
+ apr_thread_cond_t *all_done;
apr_thread_mutex_t *lock;
- apr_thread_cond_t *cond;
volatile int terminated;
+ int waiting_work_done;
struct apr_thread_pool_tasks *recycled_tasks;
struct apr_thread_list *recycled_thds;
apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];
@@ -94,11 +98,24 @@ static apr_status_t thread_pool_construct(apr_thre
if (APR_SUCCESS != rv) {
return rv;
}
- rv = apr_thread_cond_create(&me->cond, me->pool);
+ rv = apr_thread_cond_create(&me->more_work, me->pool);
if (APR_SUCCESS != rv) {
apr_thread_mutex_destroy(me->lock);
return rv;
}
+ rv = apr_thread_cond_create(&me->work_done, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_cond_destroy(me->more_work);
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
+ rv = apr_thread_cond_create(&me->all_done, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_cond_destroy(me->work_done);
+ apr_thread_cond_destroy(me->more_work);
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
if (!me->tasks) {
goto CATCH_ENOMEM;
@@ -124,6 +141,11 @@ static apr_status_t thread_pool_construct(apr_thre
goto CATCH_ENOMEM;
}
APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link);
+ me->dead_thds = apr_palloc(me->pool, sizeof(*me->dead_thds));
+ if (!me->dead_thds) {
+ goto CATCH_ENOMEM;
+ }
+ APR_RING_INIT(me->dead_thds, apr_thread_list_elt, link);
me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds));
if (!me->recycled_thds) {
goto CATCH_ENOMEM;
@@ -139,8 +161,10 @@ static apr_status_t thread_pool_construct(apr_thre
goto FINAL_EXIT;
CATCH_ENOMEM:
rv = APR_ENOMEM;
+ apr_thread_cond_destroy(me->all_done);
+ apr_thread_cond_destroy(me->work_done);
+ apr_thread_cond_destroy(me->more_work);
apr_thread_mutex_destroy(me->lock);
- apr_thread_cond_destroy(me->cond);
FINAL_EXIT:
return rv;
}
@@ -231,9 +255,9 @@ static struct apr_thread_list_elt *elt_new(apr_thr
* The worker thread function. Take a task from the queue and perform it if
* there is any. Otherwise, put itself into the idle thread list and waiting
* for signal to wake up.
- * The thread terminate directly by detach and exit when it is asked to stop
- * after finishing a task. Otherwise, the thread should be in idle thread list
- * and should be joined.
+ * The thread terminates directly and exits when it is asked to stop after
+ * finishing a task; it puts itself on the dead threads list and will be
+ * joined there (worker threads are no longer detached).
*/
static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param)
{
@@ -250,7 +274,7 @@ static void *APR_THREAD_FUNC thread_pool_func(apr_
apr_thread_exit(t, APR_ENOMEM);
}
- while (!me->terminated && elt->state != TH_STOP) {
+ for (;;) {
/* Test if not new element, it is awakened from idle */
if (APR_RING_NEXT(elt, link) != elt) {
--me->idle_cnt;
@@ -258,41 +282,42 @@ static void *APR_THREAD_FUNC thread_pool_func(apr_
}
APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link);
- task = pop_task(me);
- while (NULL != task && !me->terminated) {
+ while (!me->terminated && elt->state != TH_STOP) {
+ task = pop_task(me);
+ if (!task) {
+ break;
+ }
++me->tasks_run;
elt->current_owner = task->owner;
apr_thread_mutex_unlock(me->lock);
+
apr_thread_data_set(task, "apr_thread_pool_task", NULL, t);
task->func(t, task->param);
+
apr_thread_mutex_lock(me->lock);
apr_pool_owner_set(me->pool, 0);
APR_RING_INSERT_TAIL(me->recycled_tasks, task,
apr_thread_pool_task, link);
elt->current_owner = NULL;
- if (TH_STOP == elt->state) {
- break;
+ if (me->waiting_work_done) {
+ apr_thread_cond_signal(me->work_done);
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_mutex_lock(me->lock);
}
- task = pop_task(me);
}
assert(NULL == elt->current_owner);
- if (TH_STOP != elt->state)
- APR_RING_REMOVE(elt, link);
+ APR_RING_REMOVE(elt, link);
- /* Test if a busy thread been asked to stop, which is not joinable */
+ /* Test if a busy thread has been asked to stop */
if ((me->idle_cnt >= me->idle_max
&& !(me->scheduled_task_cnt && 0 >= me->idle_max)
&& !me->idle_wait)
|| me->terminated || elt->state != TH_RUN) {
- --me->thd_cnt;
if ((TH_PROBATION == elt->state) && me->idle_wait)
++me->thd_timed_out;
- APR_RING_INSERT_TAIL(me->recycled_thds, elt,
+ APR_RING_INSERT_TAIL(me->dead_thds, elt,
apr_thread_list_elt, link);
- apr_thread_mutex_unlock(me->lock);
- apr_thread_detach(t);
- apr_thread_exit(t, APR_SUCCESS);
- return NULL; /* should not be here, safe net */
+ break;
}
/* busy thread become idle */
@@ -314,32 +339,56 @@ static void *APR_THREAD_FUNC thread_pool_func(apr_
wait = -1;
if (wait >= 0) {
- apr_thread_cond_timedwait(me->cond, me->lock, wait);
+ apr_thread_cond_timedwait(me->more_work, me->lock, wait);
}
else {
- apr_thread_cond_wait(me->cond, me->lock);
+ apr_thread_cond_wait(me->more_work, me->lock);
}
}
/* idle thread been asked to stop, will be joined */
- --me->thd_cnt;
+ if (--me->thd_cnt == 0 && me->terminated) {
+ apr_thread_cond_signal(me->all_done);
+ }
apr_thread_mutex_unlock(me->lock);
apr_thread_exit(t, APR_SUCCESS);
return NULL; /* should not be here, safe net */
}
-static apr_status_t thread_pool_cleanup(void *me)
+/* Must be locked by the caller */
+static void join_deads(apr_thread_pool_t *me)
{
- apr_thread_pool_t *_myself = me;
+ struct apr_thread_list_elt *elt;
+ apr_status_t rv;
- _myself->terminated = 1;
- apr_thread_pool_idle_max_set(_myself, 0);
- while (_myself->thd_cnt) {
- apr_sleep(20 * 1000); /* spin lock with 20 ms */
+ while (!APR_RING_EMPTY(me->dead_thds, apr_thread_list_elt, link)) {
+ elt = APR_RING_FIRST(me->dead_thds);
+ apr_thread_join(&rv, elt->thd);
+
+ APR_RING_REMOVE(elt, link);
+ APR_RING_INSERT_TAIL(me->recycled_thds, elt, apr_thread_list_elt,
+ link);
}
- apr_pool_owner_set(_myself->pool, 0);
- apr_thread_mutex_destroy(_myself->lock);
- apr_thread_cond_destroy(_myself->cond);
+}
+
+static apr_status_t thread_pool_cleanup(void *data)
+{
+ apr_thread_pool_t *me = data;
+
+ me->terminated = 1;
+ apr_thread_pool_tasks_cancel(me, NULL);
+ apr_thread_pool_thread_max_set(me, 0);
+ apr_thread_pool_idle_max_set(me, 0);
+
+ apr_thread_mutex_lock(me->lock);
+ if (me->thd_cnt) {
+ apr_thread_cond_wait(me->all_done, me->lock);
+ }
+ /* All threads should be dead now, join them */
+ apr_pool_owner_set(me->pool, 0);
+ join_deads(me);
+ apr_thread_mutex_unlock(me->lock);
+
return APR_SUCCESS;
}
@@ -488,6 +537,7 @@ static apr_status_t schedule_task(apr_thread_pool_
apr_thread_pool_task_t *t_loc;
apr_thread_t *thd;
apr_status_t rv = APR_SUCCESS;
+
apr_thread_mutex_lock(me->lock);
apr_pool_owner_set(me->pool, 0);
@@ -525,8 +575,9 @@ static apr_status_t schedule_task(apr_thread_pool_
me->thd_high = me->thd_cnt;
}
}
- apr_thread_cond_signal(me->cond);
+ apr_thread_cond_signal(me->more_work);
apr_thread_mutex_unlock(me->lock);
+
return rv;
}
@@ -580,7 +631,7 @@ static apr_status_t add_task(apr_thread_pool_t *me
}
}
- apr_thread_cond_signal(me->cond);
+ apr_thread_cond_signal(me->more_work);
apr_thread_mutex_unlock(me->lock);
return rv;
@@ -625,7 +676,7 @@ static apr_status_t remove_scheduled_tasks(apr_thr
link)) {
next = APR_RING_NEXT(t_loc, link);
/* if this is the owner remove it */
- if (t_loc->owner == owner) {
+ if (!owner || t_loc->owner == owner) {
--me->scheduled_task_cnt;
APR_RING_REMOVE(t_loc, link);
}
@@ -643,7 +694,7 @@ static apr_status_t remove_tasks(apr_thread_pool_t
t_loc = APR_RING_FIRST(me->tasks);
while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) {
next = APR_RING_NEXT(t_loc, link);
- if (t_loc->owner == owner) {
+ if (!owner || t_loc->owner == owner) {
--me->task_cnt;
seg = TASK_PRIORITY_SEG(t_loc);
if (t_loc == me->task_idx[seg]) {
@@ -671,7 +722,7 @@ static void wait_on_busy_threads(apr_thread_pool_t
apr_thread_mutex_lock(me->lock);
elt = APR_RING_FIRST(me->busy_thds);
while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) {
- if (elt->current_owner != owner) {
+ if (!elt->current_owner || (owner && elt->current_owner != owner)) {
elt = APR_RING_NEXT(elt, link);
continue;
}
@@ -685,11 +736,13 @@ static void wait_on_busy_threads(apr_thread_pool_t
assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread));
#endif
#endif
- while (elt->current_owner == owner) {
- apr_thread_mutex_unlock(me->lock);
- apr_sleep(200 * 1000);
- apr_thread_mutex_lock(me->lock);
- }
+ do {
+ me->waiting_work_done = 1;
+ apr_thread_cond_wait(me->work_done, me->lock);
+ me->waiting_work_done = 0;
+ } while (elt->current_owner && (!owner || elt->current_owner == owner));
+
+ /* Restart */
elt = APR_RING_FIRST(me->busy_thds);
}
apr_thread_mutex_unlock(me->lock);
@@ -779,11 +832,10 @@ APR_DECLARE(apr_interval_time_t)
/*
* This function stop extra idle threads to the cnt.
- * @return the number of threads stopped
+ * @return the number of threads stopped, via *cnt
* NOTE: There could be busy threads become idle during this function
*/
-static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me,
- apr_size_t *cnt, int idle)
+static void trim_threads(apr_thread_pool_t *me, apr_size_t *cnt, int idle)
{
struct apr_thread_list *thds;
apr_size_t n, n_dbg, i;
@@ -791,6 +843,9 @@ APR_DECLARE(apr_interval_time_t)
apr_thread_mutex_lock(me->lock);
apr_pool_owner_set(me->pool, 0);
+
+ join_deads(me);
+
if (idle) {
thds = me->idle_thds;
n = me->idle_cnt;
@@ -802,7 +857,7 @@ APR_DECLARE(apr_interval_time_t)
if (n <= *cnt) {
apr_thread_mutex_unlock(me->lock);
*cnt = 0;
- return NULL;
+ return;
}
n -= *cnt;
@@ -811,10 +866,6 @@ APR_DECLARE(apr_interval_time_t)
head = APR_RING_NEXT(head, link);
}
tail = APR_RING_LAST(thds);
- if (idle) {
- APR_RING_UNSPLICE(head, tail, link);
- me->idle_cnt = *cnt;
- }
n_dbg = 0;
for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
@@ -827,45 +878,19 @@ APR_DECLARE(apr_interval_time_t)
*cnt = n;
apr_thread_mutex_unlock(me->lock);
-
- APR_RING_PREV(head, link) = NULL;
- APR_RING_NEXT(tail, link) = NULL;
- return head;
}
static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
- apr_size_t n_dbg;
- struct apr_thread_list_elt *elt, *head, *tail;
- apr_status_t rv;
-
- elt = trim_threads(me, &cnt, 1);
-
- apr_thread_mutex_lock(me->lock);
- apr_thread_cond_broadcast(me->cond);
- apr_thread_mutex_unlock(me->lock);
-
- n_dbg = 0;
- if (NULL != (head = elt)) {
- while (elt) {
- tail = elt;
- apr_thread_join(&rv, elt->thd);
- elt = APR_RING_NEXT(elt, link);
- ++n_dbg;
- }
+ trim_threads(me, &cnt, 1);
+ if (cnt) {
apr_thread_mutex_lock(me->lock);
- APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail,
- apr_thread_list_elt, link);
+ apr_thread_cond_broadcast(me->more_work);
apr_thread_mutex_unlock(me->lock);
}
- assert(cnt == n_dbg);
-
return cnt;
}
-/* don't join on busy threads for performance reasons, who knows how long will
- * the task takes to perform
- */
static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt)
{
trim_threads(me, &cnt, 0);
@@ -904,20 +929,22 @@ APR_DECLARE(apr_size_t) apr_thread_pool_thread_max
APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me,
apr_size_t cnt)
{
- apr_size_t n;
+ apr_size_t n, i;
me->thd_max = cnt;
- if (0 == cnt || me->thd_cnt <= cnt) {
+ n = me->thd_cnt;
+ if (n <= cnt) {
return 0;
}
- n = me->thd_cnt - cnt;
- if (n >= me->idle_cnt) {
- trim_busy_threads(me, n - me->idle_cnt);
+ n -= cnt;
+ i = me->idle_cnt;
+ if (n >= i) {
+ trim_busy_threads(me, n - i);
trim_idle_threads(me, 0);
}
else {
- trim_idle_threads(me, me->idle_cnt - n);
+ trim_idle_threads(me, i - n);
}
return n;
}