The branch, master has been updated
       via  f3b69da s3-libsmb: Add a simple test for python bindings
       via  fbebd75 s3-libsmb: Add a python wrapper
       via  d7d8646 tevent: change version to 0.9.17 after adding the "poll_mt" backend
       via  d7af2c8 tevent: Add threaded poll_mt testcase
       via  fa71f32 lib/tevent: Add a thread-safe tevent backend
       via  d860aa2 tevent_poll: Decouple poll_ev->fds handling from adding/removing fds
      from  cbe2510 s3-g_lock: Make g_lock_lock more robust
http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master - Log ----------------------------------------------------------------- commit f3b69da2aeb637398b0670cfb4a29379a8000d91 Author: Volker Lendecke <v...@samba.org> Date: Wed Aug 15 14:08:45 2012 +0200 s3-libsmb: Add a simple test for python bindings Signed-off-by: Stefan Metzmacher <me...@samba.org> Autobuild-User(master): Stefan Metzmacher <me...@samba.org> Autobuild-Date(master): Thu Aug 16 22:49:06 CEST 2012 on sn-devel-104 commit fbebd7530ed365ac6a7c7ae004975850b4f4d162 Author: Volker Lendecke <v...@samba.org> Date: Thu Aug 2 23:21:27 2012 +0200 s3-libsmb: Add a python wrapper Please note that this is not finished and only for internal use. Signed-off-by: Stefan Metzmacher <me...@samba.org> commit d7d8646e6352c3e28012e52bd926283b1d1a09c9 Author: Stefan Metzmacher <me...@samba.org> Date: Thu Aug 16 21:06:45 2012 +0200 tevent: change version to 0.9.17 after adding the "poll_mt" backend metze commit d7af2c842a5db2a1dd9f61b463ccfee24d7db5ed Author: Volker Lendecke <v...@samba.org> Date: Mon Jul 30 09:09:46 2012 +0200 tevent: Add threaded poll_mt testcase Signed-off-by: Stefan Metzmacher <me...@samba.org> commit fa71f3241110fbe079de6d6cb0c8f3df001f4c65 Author: Volker Lendecke <v...@samba.org> Date: Mon Aug 13 16:06:01 2012 +0200 lib/tevent: Add a thread-safe tevent backend Signed-off-by: Stefan Metzmacher <me...@samba.org> commit d860aa2cacc783434973c14f7ae21964ca050e6f Author: Volker Lendecke <v...@samba.org> Date: Sun Jul 29 13:05:36 2012 +0200 tevent_poll: Decouple poll_ev->fds handling from adding/removing fds Step 1 in a python backend for multiple threads Signed-off-by: Stefan Metzmacher <me...@samba.org> ----------------------------------------------------------------------- Summary of changes: .../ABI/{tevent-0.9.16.sigs => tevent-0.9.17.sigs} | 0 lib/tevent/testsuite.c | 145 +++++ lib/tevent/tevent.c | 1 + lib/tevent/tevent_internal.h | 1 + lib/tevent/tevent_poll.c | 364 +++++++++--- lib/tevent/wscript | 
2 +- source3/libsmb/pylibsmb.c | 671 ++++++++++++++++++++ source3/smbd/pysmbd.c | 2 +- source3/wscript_build | 6 + .../python/samba/tests/libsmb_samba_internal.py | 78 +++ source4/selftest/tests.py | 2 + 11 files changed, 1197 insertions(+), 75 deletions(-) copy lib/tevent/ABI/{tevent-0.9.16.sigs => tevent-0.9.17.sigs} (100%) create mode 100644 source3/libsmb/pylibsmb.c create mode 100644 source4/scripting/python/samba/tests/libsmb_samba_internal.py Changeset truncated at 500 lines: diff --git a/lib/tevent/ABI/tevent-0.9.16.sigs b/lib/tevent/ABI/tevent-0.9.17.sigs similarity index 100% copy from lib/tevent/ABI/tevent-0.9.16.sigs copy to lib/tevent/ABI/tevent-0.9.17.sigs diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c index 5868f0a..3d2a79a 100644 --- a/lib/tevent/testsuite.c +++ b/lib/tevent/testsuite.c @@ -26,7 +26,12 @@ #include "includes.h" #include "lib/tevent/tevent.h" #include "system/filesys.h" +#include "system/select.h" #include "torture/torture.h" +#ifdef HAVE_PTHREAD +#include <pthread.h> +#include <assert.h> +#endif static int fde_count; @@ -146,6 +151,140 @@ static bool test_event_context(struct torture_context *test, return true; } +#ifdef HAVE_PTHREAD + +static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER; +static bool do_shutdown = false; + +static void test_event_threaded_lock(void) +{ + int ret; + ret = pthread_mutex_lock(&threaded_mutex); + assert(ret == 0); +} + +static void test_event_threaded_unlock(void) +{ + int ret; + ret = pthread_mutex_unlock(&threaded_mutex); + assert(ret == 0); +} + +static void test_event_threaded_trace(enum tevent_trace_point point, + void *private_data) +{ + switch (point) { + case TEVENT_TRACE_BEFORE_WAIT: + test_event_threaded_unlock(); + break; + case TEVENT_TRACE_AFTER_WAIT: + test_event_threaded_lock(); + break; + } +} + +static void test_event_threaded_timer(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval current_time, + void *private_data) +{ + return; +} + 
+static void *test_event_poll_thread(void *private_data) +{ + struct tevent_context *ev = (struct tevent_context *)private_data; + + test_event_threaded_lock(); + + while (true) { + int ret; + ret = tevent_loop_once(ev); + assert(ret == 0); + if (do_shutdown) { + test_event_threaded_unlock(); + return NULL; + } + } + +} + +static void test_event_threaded_read_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data) +{ + int *pfd = (int *)private_data; + char c; + ssize_t nread; + + if ((flags & TEVENT_FD_READ) == 0) { + return; + } + + do { + nread = read(*pfd, &c, 1); + } while ((nread == -1) && (errno == EINTR)); + + assert(nread == 1); +} + +static bool test_event_context_threaded(struct torture_context *test, + const void *test_data) +{ + struct tevent_context *ev; + struct tevent_timer *te; + struct tevent_fd *fde; + pthread_t poll_thread; + int fds[2]; + int ret; + char c = 0; + + ev = tevent_context_init_byname(test, "poll_mt"); + torture_assert(test, ev != NULL, "poll_mt not supported"); + + tevent_set_trace_callback(ev, test_event_threaded_trace, NULL); + + te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0), + test_event_threaded_timer, NULL); + torture_assert(test, te != NULL, "Could not add timer"); + + ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev); + torture_assert(test, ret == 0, "Could not create poll thread"); + + ret = pipe(fds); + torture_assert(test, ret == 0, "Could not create pipe"); + + poll(NULL, 0, 100); + + test_event_threaded_lock(); + + fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ, + test_event_threaded_read_handler, &fds[0]); + torture_assert(test, fde != NULL, "Could not add fd event"); + + test_event_threaded_unlock(); + + poll(NULL, 0, 100); + + write(fds[1], &c, 1); + + poll(NULL, 0, 100); + + test_event_threaded_lock(); + do_shutdown = true; + test_event_threaded_unlock(); + + write(fds[1], &c, 1); + + ret = pthread_join(poll_thread, NULL); + 
torture_assert(test, ret == 0, "pthread_join failed"); + + return true; +} + +#endif + struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) { struct torture_suite *suite = torture_suite_create(mem_ctx, "event"); @@ -158,5 +297,11 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) (const void *)list[i]); } +#ifdef HAVE_PTHREAD + torture_suite_add_simple_tcase_const(suite, "poll_mt_threaded", + test_event_context_threaded, + NULL); +#endif + return suite; } diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c index 61ffc7e..fa842e4 100644 --- a/lib/tevent/tevent.c +++ b/lib/tevent/tevent.c @@ -114,6 +114,7 @@ static void tevent_backend_init(void) { tevent_select_init(); tevent_poll_init(); + tevent_poll_mt_init(); tevent_standard_init(); #ifdef HAVE_EPOLL tevent_epoll_init(); diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h index 877510f..f09cf57 100644 --- a/lib/tevent/tevent_internal.h +++ b/lib/tevent/tevent_internal.h @@ -315,6 +315,7 @@ void tevent_cleanup_pending_signal_handlers(struct tevent_signal *se); bool tevent_standard_init(void); bool tevent_select_init(void); bool tevent_poll_init(void); +bool tevent_poll_mt_init(void); #ifdef HAVE_EPOLL bool tevent_epoll_init(void); #endif diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c index 7ae3c42..da8cc01 100644 --- a/lib/tevent/tevent_poll.c +++ b/lib/tevent/tevent_poll.c @@ -34,11 +34,22 @@ struct poll_event_context { struct tevent_context *ev; /* + * A DLIST for fresh fde's added by poll_event_add_fd but not + * picked up yet by poll_event_loop_once + */ + struct tevent_fd *fresh; + + /* * These two arrays are maintained together. 
*/ struct pollfd *fds; - struct tevent_fd **fd_events; - uint64_t num_fds; + struct tevent_fd **fdes; + unsigned num_fds; + + /* + * Signal fd to wake the poll() thread + */ + int signal_fd; /* information for exiting from the event loop */ int exit_code; @@ -56,18 +67,125 @@ static int poll_event_context_init(struct tevent_context *ev) return -1; } poll_ev->ev = ev; + poll_ev->signal_fd = -1; ev->additional_data = poll_ev; return 0; } +static int poll_event_mt_destructor(struct poll_event_context *poll_ev) +{ + if (poll_ev->signal_fd != -1) { + close(poll_ev->signal_fd); + poll_ev->signal_fd = -1; + } + if (poll_ev->num_fds == 0) { + return 0; + } + if (poll_ev->fds[0].fd != -1) { + close(poll_ev->fds[0].fd); + poll_ev->fds[0].fd = -1; + } + return 0; +} + +static bool set_nonblock(int fd) +{ + int val; + + val = fcntl(fd, F_GETFL, 0); + if (val == -1) { + return false; + } + val |= O_NONBLOCK; + + return (fcntl(fd, F_SETFL, val) != -1); +} + +static int poll_event_context_init_mt(struct tevent_context *ev) +{ + struct poll_event_context *poll_ev; + struct pollfd *pfd; + int fds[2]; + int ret; + + ret = poll_event_context_init(ev); + if (ret == -1) { + return ret; + } + + poll_ev = talloc_get_type_abort( + ev->additional_data, struct poll_event_context); + + poll_ev->fds = talloc_zero(poll_ev, struct pollfd); + if (poll_ev->fds == NULL) { + return -1; + } + + ret = pipe(fds); + if (ret == -1) { + return -1; + } + + if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) { + close(fds[0]); + close(fds[1]); + return -1; + } + + poll_ev->signal_fd = fds[1]; + + pfd = &poll_ev->fds[0]; + pfd->fd = fds[0]; + pfd->events = (POLLIN|POLLHUP); + + poll_ev->num_fds = 1; + + talloc_set_destructor(poll_ev, poll_event_mt_destructor); + + return 0; +} + +static void poll_event_wake_pollthread(struct poll_event_context *poll_ev) +{ + char c; + ssize_t ret; + + if (poll_ev->signal_fd == -1) { + return; + } + c = 0; + do { + ret = write(poll_ev->signal_fd, &c, sizeof(c)); + } while 
((ret == -1) && (errno == EINTR)); +} + +static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev) +{ + char buf[16]; + ssize_t ret; + int fd; + + if (poll_ev->signal_fd == -1) { + return; + } + + if (poll_ev->num_fds < 1) { + return; + } + fd = poll_ev->fds[0].fd; + + do { + ret = read(fd, buf, sizeof(buf)); + } while (ret == sizeof(buf)); +} + /* destroy an fd_event */ static int poll_event_fd_destructor(struct tevent_fd *fde) { struct tevent_context *ev = fde->event_ctx; - struct poll_event_context *poll_ev = NULL; - struct tevent_fd *moved_fde; + struct poll_event_context *poll_ev; uint64_t del_idx = fde->additional_flags; if (ev == NULL) { @@ -77,16 +195,35 @@ static int poll_event_fd_destructor(struct tevent_fd *fde) poll_ev = talloc_get_type_abort( ev->additional_data, struct poll_event_context); - moved_fde = poll_ev->fd_events[poll_ev->num_fds-1]; - poll_ev->fd_events[del_idx] = moved_fde; - poll_ev->fds[del_idx] = poll_ev->fds[poll_ev->num_fds-1]; - moved_fde->additional_flags = del_idx; - - poll_ev->num_fds -= 1; + poll_ev->fdes[del_idx] = NULL; + poll_event_wake_pollthread(poll_ev); done: return tevent_common_fd_destructor(fde); } +static int poll_fresh_fde_destructor(struct tevent_fd *fde) +{ + struct poll_event_context *poll_ev = talloc_get_type_abort( + fde->event_ctx->additional_data, struct poll_event_context); + DLIST_REMOVE(poll_ev->fresh, fde); + return 0; +} + +static void poll_event_schedule_immediate(struct tevent_immediate *im, + struct tevent_context *ev, + tevent_immediate_handler_t handler, + void *private_data, + const char *handler_name, + const char *location) +{ + struct poll_event_context *poll_ev = talloc_get_type_abort( + ev->additional_data, struct poll_event_context); + + tevent_common_schedule_immediate(im, ev, handler, private_data, + handler_name, location); + poll_event_wake_pollthread(poll_ev); +} + /* add a fd based event return NULL on failure (memory allocation error) @@ -101,60 +238,35 @@ static struct 
tevent_fd *poll_event_add_fd(struct tevent_context *ev, { struct poll_event_context *poll_ev = talloc_get_type_abort( ev->additional_data, struct poll_event_context); - struct pollfd *pfd; struct tevent_fd *fde; - fde = tevent_common_add_fd(ev, mem_ctx, fd, flags, - handler, private_data, - handler_name, location); - if (fde == NULL) { + if (fd < 0) { return NULL; } - /* we allocate 16 slots to avoid a lot of reallocations */ - if (talloc_array_length(poll_ev->fds) == poll_ev->num_fds) { - struct pollfd *tmp_fds; - struct tevent_fd **tmp_fd_events; - tmp_fds = talloc_realloc( - poll_ev, poll_ev->fds, struct pollfd, - poll_ev->num_fds + 16); - if (tmp_fds == NULL) { - TALLOC_FREE(fde); - return NULL; - } - poll_ev->fds = tmp_fds; - - tmp_fd_events = talloc_realloc( - poll_ev, poll_ev->fd_events, struct tevent_fd *, - poll_ev->num_fds + 16); - if (tmp_fd_events == NULL) { - TALLOC_FREE(fde); - return NULL; - } - poll_ev->fd_events = tmp_fd_events; - } - - pfd = &poll_ev->fds[poll_ev->num_fds]; - - pfd->fd = fd; - - pfd->events = 0; - pfd->revents = 0; - - if (flags & TEVENT_FD_READ) { - pfd->events |= (POLLIN|POLLHUP); - } - if (flags & TEVENT_FD_WRITE) { - pfd->events |= (POLLOUT); + fde = talloc(mem_ctx ? 
mem_ctx : ev, struct tevent_fd); + if (fde == NULL) { + return NULL; } + fde->event_ctx = ev; + fde->fd = fd; + fde->flags = flags; + fde->handler = handler; + fde->close_fn = NULL; + fde->private_data = private_data; + fde->handler_name = handler_name; + fde->location = location; + fde->additional_flags = 0; + fde->additional_data = NULL; + + DLIST_ADD(poll_ev->fresh, fde); + talloc_set_destructor(fde, poll_fresh_fde_destructor); + poll_event_wake_pollthread(poll_ev); - fde->additional_flags = poll_ev->num_fds; - poll_ev->fd_events[poll_ev->num_fds] = fde; - - poll_ev->num_fds += 1; - - talloc_set_destructor(fde, poll_event_fd_destructor); - + /* + * poll_event_loop_poll will take care of the rest in + * poll_event_setup_fresh + */ return fde; } @@ -178,6 +290,82 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags) poll_ev->fds[idx].events = pollflags; fde->flags = flags; + poll_event_wake_pollthread(poll_ev); +} + +static bool poll_event_setup_fresh(struct tevent_context *ev, + struct poll_event_context *poll_ev) +{ + struct tevent_fd *fde, *next; + unsigned num_fresh, num_fds; + + if (poll_ev->fresh == NULL) { + return true; + } + + num_fresh = 0; + for (fde = poll_ev->fresh; fde; fde = fde->next) { + num_fresh += 1; -- Samba Shared Repository