The branch, master has been updated
       via  10fc65b s3:util_sec: add a cache to set_thread_credentials()
       via  0d2eeb9 lib/util: rename USE_LINUX_THREAD_CREDENTIALS to HAVE_LINUX_THREAD_CREDENTIALS
       via  fb6b6cf pthreadpool: test cancelling and freeing jobs of a wrapped pthreadpool_tevent
       via  f9745d8 pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure
       via  3c4cdb2 pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()
       via  fbafdc9 pthreadpool: add tests for pthreadpool_tevent_[current_job_]per_thread_cwd()
       via  12a45ee pthreadpool: add pthreadpool_tevent_[current_job_]per_thread_cwd()
       via  65e4742 pthreadpool: call unshare(CLONE_FS) if available
       via  7cb2723 configure: check for Linux specific unshare() with CLONE_FS
       via  40d1526 pthreadpool: test cancelling and freeing pending pthreadpool_tevent jobs/pools
       via  f23cac3 pthreadpool: add a comment about a further optimization in pthreadpool_tevent_job_destructor()
       via  aa9b64e pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue
       via  9b73fda pthreadpool: add helgrind magic to PTHREAD_TEVENT_JOB_THREAD_FENCE_*()
       via  66aaa22f lib/replace: also check for valgrind/helgrind.h
       via  30c97da s3:wscript: don't check for valgrind related headers twice
       via  9656b8d pthreadpool: add some lockless coordination between the main and job threads
       via  5fa5764 replace: add checks for atomic_thread_fence(memory_order_seq_cst) and add possible fallbacks
       via  6f8c1b6 third_party/*_wrapper/wscript: remove redundant configure checks
       via  2ebb584 lib/replace: check for __thread support
       via  c51cae5 pthreadpool: replace assert_return_code(ret, 0); with assert_int_equal(ret, 0);
       via  617d9c8 pthreadpool: test pthreadpool_tevent_max_threads() returns the expected result
       via  26b35cb pthreadpool: make sure a pthreadpool is marked as stopped in child processes
       via  9d31bbf tevent: use talloc_zero_size() for the private state in tevent_context_wrapper_create()
      from  c390728 samba-tool trust: support discovery via netr_GetDcName

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 10fc65b74d7b94e0e6454c704b63c138d1d54bd1
Author: Stefan Metzmacher <me...@samba.org>
Date:   Thu May 3 16:08:06 2018 +0200

    s3:util_sec: add a cache to set_thread_credentials()

    Calling set_thread_credentials() with the same values
    skips the syscalls the second time.

    We only do this if '__thread' is supported to provide
    thread local storage.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

    Autobuild-User(master): Ralph Böhme <s...@samba.org>
    Autobuild-Date(master): Tue Jul 24 20:35:17 CEST 2018 on sn-devel-144

commit 0d2eeb9422bf3fb3097637c63d9e7c8bd20417af
Author: Ralph Boehme <s...@samba.org>
Date:   Thu Jun 28 14:28:34 2018 +0200

    lib/util: rename USE_LINUX_THREAD_CREDENTIALS to HAVE_LINUX_THREAD_CREDENTIALS

    The define reflects the results of a feature test, not a configure
    option.

    Signed-off-by: Ralph Boehme <s...@samba.org>
    Reviewed-by: Stefan Metzmacher <me...@samba.org>

commit fb6b6cf3e43165ced4b1039f2683d19f277c0792
Author: Ralph Boehme <s...@samba.org>
Date:   Mon Jun 18 16:57:18 2018 +0200

    pthreadpool: test cancelling and freeing jobs of a wrapped pthreadpool_tevent

    Pair-Programmed-With: Stefan Metzmacher <me...@samba.org>

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit f9745d8b5234091c38e93ed57a255120b61f3ad7
Author: Stefan Metzmacher <me...@samba.org>
Date:   Fri Apr 20 17:12:07 2018 +0200

    pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure

    This can be used to implement a generic per thread impersonation
    for thread pools.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 3c4cdb290723432b00ff9ff88b892cb4e66e76cd
Author: Stefan Metzmacher <me...@samba.org>
Date:   Mon Jul 16 14:43:01 2018 +0200

    pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()

    This makes it possible to monitor the pthreadpool for exited worker
    threads and to start new threads from the main thread again.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit fbafdc99ef2cef11a1a28e795ffe965cb53ef7fa
Author: Stefan Metzmacher <me...@samba.org>
Date:   Wed Jul 18 10:21:22 2018 +0200

    pthreadpool: add tests for pthreadpool_tevent_[current_job_]per_thread_cwd()

    Note that this currently doesn't enforce support for
    unshare(CLONE_FS), as some constrained container environments
    (e.g. docker) reject the whole unshare() system call.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 12a45ee1a66379ba7562729b835ce0e2e4bfb3b3
Author: Stefan Metzmacher <me...@samba.org>
Date:   Fri Jun 22 01:02:41 2018 +0200

    pthreadpool: add pthreadpool_tevent_[current_job_]per_thread_cwd()

    This can be used to check if worker threads run with
    unshare(CLONE_FS).

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 65e4742d168454df6507d9e74993749063435dd6
Author: Ralph Boehme <s...@samba.org>
Date:   Tue Mar 13 16:59:32 2018 +0100

    pthreadpool: call unshare(CLONE_FS) if available

    This paves the way for pthreadpool jobs that are path based.

    Callers can use pthreadpool_per_thread_cwd() to check if
    the current pool supports it.

    Pair-Programmed-With: Stefan Metzmacher <me...@samba.org>

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Signed-off-by: Ralph Boehme <s...@samba.org>

commit 7cb27238fe4e6dfc45cf3128ccd4a392aaf4e877
Author: Ralph Boehme <s...@samba.org>
Date:   Tue Mar 13 16:58:49 2018 +0100

    configure: check for Linux specific unshare() with CLONE_FS

    Note we still need some kind of runtime detection, as it can fail
    in some constrained container setups, which reject the whole
    unshare() syscall instead of just the ones used for container
    features.

    In case unshare(CLONE_FS) works, we can have a per thread current
    working directory and use [f]chdir() safely in worker threads.

    Pair-Programmed-With: Stefan Metzmacher <me...@samba.org>

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Signed-off-by: Ralph Boehme <s...@samba.org>

commit 40d15260d24d0071732f47873f395fce29b8a6f4
Author: Ralph Boehme <s...@samba.org>
Date:   Mon Jun 18 15:32:30 2018 +0200

    pthreadpool: test cancelling and freeing pending pthreadpool_tevent jobs/pools

    Pair-Programmed-With: Stefan Metzmacher <me...@samba.org>

    Signed-off-by: Ralph Boehme <s...@samba.org>
    Signed-off-by: Stefan Metzmacher <me...@samba.org>

commit f23cac39b36b026650e0922c78fe0fd3fe567e35
Author: Stefan Metzmacher <me...@samba.org>
Date:   Fri Jun 22 17:22:10 2018 +0200

    pthreadpool: add a comment about a further optimization in pthreadpool_tevent_job_destructor()

    This seems to be a really rare race; it's likely that the immediate
    event will still trigger and clean up.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit aa9b64eccfd037941512bad108c4e3946714a502
Author: Stefan Metzmacher <me...@samba.org>
Date:   Fri Jun 22 17:14:31 2018 +0200

    pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue

    We should avoid traversing a linked list within a thread without
    holding a mutex!

    Using a mutex would be very tricky as we'll likely deadlock with
    the mutexes at the raw pthreadpool layer.

    So we use some kind of spinlock using atomic_thread_fence in order
    to protect the access to job->state->glue->{tctx,ev} in
    pthreadpool_tevent_job_signal().

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 9b73fda926eb8493e80012794483039be66d4e6c
Author: Stefan Metzmacher <me...@samba.org>
Date:   Thu Jun 21 12:43:08 2018 +0200

    pthreadpool: add helgrind magic to PTHREAD_TEVENT_JOB_THREAD_FENCE_*()

    This avoids the expected helgrind/drd warnings on the job states
    which are protected by the thread fence.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 66aaa22f0f6746d2ef9a719adfdad70600287c9d
Author: Stefan Metzmacher <me...@samba.org>
Date:   Thu Jun 21 12:46:48 2018 +0200

    lib/replace: also check for valgrind/helgrind.h

    This will be used in lib/pthreadpool/pthreadpool_tevent.c
    in order to avoid expected helgrind/drd warnings.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 30c97da7a7ff96777f6fdfc13a791b6893cf8c48
Author: Stefan Metzmacher <me...@samba.org>
Date:   Thu Jun 21 12:46:06 2018 +0200

    s3:wscript: don't check for valgrind related headers twice

    We already check them in lib/replace/wscript.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 9656b8d8ee11ee351870286f16ea8fbe49112292
Author: Stefan Metzmacher <me...@samba.org>
Date:   Wed Jun 20 13:38:19 2018 +0200

    pthreadpool: add some lockless coordination between the main and job threads

    In the direction from the main process to the job thread, we have:

    - 'maycancel', which is set when tevent_req_cancel() is called,
    - 'orphaned', which is set when the job request, tevent_context or
      pthreadpool_tevent was talloc_free'ed.

    The job function can consume these by using:

       /*
        * return true - if tevent_req_cancel() was called.
        */
       bool pthreadpool_tevent_current_job_canceled(void);

       /*
        * return true - if talloc_free() was called on the job request,
        * tevent_context or pthreadpool_tevent.
        */
       bool pthreadpool_tevent_current_job_orphaned(void);

       /*
        * return true if canceled and orphaned are both false.
        */
       bool pthreadpool_tevent_current_job_continue(void);

    In the other direction we remember the following points
    in the job execution:

    - 'started'  - set when the job is picked up by a worker thread
    - 'executed' - set once the job function returned.
    - 'finished' - set when pthreadpool_tevent_job_signal() is entered
    - 'dropped'  - set when pthreadpool_tevent_job_signal() leaves with orphaned
    - 'signaled' - set when pthreadpool_tevent_job_signal() leaves normally

    There's only one side writing each element,
    either the main process or the job thread.

    This means we can do the coordination with a full memory
    barrier using atomic_thread_fence(memory_order_seq_cst).
    lib/replace provides fallbacks if C11 stdatomic.h is not available.

    A real pthreadpool requires pthread and atomic_thread_fence() (or a
    replacement) to be available, otherwise we only have pthreadpool_sync.c.
    But this should not make a real difference, as at least
    __sync_synchronize() has been available in gcc since 2005.
    We also require __thread, which has been available since 2002.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 5fa5764f30c47b46f12ceb7637985e8def0190ca
Author: Stefan Metzmacher <me...@samba.org>
Date:   Thu Jun 21 14:17:35 2018 +0200

    replace: add checks for atomic_thread_fence(memory_order_seq_cst) and add possible fallbacks

    This implements a full memory barrier.
    On ubuntu amd64 this results in an 'mfence' instruction.

    This is required for synchronization between threads, where
    there's typically only one writer of a memory location that
    should be synced between all threads with the barrier.

    Many more details can be found here:
    https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins
    https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fsync-Builtins.html#g_t_005f_005fsync-Builtins

    The main one we use seems to be in C11 via stdatomic.h,
    the oldest fallback is __sync_synchronize(), which has been
    available in gcc since 2005.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 6f8c1b6736875d63c11f8630ecf1c8d3dcd70fc5
Author: Stefan Metzmacher <me...@samba.org>
Date:   Wed Jul 18 08:54:22 2018 +0200

    third_party/*_wrapper/wscript: remove redundant configure checks

    HAVE___THREAD and HAVE_DESTRUCTOR_ATTRIBUTE are already checked
    as part of Samba.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 2ebb5847defce888c3177d6564a3d7dcc0ed47e4
Author: Stefan Metzmacher <me...@samba.org>
Date:   Wed Jul 18 08:44:48 2018 +0200

    lib/replace: check for __thread support

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit c51cae583fd119db613c62bd53070d7d53227c56
Author: Stefan Metzmacher <me...@samba.org>
Date:   Mon Jul 23 23:24:22 2018 +0200

    pthreadpool: replace assert_return_code(ret, 0); with assert_int_equal(ret, 0);

    We need to assert the exact value!

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 617d9c877df4af1ccc5b36f95dea3301e9a46d9d
Author: Stefan Metzmacher <me...@samba.org>
Date:   Wed Jul 18 10:17:51 2018 +0200

    pthreadpool: test pthreadpool_tevent_max_threads() returns the expected result

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 26b35cb20c604afa6db19912e3ff43978dbece61
Author: Stefan Metzmacher <me...@samba.org>
Date:   Mon Jul 16 17:17:59 2018 +0200

    pthreadpool: make sure a pthreadpool is marked as stopped in child processes

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

commit 9d31bbf9793ca9d2041eb8769511aca6c9684e94
Author: Stefan Metzmacher <me...@samba.org>
Date:   Sat Jul 14 10:55:02 2018 +0200

    tevent: use talloc_zero_size() for the private state in tevent_context_wrapper_create()

    This is what tevent_req_create() uses and what callers of
    tevent_context_wrapper_create() would therefore also expect.

    Signed-off-by: Stefan Metzmacher <me...@samba.org>
    Reviewed-by: Ralph Boehme <s...@samba.org>

-----------------------------------------------------------------------

Summary of changes:
 lib/pthreadpool/pthreadpool.c        |  316 +++++++++
 lib/pthreadpool/pthreadpool.h        |   81 +++
 lib/pthreadpool/pthreadpool_sync.c   |   25 +
 lib/pthreadpool/pthreadpool_tevent.c |  793 ++++++++++++++++++++++-
 lib/pthreadpool/pthreadpool_tevent.h |   53 ++
 lib/pthreadpool/tests_cmocka.c       | 1167 +++++++++++++++++++++++++++++++++-
 lib/replace/system/threads.h         |   27 +
 lib/replace/wscript                  |   34 +-
 lib/tevent/tevent_wrapper.c          |    2 +-
 lib/util/setid.c                     |   24 +-
 source3/lib/util_sec.c               |   47 +-
 source3/modules/vfs_aio_pthread.c    |    4 +-
 source3/wscript                      |   15 +-
 tests/summary.c                      |    2 +-
 third_party/nss_wrapper/wscript      |   31 +-
 third_party/pam_wrapper/wscript      |   30 +-
 third_party/resolv_wrapper/wscript   |   30 +-
 third_party/socket_wrapper/wscript   |   30 +-
 third_party/uid_wrapper/wscript      |   12 +-
 wscript                              |   12 +-
 20 files changed, 2544 insertions(+), 191 deletions(-)


Changeset truncated at 500 lines:

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 610cfb0..db3837c 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -23,6 +23,7 @@
 #include "system/threads.h"
 #include "pthreadpool.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/blocking.h"
 
 #ifdef NDEBUG
 #undef NDEBUG
@@ -52,6 +53,8 @@ struct pthreadpool {
 	 */
 	pthread_cond_t condvar;
 
+	int check_pipefd[2];
+
 	/*
 	 * Array of jobs
 	 */
@@ -112,10 +115,13 @@ struct pthreadpool {
 	 * where the forking thread will unlock it again.
 	 */
 	pthread_mutex_t fork_mutex;
+
+	bool per_thread_cwd;
 };
 
 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
 static struct pthreadpool *pthreadpools = NULL;
+static bool pthreadpool_support_thread_cwd = false;
 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
 
 static void pthreadpool_prep_atfork(void);
@@ -133,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 {
 	struct pthreadpool *pool;
 	int ret;
+	bool ok;
 
 	pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
 	if (pool == NULL) {
@@ -150,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 		return ENOMEM;
 	}
 
+	ret = pipe(pool->check_pipefd);
+	if (ret != 0) {
+		free(pool->jobs);
+		free(pool);
+		return ENOMEM;
+	}
+
+	ok = smb_set_close_on_exec(pool->check_pipefd[0]);
+	if (!ok) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ok = smb_set_close_on_exec(pool->check_pipefd[1]);
+	if (!ok) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ret = set_blocking(pool->check_pipefd[0], true);
+	if (ret == -1) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ret = set_blocking(pool->check_pipefd[1], false);
+	if (ret == -1) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+
 	pool->head = pool->num_jobs = 0;
 
 	ret = pthread_mutex_init(&pool->mutex, NULL);
 	if (ret != 0) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -162,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	ret = pthread_cond_init(&pool->condvar, NULL);
 	if (ret != 0) {
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -171,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	if (ret != 0) {
 		pthread_cond_destroy(&pool->condvar);
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -182,12 +235,19 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	pool->max_threads = max_threads;
 	pool->num_idle = 0;
 	pool->prefork_cond = NULL;
+	if (max_threads != 0) {
+		pool->per_thread_cwd = pthreadpool_support_thread_cwd;
+	} else {
+		pool->per_thread_cwd = false;
+	}
 
 	ret = pthread_mutex_lock(&pthreadpools_mutex);
 	if (ret != 0) {
 		pthread_mutex_destroy(&pool->fork_mutex);
 		pthread_cond_destroy(&pool->condvar);
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -241,6 +301,15 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
 	return ret;
 }
 
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return false;
+	}
+
+	return pool->per_thread_cwd;
+}
+
 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
 {
 	int ret;
@@ -341,6 +410,15 @@ static void pthreadpool_child(void)
 		pool->num_idle = 0;
 		pool->head = 0;
 		pool->num_jobs = 0;
+		pool->stopped = true;
+		if (pool->check_pipefd[0] != -1) {
+			close(pool->check_pipefd[0]);
+			pool->check_pipefd[0] = -1;
+		}
+		if (pool->check_pipefd[1] != -1) {
+			close(pool->check_pipefd[1]);
+			pool->check_pipefd[1] = -1;
+		}
 
 		ret = pthread_cond_init(&pool->condvar, NULL);
 		assert(ret == 0);
@@ -358,6 +436,16 @@ static void pthreadpool_child(void)
 
 static void pthreadpool_prep_atfork(void)
 {
+#ifdef HAVE_UNSHARE_CLONE_FS
+	int res;
+
+	/* remember if unshare(CLONE_FS) works. */
+	res = unshare(CLONE_FS);
+	if (res == 0) {
+		pthreadpool_support_thread_cwd = true;
+	}
+#endif
+
 	pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
 		       pthreadpool_child);
 }
@@ -393,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool)
 		return ret2;
 	}
 
+	if (pool->check_pipefd[0] != -1) {
+		close(pool->check_pipefd[0]);
+		pool->check_pipefd[0] = -1;
+	}
+	if (pool->check_pipefd[1] != -1) {
+		close(pool->check_pipefd[1]);
+		pool->check_pipefd[1] = -1;
+	}
 	free(pool->jobs);
 	free(pool);
 
@@ -409,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
 
 	pool->stopped = true;
 
+	if (pool->check_pipefd[0] != -1) {
+		close(pool->check_pipefd[0]);
+		pool->check_pipefd[0] = -1;
+	}
+	if (pool->check_pipefd[1] != -1) {
+		close(pool->check_pipefd[1]);
+		pool->check_pipefd[1] = -1;
+	}
+
 	if (pool->num_threads == 0) {
 		return 0;
 	}
@@ -493,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 
 	free_it = (pool->destroyed && (pool->num_threads == 0));
 
+	while (true) {
+		uint8_t c = 0;
+		ssize_t nwritten = 0;
+
+		if (pool->check_pipefd[1] == -1) {
+			break;
+		}
+
+		nwritten = write(pool->check_pipefd[1], &c, 1);
+		if (nwritten == -1) {
+			if (errno == EINTR) {
+				continue;
+			}
+			if (errno == EAGAIN) {
+				break;
+			}
+#ifdef EWOULDBLOCK
+			if (errno == EWOULDBLOCK) {
+				break;
+			}
+#endif
+			/* ignore ... */
+		}
+
+		break;
+	}
+
 	ret = pthread_mutex_unlock(&pool->mutex);
 	assert(ret == 0);
 
@@ -570,6 +702,13 @@ static void *pthreadpool_server(void *arg)
 	struct pthreadpool *pool = (struct pthreadpool *)arg;
 	int res;
 
+#ifdef HAVE_UNSHARE_CLONE_FS
+	if (pool->per_thread_cwd) {
+		res = unshare(CLONE_FS);
+		assert(res == 0);
+	}
+#endif
+
 	res = pthread_mutex_lock(&pool->mutex);
 	if (res != 0) {
 		return NULL;
@@ -816,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 	return res;
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+	int res;
+	int unlock_res;
+	unsigned possible_threads = 0;
+	unsigned missing_threads = 0;
+
+	assert(!pool->destroyed);
+
+	res = pthread_mutex_lock(&pool->mutex);
+	if (res != 0) {
+		return res;
+	}
+
+	if (pool->stopped) {
+		/*
+		 * Protect against the pool being shut down while
+		 * trying to add a job
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return EINVAL;
+	}
+
+	if (pool->num_jobs == 0) {
+		/*
+		 * This also handles the pool->max_threads == 0 case as it never
+		 * calls pthreadpool_put_job()
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	if (pool->num_idle > 0) {
+		/*
+		 * We have idle threads and pending jobs,
+		 * this means we better let all threads
+		 * start and check for pending jobs.
+		 */
+		res = pthread_cond_broadcast(&pool->condvar);
+		assert(res == 0);
+	}
+
+	if (pool->num_threads < pool->max_threads) {
+		possible_threads = pool->max_threads - pool->num_threads;
+	}
+
+	if (pool->num_idle < pool->num_jobs) {
+		missing_threads = pool->num_jobs - pool->num_idle;
+	}
+
+	missing_threads = MIN(missing_threads, possible_threads);
+
+	while (missing_threads > 0) {
+
+		res = pthreadpool_create_thread(pool);
+		if (res != 0) {
+			break;
+		}
+
+		missing_threads--;
+	}
+
+	if (missing_threads == 0) {
+		/*
+		 * Ok, we recreated all thread we need.
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	if (pool->num_threads != 0) {
+		/*
+		 * At least one thread is still available, let
+		 * that one run the queued jobs.
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	/*
+	 * There's no thread available to run any pending jobs.
+	 * The caller may want to cancel the jobs and destroy the pool.
+	 * But that's up to the caller.
+	 */
+	unlock_res = pthread_mutex_unlock(&pool->mutex);
+	assert(unlock_res == 0);
+
+	return res;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+	int fd;
+	int ret;
+	bool ok;
+
+	if (pool->stopped) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (pool->check_pipefd[0] == -1) {
+		errno = ENOSYS;
+		return -1;
+	}
+
+	fd = dup(pool->check_pipefd[0]);
+	if (fd == -1) {
+		return -1;
+	}
+
+	ok = smb_set_close_on_exec(fd);
+	if (!ok) {
+		int saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	ret = set_blocking(fd, false);
+	if (ret == -1) {
+		int saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return fd;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return EINVAL;
+	}
+
+	if (pool->check_pipefd[0] == -1) {
+		return ENOSYS;
+	}
+
+	while (true) {
+		uint8_t buf[128];
+		ssize_t nread;
+
+		nread = read(pool->check_pipefd[0], buf, sizeof(buf));
+		if (nread == -1) {
+			if (errno == EINTR) {
+				continue;
+			}
+			if (errno == EAGAIN) {
+				return 0;
+			}
+#ifdef EWOULDBLOCK
+			if (errno == EWOULDBLOCK) {
+				return 0;
+			}
+#endif
+			if (errno == 0) {
+				errno = INT_MAX;
+			}
+
+			return errno;
+		}
+
+		if (nread < sizeof(buf)) {
+			return 0;
+		}
+	}
+
+	abort();
+	return INT_MAX;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
 			      void (*fn)(void *private_data), void *private_data)
 {
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index b473358..543567c 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -72,6 +72,23 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool);
 size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
 
 /**
+ * @brief Check for per thread current working directory support of pthreadpool
+ *
+ * Since Linux kernel 2.6.16, unshare(CLONE_FS) is supported,
+ * which provides a per thread current working directory
+ * and allows [f]chdir() within the worker threads.
+ *
+ * Note that this doesn't work on some contraint container setups,
+ * the complete unshare() syscall might be rejected.
+ * pthreadpool_per_thread_cwd() returns what is available
+ * at runtime, so the callers should really check this!
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			supported: true, otherwise: false
+ */
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool);
+
+/**
  * @brief Stop a pthreadpool
  *
  * Stop a pthreadpool. If jobs are submitted, but not yet active in
@@ -128,6 +145,70 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data);
 
 /**
+ * @brief Check if the pthreadpool needs a restart.
+ *
+ * This checks if there are enough threads to run the already
+ * queued jobs. This should be called only the callers signal_fn
+ * (passed to pthreadpool_init()) returned an error, so
+ * that the job's worker thread exited.
+ *
+ * Typically this is called once the file destriptor
+ * returned by pthreadpool_restart_check_monitor_fd()
+ * became readable and pthreadpool_restart_check_monitor_drain()
+ * returned success.
+ *
+ * This function tries to restart the missing threads.
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check(struct pthreadpool *pool);
+


-- 
Samba Shared Repository
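
Editorial sketch for commit 9656b8d (lockless coordination): the commit
message describes flags that each have exactly one writer, synchronised
with a full barrier via atomic_thread_fence(memory_order_seq_cst). The
following is a minimal illustration of that idea only, not the code from
lib/pthreadpool/pthreadpool_tevent.c. It assumes C11 stdatomic.h is
available. The names struct job_flags, job_fence(), job_flags_init(),
job_request_cancel(), job_mark_orphaned(), job_mark_started(),
job_has_started() and job_should_continue() are hypothetical. The sketch
also stores the flags as atomic_bool with relaxed accesses so that it
stays inside the C11 memory model; whether the Samba code does the same
is not visible in this mail.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical per-job state; every flag has exactly one writer. */
struct job_flags {
	atomic_bool maycancel; /* written by the main thread only */
	atomic_bool orphaned;  /* written by the main thread only */
	atomic_bool started;   /* written by the job thread only */
};

/* Full memory barrier, as named in the commit message. */
static void job_fence(void)
{
	atomic_thread_fence(memory_order_seq_cst);
}

static void job_flags_init(struct job_flags *f)
{
	assert(f != NULL);
	atomic_init(&f->maycancel, false);
	atomic_init(&f->orphaned, false);
	atomic_init(&f->started, false);
	job_fence();
}

/* Main thread: the request was cancelled. */
static void job_request_cancel(struct job_flags *f)
{
	atomic_store_explicit(&f->maycancel, true, memory_order_relaxed);
	job_fence();
}

/* Main thread: the request (or its context) was freed. */
static void job_mark_orphaned(struct job_flags *f)
{
	atomic_store_explicit(&f->orphaned, true, memory_order_relaxed);
	job_fence();
}

/* Job thread: a worker picked the job up. */
static void job_mark_started(struct job_flags *f)
{
	atomic_store_explicit(&f->started, true, memory_order_relaxed);
	job_fence();
}

/* Main thread: has a worker picked the job up yet? */
static bool job_has_started(struct job_flags *f)
{
	job_fence();
	return atomic_load_explicit(&f->started, memory_order_relaxed);
}

/* Job thread: true while neither cancel nor orphan was signalled. */
static bool job_should_continue(struct job_flags *f)
{
	job_fence();
	return !atomic_load_explicit(&f->maycancel, memory_order_relaxed) &&
	       !atomic_load_explicit(&f->orphaned, memory_order_relaxed);
}
```

A long-running job function would poll job_should_continue() between
work items and return early once it yields false, which mirrors how the
commit message describes pthreadpool_tevent_current_job_continue().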
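
Editorial sketch for commit 3c4cdb2 (restart check monitoring): the diff
above signals exited worker threads by writing single bytes into a pipe
and lets the main thread drain the read end before calling
pthreadpool_restart_check(). The drain pattern can be tried in isolation
with plain POSIX calls. This is a simplified sketch under stated
assumptions, not the Samba implementation: set_nonblocking() and
drain_wakeup_pipe() are hypothetical names, and the descriptor is
assumed to be a non-blocking pipe read end.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stddef.h>
#include <stdint.h>
#include <unistd.h>

/* Hypothetical helper: put a descriptor into non-blocking mode. */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if (flags == -1) {
		return -1;
	}
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/*
 * Read and discard all pending wakeup bytes.
 * Returns 0 once nothing is left to read, otherwise an errno value.
 */
static int drain_wakeup_pipe(int fd)
{
	assert(fd >= 0);

	while (1) {
		uint8_t buf[128];
		ssize_t nread = read(fd, buf, sizeof(buf));

		if (nread == -1) {
			if (errno == EINTR) {
				continue; /* interrupted, just retry */
			}
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				return 0; /* pipe is empty */
			}
			return errno;
		}
		if ((size_t)nread < sizeof(buf)) {
			return 0; /* short read or EOF: drained */
		}
	}
}
```

In an event loop the read end would be registered for readability;
once it fires, the handler drains it and only then checks whether
threads need to be restarted, so that several exits collapse into one
check.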
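
Editorial sketch for commit 10fc65b (credential cache): the commit
message says that a second set_thread_credentials() call with the same
values skips the syscalls, using '__thread' storage. The source3 code is
not part of the truncated diff, so the following only illustrates the
caching idea. apply_thread_credentials() is a hypothetical stand-in for
the real syscalls (it just counts calls), set_thread_credentials_cached()
is a hypothetical name, and supplementary groups are left out for
brevity.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

/* Counts how often the (stubbed) expensive path was taken. */
static unsigned apply_calls;

/* Hypothetical stand-in for the real credential-switching syscalls. */
static int apply_thread_credentials(uid_t uid, gid_t gid)
{
	(void)uid;
	(void)gid;
	apply_calls++;
	return 0;
}

/* Skip the expensive path if this thread already has these values. */
static int set_thread_credentials_cached(uid_t uid, gid_t gid)
{
	/* One cache per thread, thanks to __thread (a GCC/Clang extension). */
	static __thread struct {
		bool active;
		uid_t uid;
		gid_t gid;
	} cache;
	int ret;

	if (cache.active && cache.uid == uid && cache.gid == gid) {
		return 0; /* nothing changed since the last call */
	}

	ret = apply_thread_credentials(uid, gid);
	if (ret != 0) {
		cache.active = false; /* state unknown, do not trust the cache */
		return ret;
	}

	cache.active = true;
	cache.uid = uid;
	cache.gid = gid;
	return 0;
}
```

Because the cache is thread local, it needs no locking, which is
presumably why the commit makes the feature depend on '__thread'
support.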