On Thu, Nov 26, 2020 at 2:35 PM Yann Ylavic <ylavic....@gmail.com> wrote: > > So now I go change apr_threadpool so that it does not detach threads > and joins all of them on cleanup, see attached > "threadpool_no_detach.diff".
Sorry, this was an older patch, latest one here.
Index: util-misc/apr_thread_pool.c =================================================================== --- util-misc/apr_thread_pool.c (revision 1883836) +++ util-misc/apr_thread_pool.c (working copy) @@ -71,9 +71,13 @@ struct apr_thread_pool struct apr_thread_pool_tasks *scheduled_tasks; struct apr_thread_list *busy_thds; struct apr_thread_list *idle_thds; + struct apr_thread_list *dead_thds; + apr_thread_cond_t *more_work; + apr_thread_cond_t *work_done; + apr_thread_cond_t *all_done; apr_thread_mutex_t *lock; - apr_thread_cond_t *cond; volatile int terminated; + int waiting_work_done; struct apr_thread_pool_tasks *recycled_tasks; struct apr_thread_list *recycled_thds; apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS]; @@ -94,11 +98,24 @@ static apr_status_t thread_pool_construct(apr_thre if (APR_SUCCESS != rv) { return rv; } - rv = apr_thread_cond_create(&me->cond, me->pool); + rv = apr_thread_cond_create(&me->more_work, me->pool); if (APR_SUCCESS != rv) { apr_thread_mutex_destroy(me->lock); return rv; } + rv = apr_thread_cond_create(&me->work_done, me->pool); + if (APR_SUCCESS != rv) { + apr_thread_cond_destroy(me->more_work); + apr_thread_mutex_destroy(me->lock); + return rv; + } + rv = apr_thread_cond_create(&me->all_done, me->pool); + if (APR_SUCCESS != rv) { + apr_thread_cond_destroy(me->work_done); + apr_thread_cond_destroy(me->more_work); + apr_thread_mutex_destroy(me->lock); + return rv; + } me->tasks = apr_palloc(me->pool, sizeof(*me->tasks)); if (!me->tasks) { goto CATCH_ENOMEM; @@ -124,6 +141,11 @@ static apr_status_t thread_pool_construct(apr_thre goto CATCH_ENOMEM; } APR_RING_INIT(me->idle_thds, apr_thread_list_elt, link); + me->dead_thds = apr_palloc(me->pool, sizeof(*me->dead_thds)); + if (!me->dead_thds) { + goto CATCH_ENOMEM; + } + APR_RING_INIT(me->dead_thds, apr_thread_list_elt, link); me->recycled_thds = apr_palloc(me->pool, sizeof(*me->recycled_thds)); if (!me->recycled_thds) { goto CATCH_ENOMEM; @@ -139,8 +161,10 @@ static apr_status_t thread_pool_construct(apr_thre goto FINAL_EXIT; CATCH_ENOMEM: rv = APR_ENOMEM; + apr_thread_cond_destroy(me->all_done); + apr_thread_cond_destroy(me->work_done); + apr_thread_cond_destroy(me->more_work); apr_thread_mutex_destroy(me->lock); - apr_thread_cond_destroy(me->cond); FINAL_EXIT: return rv; } @@ -231,9 +255,9 @@ static struct apr_thread_list_elt *elt_new(apr_thr * The worker thread function. Take a task from the queue and perform it if * there is any. Otherwise, put itself into the idle thread list and waiting * for signal to wake up. - * The thread terminate directly by detach and exit when it is asked to stop - * after finishing a task. Otherwise, the thread should be in idle thread list - * and should be joined. + * The thread terminates directly and exits when it is asked to stop after + * finishing a task. The thread should be in the idle thread list and it + * should be joined. */ static void *APR_THREAD_FUNC thread_pool_func(apr_thread_t * t, void *param) { @@ -250,7 +274,7 @@ static void *APR_THREAD_FUNC thread_pool_func(apr_ apr_thread_exit(t, APR_ENOMEM); } - while (!me->terminated && elt->state != TH_STOP) { + for (;;) { /* Test if not new element, it is awakened from idle */ if (APR_RING_NEXT(elt, link) != elt) { --me->idle_cnt; @@ -258,41 +282,42 @@ static void *APR_THREAD_FUNC thread_pool_func(apr_ } APR_RING_INSERT_TAIL(me->busy_thds, elt, apr_thread_list_elt, link); - task = pop_task(me); - while (NULL != task && !me->terminated) { + while (!me->terminated && elt->state != TH_STOP) { + task = pop_task(me); + if (!task) { + break; + } ++me->tasks_run; elt->current_owner = task->owner; apr_thread_mutex_unlock(me->lock); + apr_thread_data_set(task, "apr_thread_pool_task", NULL, t); task->func(t, task->param); + apr_thread_mutex_lock(me->lock); apr_pool_owner_set(me->pool, 0); APR_RING_INSERT_TAIL(me->recycled_tasks, task, apr_thread_pool_task, link); elt->current_owner = NULL; - if (TH_STOP == elt->state) { - break; + if (me->waiting_work_done) { + apr_thread_cond_signal(me->work_done); + apr_thread_mutex_unlock(me->lock); + apr_thread_mutex_lock(me->lock); } - task = pop_task(me); } assert(NULL == elt->current_owner); - if (TH_STOP != elt->state) - APR_RING_REMOVE(elt, link); + APR_RING_REMOVE(elt, link); - /* Test if a busy thread been asked to stop, which is not joinable */ + /* Test if a busy thread been asked to stop */ if ((me->idle_cnt >= me->idle_max && !(me->scheduled_task_cnt && 0 >= me->idle_max) && !me->idle_wait) || me->terminated || elt->state != TH_RUN) { - --me->thd_cnt; if ((TH_PROBATION == elt->state) && me->idle_wait) ++me->thd_timed_out; - APR_RING_INSERT_TAIL(me->recycled_thds, elt, + APR_RING_INSERT_TAIL(me->dead_thds, elt, apr_thread_list_elt, link); - apr_thread_mutex_unlock(me->lock); - apr_thread_detach(t); - apr_thread_exit(t, APR_SUCCESS); - return NULL; /* should not be here, safe net */ + break; } /* busy thread become idle */ @@ -314,32 +339,56 @@ static void *APR_THREAD_FUNC thread_pool_func(apr_ wait = -1; if (wait >= 0) { - apr_thread_cond_timedwait(me->cond, me->lock, wait); + apr_thread_cond_timedwait(me->more_work, me->lock, wait); } else { - apr_thread_cond_wait(me->cond, me->lock); + apr_thread_cond_wait(me->more_work, me->lock); } } /* idle thread been asked to stop, will be joined */ - --me->thd_cnt; + if (--me->thd_cnt == 0 && me->terminated) { + apr_thread_cond_signal(me->all_done); + } apr_thread_mutex_unlock(me->lock); apr_thread_exit(t, APR_SUCCESS); return NULL; /* should not be here, safe net */ } -static apr_status_t thread_pool_cleanup(void *me) +/* Must be locked by the caller */ +static void join_deads(apr_thread_pool_t *me) { - apr_thread_pool_t *_myself = me; + struct apr_thread_list_elt *elt; + apr_status_t rv; - _myself->terminated = 1; - apr_thread_pool_idle_max_set(_myself, 0); - while (_myself->thd_cnt) { - apr_sleep(20 * 1000); /* spin lock with 20 ms */ + while (!APR_RING_EMPTY(me->dead_thds, apr_thread_list_elt, link)) { + elt = APR_RING_FIRST(me->dead_thds); + apr_thread_join(&rv, elt->thd); + + APR_RING_REMOVE(elt, link); + APR_RING_INSERT_TAIL(me->recycled_thds, elt, apr_thread_list_elt, + link); } - apr_pool_owner_set(_myself->pool, 0); - apr_thread_mutex_destroy(_myself->lock); - apr_thread_cond_destroy(_myself->cond); +} + +static apr_status_t thread_pool_cleanup(void *data) +{ + apr_thread_pool_t *me = data; + + me->terminated = 1; + apr_thread_pool_tasks_cancel(me, NULL); + apr_thread_pool_thread_max_set(me, 0); + apr_thread_pool_idle_max_set(me, 0); + + apr_thread_mutex_lock(me->lock); + if (me->thd_cnt) { + apr_thread_cond_wait(me->all_done, me->lock); + } + /* All threads should be dead now, join them */ + apr_pool_owner_set(me->pool, 0); + join_deads(me); + apr_thread_mutex_unlock(me->lock); + return APR_SUCCESS; } @@ -488,6 +537,7 @@ static apr_status_t schedule_task(apr_thread_pool_ apr_thread_pool_task_t *t_loc; apr_thread_t *thd; apr_status_t rv = APR_SUCCESS; + apr_thread_mutex_lock(me->lock); apr_pool_owner_set(me->pool, 0); @@ -525,8 +575,9 @@ static apr_status_t schedule_task(apr_thread_pool_ me->thd_high = me->thd_cnt; } } - apr_thread_cond_signal(me->cond); + apr_thread_cond_signal(me->more_work); apr_thread_mutex_unlock(me->lock); + return rv; } @@ -580,7 +631,7 @@ static apr_status_t add_task(apr_thread_pool_t *me } } - apr_thread_cond_signal(me->cond); + apr_thread_cond_signal(me->more_work); apr_thread_mutex_unlock(me->lock); return rv; @@ -625,7 +676,7 @@ static apr_status_t remove_scheduled_tasks(apr_thr link)) { next = APR_RING_NEXT(t_loc, link); /* if this is the owner remove it */ - if (t_loc->owner == owner) { + if (!owner || t_loc->owner == owner) { --me->scheduled_task_cnt; APR_RING_REMOVE(t_loc, link); } @@ -643,7 +694,7 @@ static apr_status_t remove_tasks(apr_thread_pool_t t_loc = APR_RING_FIRST(me->tasks); while (t_loc != APR_RING_SENTINEL(me->tasks, apr_thread_pool_task, link)) { next = APR_RING_NEXT(t_loc, link); - if (t_loc->owner == owner) { + if (!owner || t_loc->owner == owner) { --me->task_cnt; seg = TASK_PRIORITY_SEG(t_loc); if (t_loc == me->task_idx[seg]) { @@ -671,7 +722,7 @@ static void wait_on_busy_threads(apr_thread_pool_t apr_thread_mutex_lock(me->lock); elt = APR_RING_FIRST(me->busy_thds); while (elt != APR_RING_SENTINEL(me->busy_thds, apr_thread_list_elt, link)) { - if (elt->current_owner != owner) { + if (!elt->current_owner || (owner && elt->current_owner != owner)) { elt = APR_RING_NEXT(elt, link); continue; } @@ -685,11 +736,13 @@ static void wait_on_busy_threads(apr_thread_pool_t assert(!apr_os_thread_equal(apr_os_thread_current(), *os_thread)); #endif #endif - while (elt->current_owner == owner) { - apr_thread_mutex_unlock(me->lock); - apr_sleep(200 * 1000); - apr_thread_mutex_lock(me->lock); - } + do { + me->waiting_work_done = 1; + apr_thread_cond_wait(me->work_done, me->lock); + me->waiting_work_done = 0; + } while (elt->current_owner && (!owner || elt->current_owner == owner)); + + /* Restart */ elt = APR_RING_FIRST(me->busy_thds); } apr_thread_mutex_unlock(me->lock); @@ -779,11 +832,10 @@ APR_DECLARE(apr_interval_time_t) /* * This function stop extra idle threads to the cnt. - * @return the number of threads stopped + * @return the list of threads stopped * NOTE: There could be busy threads become idle during this function */ -static struct apr_thread_list_elt *trim_threads(apr_thread_pool_t *me, - apr_size_t *cnt, int idle) +static void trim_threads(apr_thread_pool_t *me, apr_size_t *cnt, int idle) { struct apr_thread_list *thds; apr_size_t n, n_dbg, i; @@ -791,6 +843,9 @@ APR_DECLARE(apr_interval_time_t) apr_thread_mutex_lock(me->lock); apr_pool_owner_set(me->pool, 0); + + join_deads(me); + if (idle) { thds = me->idle_thds; n = me->idle_cnt; @@ -802,7 +857,7 @@ APR_DECLARE(apr_interval_time_t) if (n <= *cnt) { apr_thread_mutex_unlock(me->lock); *cnt = 0; - return NULL; + return; } n -= *cnt; @@ -811,10 +866,6 @@ APR_DECLARE(apr_interval_time_t) head = APR_RING_NEXT(head, link); } tail = APR_RING_LAST(thds); - if (idle) { - APR_RING_UNSPLICE(head, tail, link); - me->idle_cnt = *cnt; - } n_dbg = 0; for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) { @@ -827,45 +878,19 @@ APR_DECLARE(apr_interval_time_t) *cnt = n; apr_thread_mutex_unlock(me->lock); - - APR_RING_PREV(head, link) = NULL; - APR_RING_NEXT(tail, link) = NULL; - return head; } static apr_size_t trim_idle_threads(apr_thread_pool_t *me, apr_size_t cnt) { - apr_size_t n_dbg; - struct apr_thread_list_elt *elt, *head, *tail; - apr_status_t rv; - - elt = trim_threads(me, &cnt, 1); - - apr_thread_mutex_lock(me->lock); - apr_thread_cond_broadcast(me->cond); - apr_thread_mutex_unlock(me->lock); - - n_dbg = 0; - if (NULL != (head = elt)) { - while (elt) { - tail = elt; - apr_thread_join(&rv, elt->thd); - elt = APR_RING_NEXT(elt, link); - ++n_dbg; - } + trim_threads(me, &cnt, 1); + if (cnt) { apr_thread_mutex_lock(me->lock); - APR_RING_SPLICE_TAIL(me->recycled_thds, head, tail, - apr_thread_list_elt, link); + apr_thread_cond_broadcast(me->more_work); apr_thread_mutex_unlock(me->lock); } - assert(cnt == n_dbg); - return cnt; } -/* don't join on busy threads for performance reasons, who knows how long will - * the task takes to perform - */ static apr_size_t trim_busy_threads(apr_thread_pool_t *me, apr_size_t cnt) { trim_threads(me, &cnt, 0); @@ -904,20 +929,22 @@ APR_DECLARE(apr_size_t) apr_thread_pool_thread_max APR_DECLARE(apr_size_t) apr_thread_pool_thread_max_set(apr_thread_pool_t *me, apr_size_t cnt) { - apr_size_t n; + apr_size_t n, i; me->thd_max = cnt; - if (0 == cnt || me->thd_cnt <= cnt) { + n = me->thd_cnt; + if (n <= cnt) { return 0; } - n = me->thd_cnt - cnt; - if (n >= me->idle_cnt) { - trim_busy_threads(me, n - me->idle_cnt); + n -= cnt; + i = me->idle_cnt; + if (n >= i) { + trim_busy_threads(me, n - i); trim_idle_threads(me, 0); } else { - trim_idle_threads(me, me->idle_cnt - n); + trim_idle_threads(me, i - n); } return n; }