The branch, master has been updated
       via  fe707f6 Add a new module, aio_linux which implements Linux kernel aio support. Docs to follow.
      from  224379b pidl/NDR/Parser: also do range checks on the array size

http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit fe707f6549292ccb681ccd0c596cbd17525522f3
Author: Jeremy Allison <j...@samba.org>
Date:   Tue Apr 10 15:45:55 2012 -0700

    Add a new module, aio_linux which implements Linux kernel aio support. Docs to follow.
    
    Autobuild-User: Jeremy Allison <j...@samba.org>
    Autobuild-Date: Wed Apr 11 02:29:04 CEST 2012 on sn-devel-104

-----------------------------------------------------------------------

Summary of changes:
 source3/Makefile.in                                |    5 +
 source3/configure.in                               |   15 +
 .../modules/{vfs_aio_pthread.c => vfs_aio_linux.c} |  516 ++++++++++++--------
 source3/modules/wscript_build                      |   10 +
 source3/wscript                                    |   26 +
 5 files changed, 365 insertions(+), 207 deletions(-)
 copy source3/modules/{vfs_aio_pthread.c => vfs_aio_linux.c} (53%)


Changeset truncated at 500 lines:

diff --git a/source3/Makefile.in b/source3/Makefile.in
index e1d8770..ff223d9 100644
--- a/source3/Makefile.in
+++ b/source3/Makefile.in
@@ -873,6 +873,7 @@ VFS_TSMSM_OBJ = modules/vfs_tsmsm.o
 VFS_FILEID_OBJ = modules/vfs_fileid.o
 VFS_AIO_FORK_OBJ = modules/vfs_aio_fork.o
 VFS_AIO_PTHREAD_OBJ = modules/vfs_aio_pthread.o
+VFS_AIO_LINUX_OBJ = modules/vfs_aio_linux.o
 VFS_PREOPEN_OBJ = modules/vfs_preopen.o
 VFS_SYNCOPS_OBJ = modules/vfs_syncops.o
 VFS_ACL_XATTR_OBJ = modules/vfs_acl_xattr.o
@@ -3066,6 +3067,10 @@ bin/aio_pthread.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_PTHREAD_OBJ)
        @echo "Building plugin $@"
        @$(SHLD_MODULE) $(VFS_AIO_PTHREAD_OBJ)
 
+bin/aio_linux.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_LINUX_OBJ)
+       @echo "Building plugin $@"
+       @$(SHLD_MODULE) $(VFS_AIO_LINUX_OBJ)
+
 bin/preopen.@SHLIBEXT@: $(BINARY_PREREQS) $(VFS_PREOPEN_OBJ)
        @echo "Building plugin $@"
        @$(SHLD_MODULE) $(VFS_PREOPEN_OBJ)
diff --git a/source3/configure.in b/source3/configure.in
index bf777a1..0470a18 100644
--- a/source3/configure.in
+++ b/source3/configure.in
@@ -5567,6 +5567,20 @@ if test x"$samba_cv_HAVE_AIO" = x"yes"; then
               x"$samba_cv_msghdr_msg_acctright" = x"yes"; then
               default_shared_modules="$default_shared_modules vfs_aio_fork"
        fi
+
+# Check for Linux kernel aio support.
+       case "$host_os" in
+       *linux*)
+           AC_MSG_CHECKING(for Linux kernel asynchronous io support)
+           AC_CHECK_LIB(aio,io_submit,
+               [AIO_LIBS="$LIBS -laio";
+               AC_DEFINE(HAVE_LINUX_KERNEL_AIO, 1, Define to 1 if there is support for Linux kernel asynchronous io)],
+               [])
+           if test x"$ac_cv_lib_aio_io_submit" = x"yes"; then
+               default_shared_modules="$default_shared_modules vfs_aio_linux"
+           fi
+            ;;
+        esac
 fi
 
 #################################################
