The branch, master has been updated via 3eee52b pthreadpool: allocate glue->tctx on glue as memory context. via 2575642 pthreadpool: maintain a global list of orphaned pthreadpool_tevent_jobs via fa070d9 pthreadpool: make use of pthreadpool_stop() in pthreadpool_tevent_destructor() via 791c051 pthreadpool: add pthreadpool_tevent_job_cancel() via 245d684 pthreadpool: split out pthreadpool_tevent_job from pthreadpool_tevent_job_state via cdbad90 pthreadpool: let pthreadpool_tevent_job_send() fail with an invalid pool via f19552e pthreadpool: split out a pthreadpool_stop() from pthreadpool_destroy() via 5976841 pthreadpool: don't process further jobs when shutting down via 4e54543 pthreadpool: add pthreadpool_cancel_job() via e4dfd3d pthreadpool: add pthreadpool_tevent_max_threads() and pthreadpool_tevent_queued_jobs() via 505d298 pthreadpool: add pthreadpool_max_threads() and pthreadpool_queued_jobs() helpers via 76474a6 pthreadpool: expand test_create() to check unlimited, sync and one thread pool via f1dac71 pthreadpool: fix helgrind error in pthreadpool_free() via c9f54db pthreadpool: use talloc_zero() in tests_cmocka.c setup_pthreadpool_tevent() via e45d33e pthreadpool: use strict sync processing only with max_threads=0 via 03830a3 pthreadpool: consitently use unlock_res for pthread_mutex_unlock() in pthreadpool_add_job() via 65faef9 s3:messages: explicitly use max_thread=unlimited for pthreadpool_tevent_init() in messaging_dgm_init() via 53a9f3c pthreadpool: explicitly use max_thread=unlimited for pthreadpool_tevent_init() tests via 5e723bc pthreadpool: use unsigned for num_idle, num_threads and max_threads via 19e4a08 pthreadpool: correctly handle pthreadpool_tevent_register_ev() failures via c310647 smbd: remove unused change_to_root_user() from brl_timeout_fn() via d0b1f96 smbd: remove unused change_to_root_user() from smbd_sig_hup_handler() via e37e41b smbd: avoid explicit change_to_user() in defer_rename_done() already done by impersonation via 1b804f7 smbd: implement smbd_impersonate_{conn_vuid,conn_sess,root,guest}_create() wrappers via 0dcaa07 smbd: make use of smbd_impersonate_{conn_vuid,conn_sess,root,guest}_create() wrappers via 5285966 smbd: add simple noop smbd_impersonate_{conn_vuid,conn_sess,root,guest}_create() wrappers via 23319ef smbd: add smbd_impersonate_debug_create() helper via 7b5a47b smbd: add [un]become_guest() helper functions from 710ce1c WHATSNEW: Start release notes for Samba 4.10.
https://git.samba.org/?p=samba.git;a=shortlog;h=master - Log ----------------------------------------------------------------- commit 3eee52b44daa8544e1c1fb609f901a3a96b29b25 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 17:11:53 2018 +0200 pthreadpool: allocate glue->tctx on glue as memory context. This means it will go aways together with glue and thte event context. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> Autobuild-User(master): Stefan Metzmacher <me...@samba.org> Autobuild-Date(master): Thu Jul 12 17:18:01 CEST 2018 on sn-devel-144 commit 25756425aaf5465e56ea809cd415b6a387848919 Author: Stefan Metzmacher <me...@samba.org> Date: Wed Jun 20 13:38:19 2018 +0200 pthreadpool: maintain a global list of orphaned pthreadpool_tevent_jobs Instead of leaking the memory forever, we retry the cleanup, if other pthreadpool_tevent_*() functions are used. pthreadpool_tevent_cleanup_orphaned_jobs() could also be called by external callers. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit fa070d90074629cb8262bc4e2a6ceef57a9fbd5c Author: Stefan Metzmacher <me...@samba.org> Date: Wed Apr 25 20:25:21 2018 +0200 pthreadpool: make use of pthreadpool_stop() in pthreadpool_tevent_destructor() Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 791c05144ee9296024cc0fdebe4afeaaf67e26bc Author: Stefan Metzmacher <me...@samba.org> Date: Wed Apr 25 14:43:22 2018 +0200 pthreadpool: add pthreadpool_tevent_job_cancel() Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 245d684d28dab630f3d47ff61006a1fe3e5eeefa Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 01:39:47 2018 +0200 pthreadpool: split out pthreadpool_tevent_job from pthreadpool_tevent_job_state This makes it much easier to handle orphaned jobs, we either wait for the immediate tevent to trigger or we just keep leaking the memory. The next commits will improve this further. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit cdbad9041b8afd3f0436fbeb5d6b50f9f1ada60d Author: Stefan Metzmacher <me...@samba.org> Date: Fri Apr 20 15:07:08 2018 +0200 pthreadpool: let pthreadpool_tevent_job_send() fail with an invalid pool Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit f19552e2390636518dc762bb9dfe25d3407dc521 Author: Stefan Metzmacher <me...@samba.org> Date: Wed Apr 25 14:03:30 2018 +0200 pthreadpool: split out a pthreadpool_stop() from pthreadpool_destroy() This can be used in combination with pthreadpool_cancel_job() to implement a multi step shutdown of the pool. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 59768416148f72d87cba80ae21afbb2861ca9442 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Apr 20 17:12:07 2018 +0200 pthreadpool: don't process further jobs when shutting down Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 4e54543b54ea2bc71f5524a41d359376b8f59125 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Apr 20 15:00:31 2018 +0200 pthreadpool: add pthreadpool_cancel_job() Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit e4dfd3da94c097fa9cd3fea4ad6dfc4af8ca2a45 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 01:01:42 2018 +0200 pthreadpool: add pthreadpool_tevent_max_threads() and pthreadpool_tevent_queued_jobs() These can be used to implement some kind of flow control in the caller. E.g. unless pthreadpool_tevent_queued_jobs() is lower than pthreadpool_tevent_max_threads() is good to prepare new jobs. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 505d298e81570bb118a9b82e5166a8f11139750b Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 00:49:33 2018 +0200 pthreadpool: add pthreadpool_max_threads() and pthreadpool_queued_jobs() helpers These can be used to implement some kind of flow control in the caller. E.g. unless pthreadpool_queued_jobs() is lower than pthreadpool_max_threads() is good to prepare new jobs. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 76474a6fad43c791293f4fb30dc7c155619c5dec Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 08:39:36 2018 +0200 pthreadpool: expand test_create() to check unlimited, sync and one thread pool Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit f1dac71a866857bc70a0477861997cc6174a00d9 Author: Stefan Metzmacher <me...@samba.org> Date: Thu Jun 21 12:40:30 2018 +0200 pthreadpool: fix helgrind error in pthreadpool_free() We need to pthread_mutex_lock/unlock the pool mutex before we can destroy it. The following test would trigger this. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit c9f54db10911762298d1d4fe681d0aaf705f6f55 Author: Stefan Metzmacher <me...@samba.org> Date: Wed Jun 20 15:38:08 2018 +0200 pthreadpool: use talloc_zero() in tests_cmocka.c setup_pthreadpool_tevent() This was found with valgrind. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit e45d33e92e47995faa7c523cf82dfb65efb190f8 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 00:29:53 2018 +0200 pthreadpool: use strict sync processing only with max_threads=0 Otherwise it's an error if not at least one thread is possible. This gives a much saner behaviour and doesn't end up with unexpected sync processing. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 03830a3226d43f13e58dcf68874d50e19793144d Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 00:27:39 2018 +0200 pthreadpool: consitently use unlock_res for pthread_mutex_unlock() in pthreadpool_add_job() This makes further restructuring easier to implement and understand. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 65faef959d93221909d3d4031f685a442935ee90 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 00:10:08 2018 +0200 s3:messages: explicitly use max_thread=unlimited for pthreadpool_tevent_init() in messaging_dgm_init() Currently 0 also means unlimited, but that will change soon, to force no thread and strict sync processing. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 53a9f3cac646c1034a15d2b54531b9be46dde923 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 00:10:08 2018 +0200 pthreadpool: explicitly use max_thread=unlimited for pthreadpool_tevent_init() tests Currently 0 also means unlimited, but that will change soon, to force no thread and strict sync processing. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 5e723bc6f122223252b8d3c67b7b1f17121dd49a Author: Stefan Metzmacher <me...@samba.org> Date: Fri Jun 22 00:04:48 2018 +0200 pthreadpool: use unsigned for num_idle, num_threads and max_threads These can't get negative. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 19e4a0857779dd7502d044fc76859bbc202276e2 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Apr 20 15:05:44 2018 +0200 pthreadpool: correctly handle pthreadpool_tevent_register_ev() failures It returns errno values instead of setting 'errno'. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit c310647e560abc2343ef2ff0ab15daab0fba2650 Author: Ralph Boehme <s...@samba.org> Date: Wed May 23 16:28:48 2018 +0200 smbd: remove unused change_to_root_user() from brl_timeout_fn() This is handled by using the root_ev_ctx in order to register the timer event. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit d0b1f96f086bdd613644f64b3f75ee8c4388f674 Author: Stefan Metzmacher <me...@samba.org> Date: Wed Apr 18 14:29:52 2018 +0200 smbd: remove unused change_to_root_user() from smbd_sig_hup_handler() This is handled by using the root_ev_ctx in order to register the signal event. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit e37e41b3cac52e3623f0c79f83733a51edb35c10 Author: Stefan Metzmacher <me...@samba.org> Date: Thu Jun 21 19:20:53 2018 +0200 smbd: avoid explicit change_to_user() in defer_rename_done() already done by impersonation Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 1b804f7ae23f54a6c1004e5ff17b9df1376b5adb Author: Stefan Metzmacher <me...@samba.org> Date: Fri May 11 15:51:42 2012 +0200 smbd: implement smbd_impersonate_{conn_vuid,conn_sess,root,guest}_create() wrappers This makes sure we're doing the correct impersonation for async requests, which is a requirement to start adding path based async SMB_VFS calls. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 0dcaa0707bad67f7bfaa10ccaf167bfefbe87a0c Author: Stefan Metzmacher <me...@samba.org> Date: Thu Mar 22 10:54:41 2018 +0100 smbd: make use of smbd_impersonate_{conn_vuid,conn_sess,root,guest}_create() wrappers For now they just add debugging, but that will change shortly. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 5285966e67cbee8519015df12a15e938e85e6ee7 Author: Stefan Metzmacher <me...@samba.org> Date: Thu Mar 22 10:54:41 2018 +0100 smbd: add simple noop smbd_impersonate_{conn_vuid,conn_sess,root,guest}_create() wrappers As a start these are just wrappers arround smbd_impersonate_debug_create(), without any real impersonation. But this will change shortly. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 23319ef5a2eff4811b685d4ab54179efc49bac99 Author: Stefan Metzmacher <me...@samba.org> Date: Fri Mar 23 07:47:38 2018 +0100 smbd: add smbd_impersonate_debug_create() helper This will be used to implement no-op impersonation for the create_conn_struct_as_root() case were we don't really have other unrelated events in the loop and only need a valid tevent wrapper context to avoid double free on the raw event context on teardown. This also adds useful debugging instead of being a full no-op wrapper. Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> commit 7b5a47b84696e8e5c26207bd398742b883e598c2 Author: Stefan Metzmacher <me...@samba.org> Date: Fri May 25 16:22:33 2018 +0200 smbd: add [un]become_guest() helper functions Signed-off-by: Stefan Metzmacher <me...@samba.org> Reviewed-by: Ralph Boehme <s...@samba.org> ----------------------------------------------------------------------- Summary of changes: lib/pthreadpool/pthreadpool.c | 245 +++++-- lib/pthreadpool/pthreadpool.h | 73 ++ lib/pthreadpool/pthreadpool_sync.c | 29 + lib/pthreadpool/pthreadpool_tevent.c | 354 ++++++++-- lib/pthreadpool/pthreadpool_tevent.h | 5 + lib/pthreadpool/tests.c | 3 +- lib/pthreadpool/tests_cmocka.c | 86 ++- source3/lib/messages_dgm.c | 2 +- source3/modules/vfs_readonly.c | 2 +- source3/smbd/blocking.c | 3 - source3/smbd/conn.c | 2 +- source3/smbd/msdfs.c | 34 +- source3/smbd/process.c | 19 +- source3/smbd/proto.h | 19 + source3/smbd/smb2_setinfo.c | 11 - source3/smbd/uid.c | 1236 +++++++++++++++++++++++++++++++++- 16 files changed, 1968 insertions(+), 155 deletions(-) Changeset truncated at 500 lines: diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c index 92a88c9..610cfb0 100644 --- a/lib/pthreadpool/pthreadpool.c +++ b/lib/pthreadpool/pthreadpool.c @@ -71,24 +71,32 @@ struct pthreadpool { void *signal_fn_private_data; /* - * indicator to worker threads that they should shut down + * indicator to worker threads to stop processing further jobs + * and exit. */ - bool shutdown; + bool stopped; + + /* + * indicator to the last worker thread to free the pool + * resources. + */ + bool destroyed; /* * maximum number of threads + * 0 means no real thread, only strict sync processing. */ - int max_threads; + unsigned max_threads; /* * Number of threads */ - int num_threads; + unsigned num_threads; /* * Number of idle threads */ - int num_idle; + unsigned num_idle; /* * Condition variable indicating that helper threads should @@ -168,7 +176,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, return ret; } - pool->shutdown = false; + pool->stopped = false; + pool->destroyed = false; pool->num_threads = 0; pool->max_threads = max_threads; pool->num_idle = 0; @@ -195,6 +204,43 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, return 0; } +size_t pthreadpool_max_threads(struct pthreadpool *pool) +{ + if (pool->stopped) { + return 0; + } + + return pool->max_threads; +} + +size_t pthreadpool_queued_jobs(struct pthreadpool *pool) +{ + int res; + int unlock_res; + size_t ret; + + if (pool->stopped) { + return 0; + } + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return res; + } + + if (pool->stopped) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + ret = pool->num_jobs; + + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return ret; +} + static void pthreadpool_prepare_pool(struct pthreadpool *pool) { int ret; @@ -206,7 +252,7 @@ static void pthreadpool_prepare_pool(struct pthreadpool *pool) assert(ret == 0); while (pool->num_idle != 0) { - int num_idle = pool->num_idle; + unsigned num_idle = pool->num_idle; pthread_cond_t prefork_cond; ret = pthread_cond_init(&prefork_cond, NULL); @@ -328,6 +374,11 @@ static int pthreadpool_free(struct pthreadpool *pool) ret = pthread_mutex_unlock(&pthreadpools_mutex); assert(ret == 0); + ret = pthread_mutex_lock(&pool->mutex); + assert(ret == 0); + ret = pthread_mutex_unlock(&pool->mutex); + assert(ret == 0); + ret = pthread_mutex_destroy(&pool->mutex); ret1 = pthread_cond_destroy(&pool->condvar); ret2 = pthread_mutex_destroy(&pool->fork_mutex); @@ -349,11 +400,33 @@ static int pthreadpool_free(struct pthreadpool *pool) } /* - * Destroy a thread pool. Wake up all idle threads for exit. The last - * one will free the pool. + * Stop a thread pool. Wake up all idle threads for exit. */ -int pthreadpool_destroy(struct pthreadpool *pool) +static int pthreadpool_stop_locked(struct pthreadpool *pool) +{ + int ret; + + pool->stopped = true; + + if (pool->num_threads == 0) { + return 0; + } + + /* + * We have active threads, tell them to finish. + */ + + ret = pthread_cond_broadcast(&pool->condvar); + + return ret; +} + +/* + * Stop a thread pool. Wake up all idle threads for exit. + */ + +int pthreadpool_stop(struct pthreadpool *pool) { int ret, ret1; @@ -362,34 +435,50 @@ int pthreadpool_destroy(struct pthreadpool *pool) return ret; } - if (pool->shutdown) { - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - return EBUSY; + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); } - pool->shutdown = true; + ret1 = pthread_mutex_unlock(&pool->mutex); + assert(ret1 == 0); - if (pool->num_threads == 0) { - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); + return ret; +} - ret = pthreadpool_free(pool); +/* + * Destroy a thread pool. Wake up all idle threads for exit. The last + * one will free the pool. + */ + +int pthreadpool_destroy(struct pthreadpool *pool) +{ + int ret, ret1; + bool free_it; + + assert(!pool->destroyed); + + ret = pthread_mutex_lock(&pool->mutex); + if (ret != 0) { return ret; } - /* - * We have active threads, tell them to finish. - */ + pool->destroyed = true; - ret = pthread_cond_broadcast(&pool->condvar); + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); + } + + free_it = (pool->num_threads == 0); ret1 = pthread_mutex_unlock(&pool->mutex); assert(ret1 == 0); + if (free_it) { + pthreadpool_free(pool); + } + return ret; } - /* * Prepare for pthread_exit(), pool->mutex must be locked and will be * unlocked here. This is a bit of a layering violation, but here we @@ -402,7 +491,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) pool->num_threads -= 1; - free_it = (pool->shutdown && (pool->num_threads == 0)); + free_it = (pool->destroyed && (pool->num_threads == 0)); ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); @@ -415,6 +504,10 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) static bool pthreadpool_get_job(struct pthreadpool *p, struct pthreadpool_job *job) { + if (p->stopped) { + return false; + } + if (p->num_jobs == 0) { return false; } @@ -494,7 +587,7 @@ static void *pthreadpool_server(void *arg) clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += 1; - while ((pool->num_jobs == 0) && !pool->shutdown) { + while ((pool->num_jobs == 0) && !pool->stopped) { pool->num_idle += 1; res = pthread_cond_timedwait( @@ -572,10 +665,9 @@ static void *pthreadpool_server(void *arg) } } - if ((pool->num_jobs == 0) && pool->shutdown) { + if (pool->stopped) { /* - * No more work to do and we're asked to shut down, so - * exit + * we're asked to stop processing jobs, so exit */ pthreadpool_server_exit(pool); return NULL; @@ -632,33 +724,48 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { int res; + int unlock_res; + + assert(!pool->destroyed); res = pthread_mutex_lock(&pool->mutex); if (res != 0) { return res; } - if (pool->shutdown) { + if (pool->stopped) { /* * Protect against the pool being shut down while * trying to add a job */ - res = pthread_mutex_unlock(&pool->mutex); - assert(res == 0); + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return EINVAL; } + if (pool->max_threads == 0) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + + /* + * If no thread are allowed we do strict sync processing. + */ + fn(private_data); + res = pool->signal_fn(job_id, fn, private_data, + pool->signal_fn_private_data); + return res; + } + /* * Add job to the end of the queue */ if (!pthreadpool_put_job(pool, job_id, fn, private_data)) { - res = pthread_mutex_unlock(&pool->mutex); - assert(res == 0); + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return ENOMEM; } if (pool->num_idle > 0) { - int unlock_res; /* * We have idle threads, wake one. */ @@ -671,20 +778,19 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, return res; } - if ((pool->max_threads != 0) && - (pool->num_threads >= pool->max_threads)) { + if (pool->num_threads >= pool->max_threads) { /* * No more new threads, we just queue the request */ - res = pthread_mutex_unlock(&pool->mutex); - assert(res == 0); + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return 0; } res = pthreadpool_create_thread(pool); if (res == 0) { - res = pthread_mutex_unlock(&pool->mutex); - assert(res == 0); + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return 0; } @@ -693,8 +799,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, * At least one thread is still available, let * that one run the queued job. */ - res = pthread_mutex_unlock(&pool->mutex); - assert(res == 0); + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return 0; } @@ -704,11 +810,56 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, */ pthreadpool_undo_put_job(pool); + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + + return res; +} + +size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, + void (*fn)(void *private_data), void *private_data) +{ + int res; + size_t i, j; + size_t num = 0; + + assert(!pool->destroyed); + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return res; + } + + for (i = 0, j = 0; i < pool->num_jobs; i++) { + size_t idx = (pool->head + i) % pool->jobs_array_len; + size_t new_idx = (pool->head + j) % pool->jobs_array_len; + struct pthreadpool_job *job = &pool->jobs[idx]; + + if ((job->private_data == private_data) && + (job->id == job_id) && + (job->fn == fn)) + { + /* + * Just skip the entry. + */ + num++; + continue; + } + + /* + * If we already removed one or more jobs (so j will be smaller + * then i), we need to fill possible gaps in the logical list. + */ + if (j < i) { + pool->jobs[new_idx] = *job; + } + j++; + } + + pool->num_jobs -= num; + res = pthread_mutex_unlock(&pool->mutex); assert(res == 0); - fn(private_data); - res = pool->signal_fn(job_id, fn, private_data, - pool->signal_fn_private_data); - return res; + return num; } diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h index defbe5a..b473358 100644 --- a/lib/pthreadpool/pthreadpool.h +++ b/lib/pthreadpool/pthreadpool.h @@ -51,8 +51,51 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, void *signal_fn_private_data); /** + * @brief Get the max threads value of pthreadpool + * + * @note This can be 0 for strict sync processing. + * + * @param[in] pool The pool + * @return number of possible threads + */ +size_t pthreadpool_max_threads(struct pthreadpool *pool); + +/** + * @brief The number of queued jobs of pthreadpool + * + * This is the number of jobs added by pthreadpool_add_job(), + * which are not yet processed by a thread. + * + * @param[in] pool The pool + * @return The number of jobs + */ +size_t pthreadpool_queued_jobs(struct pthreadpool *pool); + +/** + * @brief Stop a pthreadpool + * + * Stop a pthreadpool. If jobs are submitted, but not yet active in + * a thread, they won't get executed. If a job has already been + * submitted to a thread, the job function will continue running, and + * the signal function might still be called. + * + * This allows a multi step shutdown using pthreadpool_stop(), + * pthreadpool_cancel_job() and pthreadpool_destroy(). + * + * @param[in] pool The pool to stop + * @return success: 0, failure: errno + * + * @see pthreadpool_cancel_job() + * @see pthreadpool_destroy() + */ +int pthreadpool_stop(struct pthreadpool *pool); + +/** * @brief Destroy a pthreadpool * + * This basically implies pthreadpool_stop() if the pool + * isn't already stopped. + * * Destroy a pthreadpool. If jobs are submitted, but not yet active in * a thread, they won't get executed. If a job has already been * submitted to a thread, the job function will continue running, and @@ -63,6 +106,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, * * @param[in] pool The pool to destroy * @return success: 0, failure: errno + * + * @see pthreadpool_stop() */ int pthreadpool_destroy(struct pthreadpool *pool); @@ -82,4 +127,32 @@ int pthreadpool_destroy(struct pthreadpool *pool); int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data); +/** + * @brief Try to cancel a job in a pthreadpool + * + * This tries to cancel a job in a pthreadpool. The same + * arguments, which were given to pthreadpool_add_job() + * needs to be passed. + * + * The combination of id, fn, private_data might not be unique. + * So the function tries to cancel as much matching jobs as possible. + * Note once a job is scheduled in a thread it's to late to + * cancel it. + * + * Canceled jobs that weren't started yet won't be reported via a + * pool's signal_fn. + * + * @param[in] pool The pool to run the job on + * @param[in] job_id A custom identifier -- Samba Shared Repository