The branch, master has been updated via fe707f6 Add a new module, aio_linux which implements Linux kernel aio support. Docs to follow. from 224379b pidl/NDR/Parser: also do range checks on the array size
http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master - Log ----------------------------------------------------------------- commit fe707f6549292ccb681ccd0c596cbd17525522f3 Author: Jeremy Allison <j...@samba.org> Date: Tue Apr 10 15:45:55 2012 -0700 Add a new module, aio_linux which implements Linux kernel aio support. Docs to follow. Autobuild-User: Jeremy Allison <j...@samba.org> Autobuild-Date: Wed Apr 11 02:29:04 CEST 2012 on sn-devel-104 ----------------------------------------------------------------------- Summary of changes: source3/Makefile.in | 5 + source3/configure.in | 15 + .../modules/{vfs_aio_pthread.c => vfs_aio_linux.c} | 516 ++++++++++++-------- source3/modules/wscript_build | 10 + source3/wscript | 26 + 5 files changed, 365 insertions(+), 207 deletions(-) copy source3/modules/{vfs_aio_pthread.c => vfs_aio_linux.c} (53%) Changeset truncated at 500 lines: diff --git a/source3/Makefile.in b/source3/Makefile.in index e1d8770..ff223d9 100644 --- a/source3/Makefile.in +++ b/source3/Makefile.in @@ -873,6 +873,7 @@ VFS_TSMSM_OBJ = modules/vfs_tsmsm.o VFS_FILEID_OBJ = modules/vfs_fileid.o VFS_AIO_FORK_OBJ = modules/vfs_aio_fork.o VFS_AIO_PTHREAD_OBJ = modules/vfs_aio_pthread.o +VFS_AIO_LINUX_OBJ = modules/vfs_aio_linux.o VFS_PREOPEN_OBJ = modules/vfs_preopen.o VFS_SYNCOPS_OBJ = modules/vfs_syncops.o VFS_ACL_XATTR_OBJ = modules/vfs_acl_xattr.o @@ -3066,6 +3067,10 @@ bin/aio_pthread.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_PTHREAD_OBJ) @echo "Building plugin $@" @$(SHLD_MODULE) $(VFS_AIO_PTHREAD_OBJ) +bin/aio_linux.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_LINUX_OBJ) + @echo "Building plugin $@" + @$(SHLD_MODULE) $(VFS_AIO_LINUX_OBJ) + bin/preopen.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_PREOPEN_OBJ) @echo "Building plugin $@" @$(SHLD_MODULE) $(VFS_PREOPEN_OBJ) diff --git a/source3/configure.in b/source3/configure.in index bf777a1..0470a18 100644 --- a/source3/configure.in +++ b/source3/configure.in @@ -5567,6 +5567,20 @@ if test x"$samba_cv_HAVE_AIO" = x"yes"; then x"$samba_cv_msghdr_msg_acctright" = x"yes"; then default_shared_modules="$default_shared_modules vfs_aio_fork" fi + +# Check for Linux kernel aio support. + case "$host_os" in + *linux*) + AC_MSG_CHECKING(for Linux kernel asynchronous io support) + AC_CHECK_LIB(aio,io_submit, + [AIO_LIBS="$LIBS -laio"; + AC_DEFINE(HAVE_LINUX_KERNEL_AIO, 1, Define to 1 if there is support for Linux kernel asynchronous io)], + []) + if test x"$ac_cv_lib_aio_io_submit" = x"yes"; then + default_shared_modules="$default_shared_modules vfs_aio_linux" + fi + ;; + esac fi ################################################# @@ -6519,6 +6533,7 @@ SMB_MODULE(vfs_tsmsm, \$(VFS_TSMSM_OBJ), "bin/tsmsm.$SHLIBEXT", VFS) SMB_MODULE(vfs_fileid, \$(VFS_FILEID_OBJ), "bin/fileid.$SHLIBEXT", VFS) SMB_MODULE(vfs_aio_fork, \$(VFS_AIO_FORK_OBJ), "bin/aio_fork.$SHLIBEXT", VFS) SMB_MODULE(vfs_aio_pthread, \$(VFS_AIO_PTHREAD_OBJ), "bin/aio_pthread.$SHLIBEXT", VFS) +SMB_MODULE(vfs_aio_linux, \$(VFS_AIO_LINUX_OBJ), "bin/aio_linux.$SHLIBEXT", VFS) SMB_MODULE(vfs_preopen, \$(VFS_PREOPEN_OBJ), "bin/preopen.$SHLIBEXT", VFS) SMB_MODULE(vfs_syncops, \$(VFS_SYNCOPS_OBJ), "bin/syncops.$SHLIBEXT", VFS) SMB_MODULE(vfs_zfsacl, \$(VFS_ZFSACL_OBJ), "bin/zfsacl.$SHLIBEXT", VFS) diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_linux.c similarity index 53% copy from source3/modules/vfs_aio_pthread.c copy to source3/modules/vfs_aio_linux.c index 1cddea3..f6fa80a 100644 --- a/source3/modules/vfs_aio_pthread.c +++ b/source3/modules/vfs_aio_linux.c @@ -1,9 +1,6 @@ /* - * Simulate Posix AIO using pthreads. + * Simulate Posix AIO using Linux kernel AIO. * - * Based on the aio_fork work from Volker and Volker's pthreadpool library. - * - * Copyright (C) Volker Lendecke 2008 * Copyright (C) Jeremy Allison 2012 * * This program is free software; you can redistribute it and/or modify @@ -23,111 +20,140 @@ #include "includes.h" #include "system/filesys.h" -#include "system/shmem.h" #include "smbd/smbd.h" #include "smbd/globals.h" -#include "lib/pthreadpool/pthreadpool.h" +#include <sys/eventfd.h> +#include <libaio.h> struct aio_extra; -static struct pthreadpool *pool; -static int aio_pthread_jobid; +static int event_fd = -1; +static io_context_t io_ctx; +static int aio_linux_requestid; +static struct io_event *io_recv_events; +static struct fd_event *aio_read_event; struct aio_private_data { struct aio_private_data *prev, *next; - int jobid; + int requestid; SMB_STRUCT_AIOCB *aiocb; + struct iocb *event_iocb; ssize_t ret_size; int ret_errno; bool cancelled; - bool write_command; }; /* List of outstanding requests we have. */ static struct aio_private_data *pd_list; -static void aio_pthread_handle_completion(struct event_context *event_ctx, - struct fd_event *event, - uint16 flags, - void *p); +static void aio_linux_handle_completion(struct event_context *event_ctx, + struct fd_event *event, + uint16 flags, + void *p); + +/************************************************************************ + Housekeeping. Cleanup if no activity for 30 seconds. +***********************************************************************/ + +static void aio_linux_housekeeping(struct tevent_context *event_ctx, + struct tevent_timer *te, + struct timeval now, + void *private_data) +{ + /* Remove this timed event handler. */ + TALLOC_FREE(te); + if (pd_list != NULL) { + /* Still busy. Look again in 30 seconds. */ + (void)tevent_add_timer(event_ctx, + NULL, + timeval_current_ofs(30, 0), + aio_linux_housekeeping, + NULL); + return; + } + + /* No activity for 30 seconds. Close out kernel resources. */ + io_queue_release(io_ctx); + memset(&io_ctx, '\0', sizeof(io_ctx)); + + if (event_fd != -1) { + close(event_fd); + event_fd = -1; + } + + TALLOC_FREE(aio_read_event); + TALLOC_FREE(io_recv_events); +} /************************************************************************ - Ensure thread pool is initialized. + Ensure event fd and aio context are initialized. ***********************************************************************/ -static bool init_aio_threadpool(struct vfs_handle_struct *handle) +static bool init_aio_linux(struct vfs_handle_struct *handle) { - struct fd_event *sock_event = NULL; - int ret = 0; + struct tevent_timer *te = NULL; - if (pool) { + if (event_fd != -1) { + /* Already initialized. */ return true; } - ret = pthreadpool_init(aio_pending_size, &pool); - if (ret) { - errno = ret; - return false; + /* Shedule a shutdown event for 30 seconds from now. */ + te = tevent_add_timer(server_event_context(), + NULL, + timeval_current_ofs(30, 0), + aio_linux_housekeeping, + NULL); + + if (te == NULL) { + goto fail; } - sock_event = tevent_add_fd(server_event_context(), + + /* Ensure we have enough space for aio_pending_size events. */ + io_recv_events = talloc_zero_array(NULL, + struct io_event, + aio_pending_size); + if (io_recv_events == NULL) { + goto fail; + } + + event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + if (event_fd == -1) { + goto fail; + } + + aio_read_event = tevent_add_fd(server_event_context(), NULL, - pthreadpool_signal_fd(pool), + event_fd, TEVENT_FD_READ, - aio_pthread_handle_completion, + aio_linux_handle_completion, NULL); - if (sock_event == NULL) { - pthreadpool_destroy(pool); - pool = NULL; - return false; + if (aio_read_event == NULL) { + goto fail; + } + + if (io_queue_init(aio_pending_size, &io_ctx)) { + goto fail; } - DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n", + DEBUG(10,("init_aio_linux: initialized with up to %d events\n", aio_pending_size)); return true; -} + fail: -/************************************************************************ - Worker function - core of the pthread aio engine. - This is the function that actually does the IO. -***********************************************************************/ + DEBUG(10,("init_aio_linux: initialization failed\n")); -static void aio_worker(void *private_data) -{ - struct aio_private_data *pd = - (struct aio_private_data *)private_data; - - if (pd->write_command) { - pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes, - (const void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes, - pd->aiocb->aio_offset); - if (pd->ret_size == -1 && errno == ESPIPE) { - /* Maintain the fiction that pipes can - be seeked (sought?) on. */ - pd->ret_size = sys_write(pd->aiocb->aio_fildes, - (const void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes); - } - } else { - pd->ret_size = sys_pread(pd->aiocb->aio_fildes, - (void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes, - pd->aiocb->aio_offset); - if (pd->ret_size == -1 && errno == ESPIPE) { - /* Maintain the fiction that pipes can - be seeked (sought?) on. */ - pd->ret_size = sys_read(pd->aiocb->aio_fildes, - (void *)pd->aiocb->aio_buf, - pd->aiocb->aio_nbytes); - } - } - if (pd->ret_size == -1) { - pd->ret_errno = errno; - } else { - pd->ret_errno = 0; - } + TALLOC_FREE(te); + TALLOC_FREE(io_recv_events); + TALLOC_FREE(aio_read_event); + if (event_fd != -1) { + close(event_fd); + event_fd = -1; + } + memset(&io_ctx, '\0', sizeof(io_ctx)); + return false; } /************************************************************************ @@ -151,7 +177,8 @@ static struct aio_private_data *create_private_data(TALLOC_CTX *ctx, if (!pd) { return NULL; } - pd->jobid = aio_pthread_jobid++; + pd->event_iocb = talloc_zero(pd, struct iocb); + pd->requestid = aio_linux_requestid++; pd->aiocb = aiocb; pd->ret_size = -1; pd->ret_errno = EINPROGRESS; @@ -161,10 +188,10 @@ static struct aio_private_data *create_private_data(TALLOC_CTX *ctx, } /************************************************************************ - Spin off a threadpool (if needed) and initiate a pread call. + Initiate an asynchronous pread call. ***********************************************************************/ -static int aio_pthread_read(struct vfs_handle_struct *handle, +static int aio_linux_read(struct vfs_handle_struct *handle, struct files_struct *fsp, SMB_STRUCT_AIOCB *aiocb) { @@ -172,25 +199,34 @@ static int aio_pthread_read(struct vfs_handle_struct *handle, struct aio_private_data *pd = NULL; int ret; - if (!init_aio_threadpool(handle)) { + if (!init_aio_linux(handle)) { return -1; } pd = create_private_data(aio_ex, aiocb); if (pd == NULL) { - DEBUG(10, ("aio_pthread_read: Could not create private data.\n")); + DEBUG(10, ("aio_linux_read: Could not create private data.\n")); return -1; } - ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd); - if (ret) { + io_prep_pread(pd->event_iocb, + pd->aiocb->aio_fildes, + discard_const(pd->aiocb->aio_buf), + pd->aiocb->aio_nbytes, + pd->aiocb->aio_offset); + io_set_eventfd(pd->event_iocb, event_fd); + /* Use the callback pointer as a private data ptr. */ + io_set_callback(pd->event_iocb, (io_callback_t)pd); + + ret = io_submit(io_ctx, 1, &pd->event_iocb); + if (ret < 0) { errno = ret; return -1; } - DEBUG(10, ("aio_pthread_read: jobid=%d pread requested " + DEBUG(10, ("aio_linux_read: requestid=%d read requested " "of %llu bytes at offset %llu\n", - pd->jobid, + pd->requestid, (unsigned long long)pd->aiocb->aio_nbytes, (unsigned long long)pd->aiocb->aio_offset)); @@ -198,10 +234,10 @@ static int aio_pthread_read(struct vfs_handle_struct *handle, } /************************************************************************ - Spin off a threadpool (if needed) and initiate a pwrite call. + Initiate an asynchronous pwrite call. ***********************************************************************/ -static int aio_pthread_write(struct vfs_handle_struct *handle, +static int aio_linux_write(struct vfs_handle_struct *handle, struct files_struct *fsp, SMB_STRUCT_AIOCB *aiocb) { @@ -209,27 +245,34 @@ static int aio_pthread_write(struct vfs_handle_struct *handle, struct aio_private_data *pd = NULL; int ret; - if (!init_aio_threadpool(handle)) { + if (!init_aio_linux(handle)) { return -1; } pd = create_private_data(aio_ex, aiocb); if (pd == NULL) { - DEBUG(10, ("aio_pthread_write: Could not create private data.\n")); + DEBUG(10, ("aio_linux_write: Could not create private data.\n")); return -1; } - pd->write_command = true; + io_prep_pwrite(pd->event_iocb, + pd->aiocb->aio_fildes, + discard_const(pd->aiocb->aio_buf), + pd->aiocb->aio_nbytes, + pd->aiocb->aio_offset); + io_set_eventfd(pd->event_iocb, event_fd); + /* Use the callback pointer as a private data ptr. */ + io_set_callback(pd->event_iocb, (io_callback_t)pd); - ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd); - if (ret) { + ret = io_submit(io_ctx, 1, &pd->event_iocb); + if (ret < 0) { errno = ret; return -1; } - DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested " + DEBUG(10, ("aio_linux_write: requestid=%d pwrite requested " "of %llu bytes at offset %llu\n", - pd->jobid, + pd->requestid, (unsigned long long)pd->aiocb->aio_nbytes, (unsigned long long)pd->aiocb->aio_offset)); @@ -237,62 +280,92 @@ static int aio_pthread_write(struct vfs_handle_struct *handle, } /************************************************************************ - Find the private data by jobid. + Handle a single finished io. ***********************************************************************/ -static struct aio_private_data *find_private_data_by_jobid(int jobid) +static void aio_linux_handle_io_finished(struct io_event *ioev) { - struct aio_private_data *pd; + struct aio_extra *aio_ex = NULL; + struct aio_private_data *pd = (struct aio_private_data *)ioev->data; - for (pd = pd_list; pd != NULL; pd = pd->next) { - if (pd->jobid == jobid) { - return pd; - } + /* ioev->res2 contains the -errno if error. */ + /* ioev->res contains the number of bytes sent/received. */ + if (ioev->res2) { + pd->ret_size = -1; + pd->ret_errno = -ioev->res2; + } else { + pd->ret_size = ioev->res; + pd->ret_errno = 0; } - return NULL; + aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr; + smbd_aio_complete_aio_ex(aio_ex); + + DEBUG(10,("aio_linux_handle_io_finished: requestid %d completed\n", + pd->requestid )); + TALLOC_FREE(aio_ex); } /************************************************************************ - Callback when an IO completes. + Callback when multiple IOs complete. ***********************************************************************/ -static void aio_pthread_handle_completion(struct event_context *event_ctx, +static void aio_linux_handle_completion(struct event_context *event_ctx, struct fd_event *event, uint16 flags, void *p) { - struct aio_extra *aio_ex = NULL; - struct aio_private_data *pd = NULL; - int jobid = 0; - int ret; + uint64_t num_events = 0; - DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n", + DEBUG(10, ("aio_linux_handle_completion called with flags=%d\n", (int)flags)); if ((flags & EVENT_FD_READ) == 0) { return; } - ret = pthreadpool_finished_job(pool, &jobid); - if (ret) { - smb_panic("aio_pthread_handle_completion"); - return; + /* Read the number of events available. */ + if (sys_read(event_fd, &num_events, sizeof(num_events)) != + sizeof(num_events)) { + smb_panic("aio_linux_handle_completion: invalid read"); } - pd = find_private_data_by_jobid(jobid); - if (pd == NULL) { - DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n", - jobid)); - return; - } + while (num_events > 0) { + uint64_t events_to_read = MIN(num_events, aio_pending_size); + struct timespec ts; + int i; + int ret; - aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr; - smbd_aio_complete_aio_ex(aio_ex); + ts.tv_sec = 0; + ts.tv_nsec = 0; - DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n", - jobid )); - TALLOC_FREE(aio_ex); + ret = io_getevents(io_ctx, + 1, + (long)events_to_read, + io_recv_events, + &ts); + -- Samba Shared Repository