@@ -6519,6 +6533,7 @@ SMB_MODULE(vfs_tsmsm, \$(VFS_TSMSM_OBJ), "bin/tsmsm.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_fileid, \$(VFS_FILEID_OBJ), "bin/fileid.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_aio_fork, \$(VFS_AIO_FORK_OBJ), "bin/aio_fork.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_aio_pthread, \$(VFS_AIO_PTHREAD_OBJ), "bin/aio_pthread.$SHLIBEXT", VFS)
+SMB_MODULE(vfs_aio_linux, \$(VFS_AIO_LINUX_OBJ), "bin/aio_linux.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_preopen, \$(VFS_PREOPEN_OBJ), "bin/preopen.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_syncops, \$(VFS_SYNCOPS_OBJ), "bin/syncops.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_zfsacl, \$(VFS_ZFSACL_OBJ), "bin/zfsacl.$SHLIBEXT", VFS)
diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_linux.c
similarity index 53%
copy from source3/modules/vfs_aio_pthread.c
copy to source3/modules/vfs_aio_linux.c
index 1cddea3..f6fa80a 100644
--- a/source3/modules/vfs_aio_pthread.c
+++ b/source3/modules/vfs_aio_linux.c
@@ -1,9 +1,6 @@
 /*
- * Simulate Posix AIO using pthreads.
+ * Simulate Posix AIO using Linux kernel AIO.
  *
- * Based on the aio_fork work from Volker and Volker's pthreadpool library.
- *
- * Copyright (C) Volker Lendecke 2008
  * Copyright (C) Jeremy Allison 2012
  *
  * This program is free software; you can redistribute it and/or modify
@@ -23,111 +20,140 @@
 
 #include "includes.h"
 #include "system/filesys.h"
-#include "system/shmem.h"
 #include "smbd/smbd.h"
 #include "smbd/globals.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include <sys/eventfd.h>
+#include <libaio.h>
 
 struct aio_extra;
-static struct pthreadpool *pool;
-static int aio_pthread_jobid;
+static int event_fd = -1; /* eventfd passed to io_set_eventfd(); -1 = not set up. */
+static io_context_t io_ctx; /* Kernel aio context from io_queue_init(). */
+static int aio_linux_requestid; /* Source of per-request ids. */
+static struct io_event *io_recv_events; /* Result buffer for io_getevents(). */
+static struct fd_event *aio_read_event; /* Read handler watching event_fd. */
 
 struct aio_private_data {
        struct aio_private_data *prev, *next;
-       int jobid;
+       int requestid; /* Identifies the request in debug output. */
        SMB_STRUCT_AIOCB *aiocb;
+       struct iocb *event_iocb; /* iocb handed to io_submit(). */
        ssize_t ret_size;
        int ret_errno;
        bool cancelled;
-       bool write_command;
 };
 
 /* List of outstanding requests we have. */
 static struct aio_private_data *pd_list;
 
-static void aio_pthread_handle_completion(struct event_context *event_ctx,
-                               struct fd_event *event,
-                               uint16 flags,
-                               void *p);
+static void aio_linux_handle_completion(struct event_context *event_ctx,
+                       struct fd_event *event,
+                       uint16 flags,
+                       void *p);
+
+/************************************************************************
+ Housekeeping. Release kernel aio resources once no requests are pending.
+***********************************************************************/
+
+static void aio_linux_housekeeping(struct tevent_context *event_ctx,
+                                        struct tevent_timer *te,
+                                        struct timeval now,
+                                        void *private_data)
+{
+       /* Remove this timed event handler. */
+       TALLOC_FREE(te);
 
+       if (pd_list != NULL) {
+               /* Still busy. Look again in 30 seconds. */
+               (void)tevent_add_timer(event_ctx,
+                                       NULL,
+                                       timeval_current_ofs(30, 0),
+                                       aio_linux_housekeeping,
+                                       NULL);
+               return;
+       }
+
+       /* No requests outstanding. Close out kernel resources. */
+       io_queue_release(io_ctx);
+       memset(&io_ctx, '\0', sizeof(io_ctx));
+
+       /* Stop watching event_fd before closing it. */
+       TALLOC_FREE(aio_read_event);
+       if (event_fd != -1) {
+               close(event_fd);
+               event_fd = -1;
+       }
+       TALLOC_FREE(io_recv_events);
+}
 
 /************************************************************************
- Ensure thread pool is initialized.
+ Ensure event fd, completion buffer and aio context are initialized.
 ***********************************************************************/
 
-static bool init_aio_threadpool(struct vfs_handle_struct *handle)
+static bool init_aio_linux(struct vfs_handle_struct *handle)
 {
-       struct fd_event *sock_event = NULL;
-       int ret = 0;
+       struct tevent_timer *te = NULL;
 
-       if (pool) {
+       if (event_fd != -1) {
+               /* Already initialized. */
                return true;
        }
 
-       ret = pthreadpool_init(aio_pending_size, &pool);
-       if (ret) {
-               errno = ret;
-               return false;
+       /* Schedule housekeeping in 30 seconds; it frees these resources when idle. */
+       te = tevent_add_timer(server_event_context(),
+                               NULL,
+                               timeval_current_ofs(30, 0),
+                               aio_linux_housekeeping,
+                               NULL);
+
+       if (te == NULL) {
+               goto fail;
        }
-       sock_event = tevent_add_fd(server_event_context(),
+
+       /* Room for the most events a single io_getevents() call will return. */
+       io_recv_events = talloc_zero_array(NULL,
+                               struct io_event,
+                               aio_pending_size);
+       if (io_recv_events == NULL) {
+               goto fail;
+       }
+
+       event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+       if (event_fd == -1) {
+               goto fail;
+       }
+
+       aio_read_event = tevent_add_fd(server_event_context(),
                                NULL,
-                               pthreadpool_signal_fd(pool),
+                               event_fd,
                                TEVENT_FD_READ,
-                               aio_pthread_handle_completion,
+                               aio_linux_handle_completion,
                                NULL);
-       if (sock_event == NULL) {
-               pthreadpool_destroy(pool);
-               pool = NULL;
-               return false;
+       if (aio_read_event == NULL) {
+               goto fail;
+       }
+
+       if (io_queue_init(aio_pending_size, &io_ctx)) {
+               goto fail;
        }
 
-       DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
+       DEBUG(10,("init_aio_linux: initialized with up to %d events\n",
                  aio_pending_size));
 
        return true;
-}
 
+  fail:
 
-/************************************************************************
- Worker function - core of the pthread aio engine.
- This is the function that actually does the IO.
-***********************************************************************/
+       DEBUG(10,("init_aio_linux: initialization failed\n"));
 
-static void aio_worker(void *private_data)
-{
-       struct aio_private_data *pd =
-                       (struct aio_private_data *)private_data;
-
-       if (pd->write_command) {
-               pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
-                               (const void *)pd->aiocb->aio_buf,
-                               pd->aiocb->aio_nbytes,
-                               pd->aiocb->aio_offset);
-               if (pd->ret_size == -1 && errno == ESPIPE) {
-                       /* Maintain the fiction that pipes can
-                          be seeked (sought?) on. */
-                       pd->ret_size = sys_write(pd->aiocb->aio_fildes,
-                                       (const void *)pd->aiocb->aio_buf,
-                                       pd->aiocb->aio_nbytes);
-               }
-       } else {
-               pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
-                               (void *)pd->aiocb->aio_buf,
-                               pd->aiocb->aio_nbytes,
-                               pd->aiocb->aio_offset);
-               if (pd->ret_size == -1 && errno == ESPIPE) {
-                       /* Maintain the fiction that pipes can
-                          be seeked (sought?) on. */
-                       pd->ret_size = sys_read(pd->aiocb->aio_fildes,
-                                       (void *)pd->aiocb->aio_buf,
-                                       pd->aiocb->aio_nbytes);
-               }
-       }
-       if (pd->ret_size == -1) {
-               pd->ret_errno = errno;
-       } else {
-               pd->ret_errno = 0;
-       }
+       TALLOC_FREE(te);
+       TALLOC_FREE(io_recv_events);
+       TALLOC_FREE(aio_read_event);
+       if (event_fd != -1) {
+               close(event_fd);
+               event_fd = -1;
+       }
+       memset(&io_ctx, '\0', sizeof(io_ctx));
+       return false;
 }
 
 /************************************************************************
@@ -151,7 +177,8 @@ static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
        if (!pd) {
                return NULL;
        }
-       pd->jobid = aio_pthread_jobid++;
+       pd->event_iocb = talloc_zero(pd, struct iocb);
+       pd->requestid = aio_linux_requestid++;
        pd->aiocb = aiocb;
        pd->ret_size = -1;
        pd->ret_errno = EINPROGRESS;
@@ -161,10 +188,10 @@ static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
 }
 
 /************************************************************************
- Spin off a threadpool (if needed) and initiate a pread call.
+ Initiate an asynchronous pread call.
 ***********************************************************************/
 
-static int aio_pthread_read(struct vfs_handle_struct *handle,
+static int aio_linux_read(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
 {
@@ -172,25 +199,34 @@ static int aio_pthread_read(struct vfs_handle_struct *handle,
        struct aio_private_data *pd = NULL;
        int ret;
 
-       if (!init_aio_threadpool(handle)) {
+       if (!init_aio_linux(handle)) {
                return -1;
        }
 
        pd = create_private_data(aio_ex, aiocb);
        if (pd == NULL) {
-               DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
+               DEBUG(10, ("aio_linux_read: Could not create private data.\n"));
                return -1;
        }
 
-       ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
-       if (ret) {
-               errno = ret;
+       io_prep_pread(pd->event_iocb,
+                       pd->aiocb->aio_fildes,
+                       discard_const(pd->aiocb->aio_buf),
+                       pd->aiocb->aio_nbytes,
+                       pd->aiocb->aio_offset);
+       io_set_eventfd(pd->event_iocb, event_fd);
+       /* Use the callback pointer as a private data ptr. */
+       io_set_callback(pd->event_iocb, (io_callback_t)pd);
+
+       ret = io_submit(io_ctx, 1, &pd->event_iocb);
+       if (ret < 0) {
+               errno = -ret; /* io_submit() returns -errno on failure. */
                return -1;
        }
 
-       DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
+       DEBUG(10, ("aio_linux_read: requestid=%d read requested "
                "of %llu bytes at offset %llu\n",
-               pd->jobid,
+               pd->requestid,
                (unsigned long long)pd->aiocb->aio_nbytes,
                (unsigned long long)pd->aiocb->aio_offset));
 
@@ -198,10 +234,10 @@ static int aio_pthread_read(struct vfs_handle_struct *handle,
 }
 
 /************************************************************************
- Spin off a threadpool (if needed) and initiate a pwrite call.
+ Initiate an asynchronous pwrite call.
 ***********************************************************************/
 
-static int aio_pthread_write(struct vfs_handle_struct *handle,
+static int aio_linux_write(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
 {
@@ -209,27 +245,34 @@ static int aio_pthread_write(struct vfs_handle_struct *handle,
        struct aio_private_data *pd = NULL;
        int ret;
 
-       if (!init_aio_threadpool(handle)) {
+       if (!init_aio_linux(handle)) {
                return -1;
        }
 
        pd = create_private_data(aio_ex, aiocb);
        if (pd == NULL) {
-               DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
+               DEBUG(10, ("aio_linux_write: Could not create private data.\n"));
                return -1;
        }
 
-       pd->write_command = true;
+       io_prep_pwrite(pd->event_iocb,
+                       pd->aiocb->aio_fildes,
+                       discard_const(pd->aiocb->aio_buf),
+                       pd->aiocb->aio_nbytes,
+                       pd->aiocb->aio_offset);
+       io_set_eventfd(pd->event_iocb, event_fd);
+       /* Use the callback pointer as a private data ptr. */
+       io_set_callback(pd->event_iocb, (io_callback_t)pd);
 
-       ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
-       if (ret) {
-               errno = ret;
+       ret = io_submit(io_ctx, 1, &pd->event_iocb);
+       if (ret < 0) {
+               errno = -ret; /* io_submit() returns -errno on failure. */
                return -1;
        }
 
-       DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
+       DEBUG(10, ("aio_linux_write: requestid=%d pwrite requested "
                "of %llu bytes at offset %llu\n",
-               pd->jobid,
+               pd->requestid,
                (unsigned long long)pd->aiocb->aio_nbytes,
                (unsigned long long)pd->aiocb->aio_offset));
 
@@ -237,62 +280,92 @@ static int aio_pthread_write(struct vfs_handle_struct *handle,
 }
 
 /************************************************************************
- Find the private data by jobid.
+ Handle a single finished io.
 ***********************************************************************/
 
-static struct aio_private_data *find_private_data_by_jobid(int jobid)
+static void aio_linux_handle_io_finished(struct io_event *ioev)
 {
-       struct aio_private_data *pd;
+       struct aio_extra *aio_ex = NULL;
+       struct aio_private_data *pd = (struct aio_private_data *)ioev->data;
 
-       for (pd = pd_list; pd != NULL; pd = pd->next) {
-               if (pd->jobid == jobid) {
-                       return pd;
-               }
+       /* ioev->res holds the bytes sent/received on success, or the */
+       /* negated errno on failure (libaio declares it unsigned). */
+       if ((long)ioev->res < 0) {
+               pd->ret_size = -1;
+               pd->ret_errno = -(long)ioev->res;
+       } else {
+               pd->ret_size = ioev->res;
+               pd->ret_errno = 0;
        }
 
-       return NULL;
+       aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
+       smbd_aio_complete_aio_ex(aio_ex);
+
+       DEBUG(10,("aio_linux_handle_io_finished: requestid %d completed\n",
+               pd->requestid ));
+       TALLOC_FREE(aio_ex);
 }
 
 /************************************************************************
- Callback when an IO completes.
+ Callback when multiple IOs complete.
 ***********************************************************************/
 
-static void aio_pthread_handle_completion(struct event_context *event_ctx,
+static void aio_linux_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
 {
-       struct aio_extra *aio_ex = NULL;
-       struct aio_private_data *pd = NULL;
-       int jobid = 0;
-       int ret;
+       uint64_t num_events = 0;
 
-       DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
+       DEBUG(10, ("aio_linux_handle_completion called with flags=%d\n",
                        (int)flags));
 
        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }
 
-       ret = pthreadpool_finished_job(pool, &jobid);
-       if (ret) {
-               smb_panic("aio_pthread_handle_completion");
-               return;
+       /* Read the number of events available. */
+       if (sys_read(event_fd, &num_events, sizeof(num_events)) !=
+                       sizeof(num_events)) {
+               smb_panic("aio_linux_handle_completion: invalid read");
        }
 
-       pd = find_private_data_by_jobid(jobid);
-       if (pd == NULL) {
-               DEBUG(1, ("aio_pthread_handle_completion cannot find jobid 
%d\n",
-                         jobid));
-               return;
-       }
+       while (num_events > 0) {
+               uint64_t events_to_read = MIN(num_events, aio_pending_size);
+               struct timespec ts;
+               int i;
+               int ret;
 
-       aio_ex = (struct aio_extra 
*)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
-       smbd_aio_complete_aio_ex(aio_ex);
+               ts.tv_sec = 0;
+               ts.tv_nsec = 0;
 
-       DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
-               jobid ));
-       TALLOC_FREE(aio_ex);
+               ret = io_getevents(io_ctx,
+                       1,
+                       (long)events_to_read,
+                       io_recv_events,
+                       &ts);
+


-- 
Samba Shared Repository

Reply via email to