Module Name:	src
Committed By:	jym
Date:		Thu Nov 24 01:47:18 UTC 2011
Modified Files:
	src/sys/arch/xen/xen: xbdback_xenbus.c

Log Message:
Deep rework of the xbdback(4) driver: it now uses a thread per instance
instead of running continuations directly from shm callbacks or interrupt
handlers. The whole CPS design remains, but is adapted to a thread model.
This allows scheduling away I/O requests from domains that behave
abnormally, or even destroying them when necessary, without thrashing
dom0 with error messages at IPL_BIO.

I took this opportunity to make the driver MPSAFE, so multiple instances
can run concurrently. The home-grown pool(9) queues were replaced with
pool_cache(9), and the callback mechanism was reworked so that it
delegates I/O processing to the thread instead of handling it itself
through the continuation trampoline.

This fixes the potential DoS many have seen in a dom0 when trying to
suspend a NetBSD domU with a corrupted I/O ring.

Benchmarks (build.sh release runs and bonnie++) do not show any
performance regression; the "new" driver is on par with the "old" one.

ok bouyer@.


To generate a diff of this commit:
cvs rdiff -u -r1.51 -r1.52 src/sys/arch/xen/xen/xbdback_xenbus.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.
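
For readers skimming the diff below, here is a minimal sketch of the
per-instance thread model described above. Names such as example_instance,
example_thread and example_trampoline are illustrative, and the
DISCONNECTING path is simplified; the committed xbdback_thread() also waits
for pending I/O and drops a reference count before exiting.

/*
 * Minimal sketch of the per-instance thread model (illustrative names,
 * not the committed code).
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/intr.h>
#include <sys/kthread.h>
#include <sys/mutex.h>
#include <sys/condvar.h>

typedef enum { WAITING, RUN, DISCONNECTING, DISCONNECTED } example_state_t;

struct example_instance {
	kmutex_t	 xi_lock;	/* protects xi_status and ring pushes */
	kcondvar_t	 xi_cv;		/* wait channel for thread work */
	example_state_t	 xi_status;
	/* next continuation to run; set by the continuations themselves */
	void		*(*xi_cont)(struct example_instance *, void *);
};

/* Run continuations until one blocks (returns NULL) or the chain ends. */
static void
example_trampoline(struct example_instance *xi, void *obj)
{
	void *(*cont)(struct example_instance *, void *);

	while (obj != NULL && xi->xi_cont != NULL) {
		cont = xi->xi_cont;
		obj = (*cont)(xi, obj);
	}
}

/* One thread per backend instance; replaces work formerly done at IPL_BIO. */
static void
example_thread(void *arg)
{
	struct example_instance *xi = arg;

	for (;;) {
		mutex_enter(&xi->xi_lock);
		switch (xi->xi_status) {
		case WAITING:		/* nothing to do, sleep */
			cv_wait(&xi->xi_cv, &xi->xi_lock);
			mutex_exit(&xi->xi_lock);
			break;
		case RUN:		/* requests pending in the ring */
			xi->xi_status = WAITING;
			mutex_exit(&xi->xi_lock);
			/* xi_cont points at the first step, e.g. a co_main() */
			example_trampoline(xi, xi);
			break;
		case DISCONNECTING:	/* simplified: real code drains I/O first */
			xi->xi_status = DISCONNECTED;
			cv_broadcast(&xi->xi_cv);
			mutex_exit(&xi->xi_lock);
			kthread_exit(0);
			/* NOTREACHED */
		default:
			panic("example_thread: invalid state %d", xi->xi_status);
		}
	}
}

/* Event channel handler or biointr() side: just poke the thread. */
static void
example_wakeup(struct example_instance *xi)
{
	mutex_enter(&xi->xi_lock);
	if (xi->xi_status == WAITING)	/* only escalate WAITING to RUN */
		xi->xi_status = RUN;
	cv_broadcast(&xi->xi_cv);
	mutex_exit(&xi->xi_lock);
}

/* Instance setup, typically from the xenbus connect path. */
static int
example_attach(struct example_instance *xi)
{
	mutex_init(&xi->xi_lock, MUTEX_DEFAULT, IPL_BIO);
	cv_init(&xi->xi_cv, "xbdbex");
	xi->xi_status = WAITING;
	xi->xi_cont = NULL;
	return kthread_create(PRI_NONE, KTHREAD_MPSAFE, NULL,
	    example_thread, xi, NULL, "xbdbex");
}

The point of the pattern is that blocking conditions no longer have to be
handled at IPL_BIO: a continuation that cannot make progress simply returns
NULL, the trampoline stops, and the interrupt or softint side only has to
call the wakeup function.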
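
The pool(9)-to-pool_cache(9) move follows the same logic: since
allocations now happen in thread context, the driver can simply sleep for
memory instead of parking an instance on a private "waiting for memory"
queue. A rough sketch, again with hypothetical names (the committed code
uses pool_cache_bootstrap() on statically allocated caches and primes them
with a ring's worth of objects):

/*
 * Sketch of the pool(9) -> pool_cache(9) migration (illustrative names).
 */
#include <sys/param.h>
#include <sys/intr.h>
#include <sys/pool.h>

struct example_io {
	int	eio_operation;
	/* ... rest of the per-I/O state ... */
};

static pool_cache_t example_io_cache;

static void
example_pools_init(void)
{
	/* IPL_SOFTBIO: objects are also freed from the I/O done softint */
	example_io_cache = pool_cache_init(sizeof(struct example_io),
	    0, 0, 0, "exiopl", NULL, IPL_SOFTBIO, NULL, NULL, NULL);
}

static struct example_io *
example_io_get(void)
{
	/* Thread context: it is fine to sleep until memory is available. */
	return pool_cache_get(example_io_cache, PR_WAITOK);
}

static void
example_io_put(struct example_io *eio)
{
	pool_cache_put(example_io_cache, eio);
}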
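
Finally, the "without thrashing dom0 with error messages" part relies on
ratecheck(9)-style throttling of diagnostics about malformed requests. A
tiny sketch with hypothetical names (the committed driver keeps the
timestamp per instance, in xbdi_lasterr_time):

/*
 * Sketch of rate-limited error reporting (illustrative names).
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/time.h>

/* report at most one error per second */
static const struct timeval example_err_intvl = { 1, 0 };
static struct timeval example_lasterr_time;

static void
example_report_bad_request(const char *name, int op)
{
	/* the request is still failed with BLKIF_RSP_ERROR, just not logged */
	if (ratecheck(&example_lasterr_time, &example_err_intvl))
		printf("%s: unknown operation %d\n", name, op);
}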
Modified files: Index: src/sys/arch/xen/xen/xbdback_xenbus.c diff -u src/sys/arch/xen/xen/xbdback_xenbus.c:1.51 src/sys/arch/xen/xen/xbdback_xenbus.c:1.52 --- src/sys/arch/xen/xen/xbdback_xenbus.c:1.51 Mon Nov 14 21:34:50 2011 +++ src/sys/arch/xen/xen/xbdback_xenbus.c Thu Nov 24 01:47:18 2011 @@ -1,4 +1,4 @@ -/* $NetBSD: xbdback_xenbus.c,v 1.51 2011/11/14 21:34:50 christos Exp $ */ +/* $NetBSD: xbdback_xenbus.c,v 1.52 2011/11/24 01:47:18 jym Exp $ */ /* * Copyright (c) 2006 Manuel Bouyer. @@ -26,23 +26,27 @@ */ #include <sys/cdefs.h> -__KERNEL_RCSID(0, "$NetBSD: xbdback_xenbus.c,v 1.51 2011/11/14 21:34:50 christos Exp $"); +__KERNEL_RCSID(0, "$NetBSD: xbdback_xenbus.c,v 1.52 2011/11/24 01:47:18 jym Exp $"); -#include <sys/types.h> -#include <sys/param.h> -#include <sys/systm.h> -#include <sys/malloc.h> -#include <sys/queue.h> -#include <sys/kernel.h> #include <sys/atomic.h> +#include <sys/buf.h> +#include <sys/condvar.h> #include <sys/conf.h> #include <sys/disk.h> #include <sys/device.h> #include <sys/fcntl.h> -#include <sys/vnode.h> #include <sys/kauth.h> -#include <sys/workqueue.h> -#include <sys/buf.h> +#include <sys/kernel.h> +#include <sys/kmem.h> +#include <sys/kthread.h> +#include <sys/malloc.h> +#include <sys/mutex.h> +#include <sys/param.h> +#include <sys/queue.h> +#include <sys/systm.h> +#include <sys/time.h> +#include <sys/types.h> +#include <sys/vnode.h> #include <xen/xen.h> #include <xen/xen_shm.h> @@ -75,39 +79,54 @@ struct xbdback_io; struct xbdback_fragment; struct xbdback_instance; -/* state of a xbdback instance */ -typedef enum {CONNECTED, DISCONNECTING, DISCONNECTED} xbdback_state_t; +/* + * status of a xbdback instance: + * WAITING: xbdback instance is connected, waiting for requests + * RUN: xbdi thread must be woken up, I/Os have to be processed + * DISCONNECTING: the instance is closing, no more I/Os can be scheduled + * DISCONNECTED: no I/Os, no ring, the thread should terminate. + */ +typedef enum {WAITING, RUN, DISCONNECTING, DISCONNECTED} xbdback_state_t; /* - * Since there are a variety of conditions that can block our I/O - * processing, which isn't allowed to suspend its thread's execution, - * such things will be done in a sort of continuation-passing style. - * - * Return value is NULL to indicate that execution has blocked; if - * it's finished, set xbdi->xbdi_cont (see below) to NULL and the return - * doesn't matter. Otherwise it's passed as the second parameter to - * the new value of xbdi->xbdi_cont. + * Each xbdback instance is managed by a single thread that handles all + * the I/O processing. As there are a variety of conditions that can block, + * everything will be done in a sort of continuation-passing style. + * + * When the execution has to block to delay processing, for example to + * allow system to recover because of memory shortage (via shared memory + * callback), the return value of a continuation can be set to NULL. In that + * case, the thread will go back to sleeping and wait for the proper + * condition before it starts processing requests again from where it left. + * Continuation state is "stored" in the xbdback instance (xbdi_cont and + * xbdi_cont_aux), and should only be manipulated by the instance thread. * + * As xbdback(4) has to handle different sort of asynchronous events (Xen + * event channels, biointr() soft interrupts, xenbus commands), the xbdi_lock + * mutex is used to protect specific elements of the xbdback instance from + * concurrent access: thread status and ring access (when pushing responses). 
+ * * Here's how the call graph is supposed to be for a single I/O: + * * xbdback_co_main() * | - * | --> xbdback_co_cache_doflush() or NULL - * | | - * | -- xbdback_co_cache_flush2() <- xbdback_co_flush_done() <-- - * | | | - * | |-> xbdback_co_cache_flush() -> xbdback_co_flush() -- + * | --> xbdback_co_cache_doflush() or NULL + * | | + * | - xbdback_co_cache_flush2() <- xbdback_co_do_io() <- + * | | | + * | |-> xbdback_co_cache_flush() -> xbdback_co_map_io()- * xbdback_co_main_loop()-| - * | |-> xbdback_co_main_done() -> xbdback_co_flush() -- - * | | | - * | -- xbdback_co_main_done2() <- xbdback_co_flush_done() <-- - * | | - * | --> xbdback_co_main() or NULL + * | |-> xbdback_co_main_done() ---> xbdback_co_map_io()- + * | | | + * | -- xbdback_co_main_done2() <-- xbdback_co_do_io() <- + * | | + * | --> xbdback_co_main() or NULL * | * xbdback_co_io() -> xbdback_co_main_incr() -> xbdback_co_main_loop() * | - * xbdback_co_io_gotreq()--+---------> xbdback_co_flush() -- - * | | | - * -> xbdback_co_io_loop()----| <- xbdback_co_flush_done() <-- + * xbdback_co_io_gotreq()--+--> xbdback_co_map_io() --- + * | | | + * -> xbdback_co_io_loop()----| <- xbdback_co_do_io() <-- * | | | | * | | | |----------> xbdback_co_io_gotio() * | | | | @@ -131,21 +150,24 @@ enum xbdi_proto { XBDIP_64 }; - /* we keep the xbdback instances in a linked list */ struct xbdback_instance { SLIST_ENTRY(xbdback_instance) next; struct xenbus_device *xbdi_xbusd; /* our xenstore entry */ struct xenbus_watch xbdi_watch; /* to watch our store */ - domid_t xbdi_domid; /* attached to this domain */ + domid_t xbdi_domid; /* attached to this domain */ uint32_t xbdi_handle; /* domain-specific handle */ - xbdback_state_t xbdi_status; + char xbdi_name[16]; /* name of this instance */ + /* mutex that protects concurrent access to the xbdback instance */ + kmutex_t xbdi_lock; + kcondvar_t xbdi_cv; /* wait channel for thread work */ + xbdback_state_t xbdi_status; /* thread's status */ /* backing device parameters */ dev_t xbdi_dev; const struct bdevsw *xbdi_bdevsw; /* pointer to the device's bdevsw */ struct vnode *xbdi_vp; uint64_t xbdi_size; - int xbdi_ro; /* is device read-only ? */ + bool xbdi_ro; /* is device read-only ? */ /* parameters for the communication */ unsigned int xbdi_evtchn; /* private parameters for communication */ @@ -176,6 +198,11 @@ struct xbdback_instance { /* other state */ int xbdi_same_page; /* are we merging two segments on the same page? */ uint xbdi_pendingreqs; /* number of I/O in fly */ + int xbdi_errps; /* errors per second */ + struct timeval xbdi_lasterr_time; /* error time tracking */ +#ifdef DEBUG + struct timeval xbdi_lastfragio_time; /* fragmented I/O tracking */ +#endif }; /* Manipulation of the above reference count. */ #define xbdi_get(xbdip) atomic_inc_uint(&(xbdip)->xbdi_refcnt) @@ -208,7 +235,6 @@ struct xbdback_request { * can be coalesced. */ struct xbdback_io { - struct work xio_work; /* The instance pointer is duplicated for convenience. */ struct xbdback_instance *xio_xbdi; /* our xbd instance */ uint8_t xio_operation; @@ -252,18 +278,21 @@ struct xbdback_fragment { }; /* - * Wrap our pools with a chain of xbdback_instances whose I/O - * processing has blocked for want of memory from that pool. + * Pools to manage the chain of block requests and I/Os fragments + * submitted by frontend. 
*/ struct xbdback_pool { - struct pool p; - SIMPLEQ_HEAD(xbdback_iqueue, xbdback_instance) q; + struct pool_cache pc; struct timeval last_warning; } xbdback_request_pool, xbdback_io_pool, xbdback_fragment_pool; + +SIMPLEQ_HEAD(xbdback_iqueue, xbdback_instance); static struct xbdback_iqueue xbdback_shmq; static int xbdback_shmcb; /* have we already registered a callback? */ -struct timeval xbdback_poolsleep_intvl = { 5, 0 }; +/* Interval between reports of I/O errors from frontend */ +struct timeval xbdback_err_intvl = { 1, 0 }; + #ifdef DEBUG struct timeval xbdback_fragio_intvl = { 60, 0 }; #endif @@ -274,6 +303,9 @@ static void xbdback_frontend_changed(voi static void xbdback_backend_changed(struct xenbus_watch *, const char **, unsigned int); static int xbdback_evthandler(void *); + +static int xbdback_connect(struct xbdback_instance *); +static int xbdback_disconnect(struct xbdback_instance *); static void xbdback_finish_disconnect(struct xbdback_instance *); static struct xbdback_instance *xbdif_lookup(domid_t, uint32_t); @@ -287,7 +319,6 @@ static void *xbdback_co_main_done2(struc static void *xbdback_co_cache_flush(struct xbdback_instance *, void *); static void *xbdback_co_cache_flush2(struct xbdback_instance *, void *); static void *xbdback_co_cache_doflush(struct xbdback_instance *, void *); -static void *xbdback_co_cache_doflush_wait(struct xbdback_instance *, void *); static void *xbdback_co_io(struct xbdback_instance *, void *); static void *xbdback_co_io_gotreq(struct xbdback_instance *, void *); @@ -297,12 +328,13 @@ static void *xbdback_co_io_gotio2(struct static void *xbdback_co_io_gotfrag(struct xbdback_instance *, void *); static void *xbdback_co_io_gotfrag2(struct xbdback_instance *, void *); -static void *xbdback_co_flush(struct xbdback_instance *, void *); -static void *xbdback_co_flush_done(struct xbdback_instance *, void *); +static void *xbdback_co_map_io(struct xbdback_instance *, void *); +static void *xbdback_co_do_io(struct xbdback_instance *, void *); + +static void *xbdback_co_wait_shm_callback(struct xbdback_instance *, void *); static int xbdback_shm_callback(void *); static void xbdback_io_error(struct xbdback_io *, int); -static void xbdback_do_io(struct work *, void *); static void xbdback_iodone(struct buf *); static void xbdback_send_reply(struct xbdback_instance *, uint64_t , int , int); @@ -312,6 +344,8 @@ static void xbdback_unmap_shm(struct xbd static void *xbdback_pool_get(struct xbdback_pool *, struct xbdback_instance *); static void xbdback_pool_put(struct xbdback_pool *, void *); +static void xbdback_thread(void *); +static void xbdback_wakeup_thread(struct xbdback_instance *); static void xbdback_trampoline(struct xbdback_instance *, void *); static struct xenbus_backend_driver xbd_backend_driver = { @@ -319,8 +353,6 @@ static struct xenbus_backend_driver xbd_ .xbakd_type = "vbd" }; -struct workqueue *xbdback_workqueue; - void xbdbackattach(int n) { @@ -333,27 +365,26 @@ xbdbackattach(int n) SLIST_INIT(&xbdback_instances); SIMPLEQ_INIT(&xbdback_shmq); xbdback_shmcb = 0; - pool_init(&xbdback_request_pool.p, sizeof(struct xbdback_request), - 0, 0, 0, "xbbrp", NULL, IPL_BIO); - SIMPLEQ_INIT(&xbdback_request_pool.q); - pool_init(&xbdback_io_pool.p, sizeof(struct xbdback_io), - 0, 0, 0, "xbbip", NULL, IPL_BIO); - SIMPLEQ_INIT(&xbdback_io_pool.q); - pool_init(&xbdback_fragment_pool.p, sizeof(struct xbdback_fragment), - 0, 0, 0, "xbbfp", NULL, IPL_BIO); - SIMPLEQ_INIT(&xbdback_fragment_pool.q); + + pool_cache_bootstrap(&xbdback_request_pool.pc, + 
sizeof(struct xbdback_request), 0, 0, 0, "xbbrp", NULL, + IPL_SOFTBIO, NULL, NULL, NULL); + pool_cache_bootstrap(&xbdback_io_pool.pc, + sizeof(struct xbdback_io), 0, 0, 0, "xbbip", NULL, + IPL_SOFTBIO, NULL, NULL, NULL); + pool_cache_bootstrap(&xbdback_fragment_pool.pc, + sizeof(struct xbdback_fragment), 0, 0, 0, "xbbfp", NULL, + IPL_SOFTBIO, NULL, NULL, NULL); + /* we allocate enough to handle a whole ring at once */ - if (pool_prime(&xbdback_request_pool.p, BLKIF_RING_SIZE) != 0) + if (pool_prime(&xbdback_request_pool.pc.pc_pool, BLKIF_RING_SIZE) != 0) printf("xbdback: failed to prime request pool\n"); - if (pool_prime(&xbdback_io_pool.p, BLKIF_RING_SIZE) != 0) + if (pool_prime(&xbdback_io_pool.pc.pc_pool, BLKIF_RING_SIZE) != 0) printf("xbdback: failed to prime io pool\n"); - if (pool_prime(&xbdback_fragment_pool.p, + if (pool_prime(&xbdback_fragment_pool.pc.pc_pool, BLKIF_MAX_SEGMENTS_PER_REQUEST * BLKIF_RING_SIZE) != 0) printf("xbdback: failed to prime fragment pool\n"); - if (workqueue_create(&xbdback_workqueue, "xbdbackd", - xbdback_do_io, NULL, PRI_BIO, IPL_BIO, 0)) - printf("xbdback: failed to init workqueue\n"); xenbus_backend_register(&xbd_backend_driver); } @@ -396,23 +427,28 @@ xbdback_xenbus_create(struct xenbus_devi if (xbdif_lookup(domid, handle) != NULL) { return EEXIST; } - xbdi = malloc(sizeof(struct xbdback_instance), M_DEVBUF, - M_NOWAIT | M_ZERO); - if (xbdi == NULL) { - return ENOMEM; - } + xbdi = kmem_zalloc(sizeof(*xbdi), KM_SLEEP); + xbdi->xbdi_domid = domid; xbdi->xbdi_handle = handle; + snprintf(xbdi->xbdi_name, sizeof(xbdi->xbdi_name), "xbdb%di%d", + xbdi->xbdi_domid, xbdi->xbdi_handle); + + /* initialize status and reference counter */ xbdi->xbdi_status = DISCONNECTED; - xbdi->xbdi_refcnt = 1; + xbdi_get(xbdi); + + mutex_init(&xbdi->xbdi_lock, MUTEX_DEFAULT, IPL_BIO); + cv_init(&xbdi->xbdi_cv, xbdi->xbdi_name); SLIST_INSERT_HEAD(&xbdback_instances, xbdi, next); + xbusd->xbusd_u.b.b_cookie = xbdi; xbusd->xbusd_u.b.b_detach = xbdback_xenbus_destroy; xbusd->xbusd_otherend_changed = xbdback_frontend_changed; xbdi->xbdi_xbusd = xbusd; error = xenbus_watch_path2(xbusd, xbusd->xbusd_path, "physical-device", - &xbdi->xbdi_watch, xbdback_backend_changed); + &xbdi->xbdi_watch, xbdback_backend_changed); if (error) { printf("failed to watch on %s/physical-device: %d\n", xbusd->xbusd_path, error); @@ -429,7 +465,7 @@ xbdback_xenbus_create(struct xenbus_devi fail2: unregister_xenbus_watch(&xbdi->xbdi_watch); fail: - free(xbdi, M_DEVBUF); + kmem_free(xbdi, sizeof(*xbdi)); return error; } @@ -439,22 +475,12 @@ xbdback_xenbus_destroy(void *arg) struct xbdback_instance *xbdi = arg; struct xenbus_device *xbusd = xbdi->xbdi_xbusd; struct gnttab_unmap_grant_ref ungrop; - int err, s; + int err; XENPRINTF(("xbdback_xenbus_destroy state %d\n", xbdi->xbdi_status)); - if (xbdi->xbdi_status != DISCONNECTED) { - hypervisor_mask_event(xbdi->xbdi_evtchn); - event_remove_handler(xbdi->xbdi_evtchn, xbdback_evthandler, - xbdi); - xbdi->xbdi_status = DISCONNECTING; - s = splbio(); - xbdi_put(xbdi); - while (xbdi->xbdi_status != DISCONNECTED) { - tsleep(&xbdi->xbdi_status, PRIBIO, "xbddis", 0); - } - splx(s); - } + xbdback_disconnect(xbdi); + /* unregister watch */ if (xbdi->xbdi_watch.node) { unregister_xenbus_watch(&xbdi->xbdi_watch); @@ -487,7 +513,7 @@ xbdback_xenbus_destroy(void *arg) vn_close(xbdi->xbdi_vp, FREAD, NOCRED); } SLIST_REMOVE(&xbdback_instances, xbdi, xbdback_instance, next); - free(xbdi, M_DEVBUF); + kmem_free(xbdi, sizeof(*xbdi)); return 0; } @@ -500,7 +526,6 @@ 
xbdback_connect(struct xbdback_instance evtchn_op_t evop; u_long ring_ref, revtchn; char *xsproto; - char evname[16]; const char *proto; struct xenbus_device *xbusd = xbdi->xbdi_xbusd; @@ -602,17 +627,20 @@ xbdback_connect(struct xbdback_instance } xbdi->xbdi_evtchn = evop.u.bind_interdomain.local_port; - snprintf(evname, sizeof(evname), "xbdback%di%d", - xbdi->xbdi_domid, xbdi->xbdi_handle); event_set_handler(xbdi->xbdi_evtchn, xbdback_evthandler, - xbdi, IPL_BIO, evname); + xbdi, IPL_BIO, xbdi->xbdi_name); aprint_verbose("xbd backend domain %d handle %#x (%d) " "using event channel %d, protocol %s\n", xbdi->xbdi_domid, xbdi->xbdi_handle, xbdi->xbdi_handle, xbdi->xbdi_evtchn, proto); + + /* enable the xbdback event handler machinery */ + xbdi->xbdi_status = WAITING; hypervisor_enable_event(xbdi->xbdi_evtchn); hypervisor_notify_via_evtchn(xbdi->xbdi_evtchn); - xbdi->xbdi_status = CONNECTED; - return 0; + + if (kthread_create(IPL_NONE, KTHREAD_MPSAFE, NULL, + xbdback_thread, xbdi, NULL, xbdi->xbdi_name) == 0) + return 0; err2: /* unmap ring */ @@ -631,21 +659,27 @@ err: return -1; } +/* + * Signal a xbdback thread to disconnect. Done in 'xenwatch' thread context. + */ static int xbdback_disconnect(struct xbdback_instance *xbdi) { - int s; hypervisor_mask_event(xbdi->xbdi_evtchn); event_remove_handler(xbdi->xbdi_evtchn, xbdback_evthandler, xbdi); + + /* signal thread that we want to disconnect, then wait for it */ + mutex_enter(&xbdi->xbdi_lock); xbdi->xbdi_status = DISCONNECTING; - s = splbio(); - xbdi_put(xbdi); - while (xbdi->xbdi_status != DISCONNECTED) { - tsleep(&xbdi->xbdi_status, PRIBIO, "xbddis", 0); - } - splx(s); + cv_signal(&xbdi->xbdi_cv); + + while (xbdi->xbdi_status != DISCONNECTED) + cv_wait(&xbdi->xbdi_cv, &xbdi->xbdi_lock); + + mutex_exit(&xbdi->xbdi_lock); + xenbus_switch_state(xbdi->xbdi_xbusd, NULL, XenbusStateClosing); return 0; @@ -663,7 +697,7 @@ xbdback_frontend_changed(void *arg, Xenb break; case XenbusStateInitialised: case XenbusStateConnected: - if (xbdi->xbdi_status == CONNECTED) + if (xbdi->xbdi_status == WAITING || xbdi->xbdi_status == RUN) break; xbdback_connect(xbdi); break; @@ -704,14 +738,16 @@ xbdback_backend_changed(struct xenbus_wa if (err) return; /* - * we can also fire up after having openned the device, don't try + * we can also fire up after having opened the device, don't try * to do it twice. */ if (xbdi->xbdi_vp != NULL) { - if (xbdi->xbdi_status == CONNECTED && xbdi->xbdi_dev != dev) { - printf("xbdback %s: changing physical device from " - "0x%" PRIx64 " to 0x%lx not supported\n", - xbusd->xbusd_path, xbdi->xbdi_dev, dev); + if (xbdi->xbdi_status == WAITING || xbdi->xbdi_status == RUN) { + if (xbdi->xbdi_dev != dev) { + printf("xbdback %s: changing physical device " + "from %#"PRIx64" to %#lx not supported\n", + xbusd->xbusd_path, xbdi->xbdi_dev, dev); + } } return; } @@ -723,9 +759,9 @@ xbdback_backend_changed(struct xenbus_wa return; } if (mode[0] == 'w') - xbdi->xbdi_ro = 0; + xbdi->xbdi_ro = false; else - xbdi->xbdi_ro = 1; + xbdi->xbdi_ro = true; major = major(xbdi->xbdi_dev); devname = devsw_blk2name(major); if (devname == NULL) { @@ -830,14 +866,18 @@ abort: xenbus_transaction_end(xbt, 1); } - -static void xbdback_finish_disconnect(struct xbdback_instance *xbdi) +/* + * Used by a xbdi thread to signal that it is now disconnected. 
+ */ +static void +xbdback_finish_disconnect(struct xbdback_instance *xbdi) { + KASSERT(mutex_owned(&xbdi->xbdi_lock)); KASSERT(xbdi->xbdi_status == DISCONNECTING); xbdi->xbdi_status = DISCONNECTED; - wakeup(&xbdi->xbdi_status); + cv_signal(&xbdi->xbdi_cv); } static struct xbdback_instance * @@ -860,18 +900,64 @@ xbdback_evthandler(void *arg) XENPRINTF(("xbdback_evthandler domain %d: cont %p\n", xbdi->xbdi_domid, xbdi->xbdi_cont)); - if (xbdi->xbdi_cont == NULL) { - xbdi->xbdi_cont = xbdback_co_main; - xbdback_trampoline(xbdi, xbdi); - } + xbdback_wakeup_thread(xbdi); return 1; } +/* + * Main thread routine for one xbdback instance. Woken up by + * xbdback_evthandler when a domain has I/O work scheduled in a I/O ring. + */ +static void +xbdback_thread(void *arg) +{ + struct xbdback_instance *xbdi = arg; + + for (;;) { + mutex_enter(&xbdi->xbdi_lock); + switch (xbdi->xbdi_status) { + case WAITING: + cv_wait(&xbdi->xbdi_cv, &xbdi->xbdi_lock); + mutex_exit(&xbdi->xbdi_lock); + break; + case RUN: + xbdi->xbdi_status = WAITING; /* reset state */ + mutex_exit(&xbdi->xbdi_lock); + + if (xbdi->xbdi_cont == NULL) { + xbdi->xbdi_cont = xbdback_co_main; + } + + xbdback_trampoline(xbdi, xbdi); + break; + case DISCONNECTING: + if (xbdi->xbdi_pendingreqs > 0) { + /* there are pending I/Os. Wait for them. */ + cv_wait(&xbdi->xbdi_cv, &xbdi->xbdi_lock); + mutex_exit(&xbdi->xbdi_lock); + break; + } + + /* All I/Os should have been processed by now, + * xbdi_refcnt should drop to 0 */ + xbdi_put(xbdi); + KASSERT(xbdi->xbdi_refcnt == 0); + mutex_exit(&xbdi->xbdi_lock); + kthread_exit(0); + break; + default: + panic("%s: invalid state %d", + xbdi->xbdi_name, xbdi->xbdi_status); + } + } +} + static void * xbdback_co_main(struct xbdback_instance *xbdi, void *obj) { (void)obj; + xbdi->xbdi_req_prod = xbdi->xbdi_ring.ring_n.sring->req_prod; xen_rmb(); /* ensure we see all requests up to req_prod */ /* @@ -885,15 +971,18 @@ xbdback_co_main(struct xbdback_instance /* * Fetch a blkif request from the ring, and pass control to the appropriate * continuation. + * If someone asked for disconnection, do not fetch any more request from + * the ring. */ static void * xbdback_co_main_loop(struct xbdback_instance *xbdi, void *obj) { - blkif_request_t *req = &xbdi->xbdi_xen_req; + blkif_request_t *req; blkif_x86_32_request_t *req32; blkif_x86_64_request_t *req64; (void)obj; + req = &xbdi->xbdi_xen_req; if (xbdi->xbdi_ring.ring_n.req_cons != xbdi->xbdi_req_prod) { switch(xbdi->xbdi_proto) { case XBDIP_NATIVE: @@ -937,8 +1026,11 @@ xbdback_co_main_loop(struct xbdback_inst xbdi->xbdi_cont = xbdback_co_cache_flush; break; default: - printf("xbdback_evthandler domain %d: unknown " - "operation %d\n", xbdi->xbdi_domid, req->operation); + if (ratecheck(&xbdi->xbdi_lasterr_time, + &xbdback_err_intvl)) { + printf("%s: unknown operation %d\n", + xbdi->xbdi_name, req->operation); + } xbdback_send_reply(xbdi, req->id, req->operation, BLKIF_RSP_ERROR); xbdi->xbdi_cont = xbdback_co_main_incr; @@ -951,8 +1043,8 @@ xbdback_co_main_loop(struct xbdback_inst } /* - * Increment consumer index and move on to the next request. In case index - * leads to ring overflow, bail out. + * Increment consumer index and move on to the next request. In case + * we want to disconnect, leave continuation now. 
*/ static void * xbdback_co_main_incr(struct xbdback_instance *xbdi, void *obj) @@ -961,11 +1053,24 @@ xbdback_co_main_incr(struct xbdback_inst blkif_back_ring_t *ring = &xbdi->xbdi_ring.ring_n; ring->req_cons++; - if (RING_REQUEST_CONS_OVERFLOW(ring, ring->req_cons)) + + /* + * Do not bother with locking here when checking for xbdi_status: if + * we get a transient state, we will get the right value at + * the next increment. + */ + if (xbdi->xbdi_status == DISCONNECTING) xbdi->xbdi_cont = NULL; else xbdi->xbdi_cont = xbdback_co_main_loop; + /* + * Each time the thread processes a full ring of requests, give + * a chance to other threads to process I/Os too + */ + if ((ring->req_cons % BLKIF_RING_SIZE) == 0) + yield(); + return xbdi; } @@ -980,7 +1085,7 @@ xbdback_co_main_done(struct xbdback_inst if (xbdi->xbdi_io != NULL) { KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ || xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE); - xbdi->xbdi_cont = xbdback_co_flush; + xbdi->xbdi_cont = xbdback_co_map_io; xbdi->xbdi_cont_aux = xbdback_co_main_done2; } else { xbdi->xbdi_cont = xbdback_co_main_done2; @@ -1002,6 +1107,7 @@ xbdback_co_main_done2(struct xbdback_ins xbdi->xbdi_cont = xbdback_co_main; else xbdi->xbdi_cont = NULL; + return xbdi; } @@ -1012,14 +1118,14 @@ static void * xbdback_co_cache_flush(struct xbdback_instance *xbdi, void *obj) { (void)obj; - KASSERT(curcpu()->ci_ilevel >= IPL_BIO); + XENPRINTF(("xbdback_co_cache_flush %p %p\n", xbdi, obj)); if (xbdi->xbdi_io != NULL) { /* Some I/Os are required for this instance. Process them. */ KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ || xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE); KASSERT(xbdi->xbdi_pendingreqs > 0); - xbdi->xbdi_cont = xbdback_co_flush; + xbdi->xbdi_cont = xbdback_co_map_io; xbdi->xbdi_cont_aux = xbdback_co_cache_flush2; } else { xbdi->xbdi_cont = xbdback_co_cache_flush2; @@ -1045,7 +1151,7 @@ xbdback_co_cache_flush2(struct xbdback_i return xbdback_pool_get(&xbdback_io_pool, xbdi); } -/* Enqueue the flush work */ +/* Start the flush work */ static void * xbdback_co_cache_doflush(struct xbdback_instance *xbdi, void *obj) { @@ -1056,25 +1162,8 @@ xbdback_co_cache_doflush(struct xbdback_ xbd_io->xio_xbdi = xbdi; xbd_io->xio_operation = xbdi->xbdi_xen_req.operation; xbd_io->xio_flush_id = xbdi->xbdi_xen_req.id; - workqueue_enqueue(xbdback_workqueue, &xbdi->xbdi_io->xio_work, NULL); - /* - * xbdback_do_io() will advance req pointer and restart processing. - * Note that we could probably set xbdi->xbdi_io to NULL and - * let the processing continue, but we really want to wait - * for the flush to complete before doing any more work. 
- */ - xbdi->xbdi_cont = xbdback_co_cache_doflush_wait; - return NULL; -} - -/* wait for the flush work to complete */ -static void * -xbdback_co_cache_doflush_wait(struct xbdback_instance *xbdi, void *obj) -{ - (void)obj; - /* abort the continuation loop; xbdback_do_io() will restart it */ - xbdi->xbdi_cont = xbdback_co_cache_doflush_wait; - return NULL; + xbdi->xbdi_cont = xbdback_co_do_io; + return xbdi; } /* @@ -1095,8 +1184,12 @@ xbdback_co_io(struct xbdback_instance *x req = &xbdi->xbdi_xen_req; if (req->nr_segments < 1 || req->nr_segments > BLKIF_MAX_SEGMENTS_PER_REQUEST) { - printf("xbdback_io domain %d: %d segments\n", - xbdi->xbdi_domid, xbdi->xbdi_xen_req.nr_segments); + if (ratecheck(&xbdi->xbdi_lasterr_time, + &xbdback_err_intvl)) { + printf("%s: invalid number of segments: %d\n", + xbdi->xbdi_name, + xbdi->xbdi_xen_req.nr_segments); + } error = EINVAL; goto end; } @@ -1181,10 +1274,10 @@ xbdback_co_io_gotreq(struct xbdback_inst xbdi->xbdi_domid)); xbdi->xbdi_next_sector = xbdi->xbdi_xen_req.sector_number; - xbdi->xbdi_cont_aux = xbdi->xbdi_cont; KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ || xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE); - xbdi->xbdi_cont = xbdback_co_flush; + xbdi->xbdi_cont_aux = xbdback_co_io_loop; + xbdi->xbdi_cont = xbdback_co_map_io; } } else { xbdi->xbdi_next_sector = xbdi->xbdi_xen_req.sector_number; @@ -1196,8 +1289,6 @@ xbdback_co_io_gotreq(struct xbdback_inst static void * xbdback_co_io_loop(struct xbdback_instance *xbdi, void *obj) { - struct xbdback_io *xio; - (void)obj; KASSERT(xbdi->xbdi_req->rq_operation == BLKIF_OP_READ || xbdi->xbdi_req->rq_operation == BLKIF_OP_WRITE); @@ -1239,25 +1330,26 @@ xbdback_co_io_loop(struct xbdback_instan #endif ) { #ifdef DEBUG - static struct timeval gluetimer; - if (ratecheck(&gluetimer, - &xbdback_fragio_intvl)) - printf("xbdback: domain %d sending" + if (ratecheck(&xbdi->xbdi_lastfragio_time, + &xbdback_fragio_intvl)) + printf("%s: domain is sending" " excessively fragmented I/O\n", - xbdi->xbdi_domid); + xbdi->xbdi_name); #endif - printf("xbdback_io: would maybe glue same page sec %d (%d->%d)\n", xbdi->xbdi_segno, this_fs, this_ls); - panic("notyet!"); + printf("xbdback_io: would maybe glue " + "same page sec %d (%d->%d)\n", + xbdi->xbdi_segno, this_fs, this_ls); XENPRINTF(("xbdback_io domain %d: glue same " "page", xbdi->xbdi_domid)); + panic("notyet!"); xbdi->xbdi_same_page = 1; } else { - xbdi->xbdi_cont_aux = xbdback_co_io_loop; KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ || xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE); - xbdi->xbdi_cont = xbdback_co_flush; + xbdi->xbdi_cont_aux = xbdback_co_io_loop; + xbdi->xbdi_cont = xbdback_co_map_io; return xbdi; } } else @@ -1265,8 +1357,7 @@ xbdback_co_io_loop(struct xbdback_instan if (xbdi->xbdi_io == NULL) { xbdi->xbdi_cont = xbdback_co_io_gotio; - xio = xbdback_pool_get(&xbdback_io_pool, xbdi); - return xio; + return xbdback_pool_get(&xbdback_io_pool, xbdi); } else { xbdi->xbdi_cont = xbdback_co_io_gotio2; } @@ -1274,7 +1365,7 @@ xbdback_co_io_loop(struct xbdback_instan /* done with the loop over segments; get next request */ xbdi->xbdi_cont = xbdback_co_main_incr; } - return xbdi; + return xbdi; } /* Prepare an I/O buffer for a xbdback instance */ @@ -1285,7 +1376,6 @@ xbdback_co_io_gotio(struct xbdback_insta vaddr_t start_offset; /* start offset in vm area */ int buf_flags; - KASSERT(curcpu()->ci_ilevel >= IPL_BIO); xbdi_get(xbdi); atomic_inc_uint(&xbdi->xbdi_pendingreqs); @@ -1392,31 +1482,19 @@ xbdback_co_io_gotfrag2(struct 
xbdback_in } /* - * Map the different I/O requests in backend's VA space, then schedule - * the I/O work. + * Map the different I/O requests in backend's VA space. */ static void * -xbdback_co_flush(struct xbdback_instance *xbdi, void *obj) +xbdback_co_map_io(struct xbdback_instance *xbdi, void *obj) { (void)obj; XENPRINTF(("xbdback_io domain %d: flush sect %ld size %d ptr 0x%lx\n", xbdi->xbdi_domid, (long)xbdi->xbdi_io->xio_buf.b_blkno, (int)xbdi->xbdi_io->xio_buf.b_bcount, (long)xbdi->xbdi_io)); - xbdi->xbdi_cont = xbdback_co_flush_done; + xbdi->xbdi_cont = xbdback_co_do_io; return xbdback_map_shm(xbdi->xbdi_io); } -/* Transfer all I/O work to the workqueue */ -static void * -xbdback_co_flush_done(struct xbdback_instance *xbdi, void *obj) -{ - (void)obj; - workqueue_enqueue(xbdback_workqueue, &xbdi->xbdi_io->xio_work, NULL); - xbdi->xbdi_io = NULL; - xbdi->xbdi_cont = xbdi->xbdi_cont_aux; - return xbdi; -} - static void xbdback_io_error(struct xbdback_io *xbd_io, int error) { @@ -1425,22 +1503,19 @@ xbdback_io_error(struct xbdback_io *xbd_ } /* - * Main xbdback workqueue routine: performs I/O on behalf of backend. Has - * thread context. + * Main xbdback I/O routine. It can either perform a flush operation or + * schedule a read/write operation. */ -static void -xbdback_do_io(struct work *wk, void *dummy) +static void * +xbdback_co_do_io(struct xbdback_instance *xbdi, void *obj) { - struct xbdback_io *xbd_io = (void *)wk; - int s; - KASSERT(&xbd_io->xio_work == wk); + struct xbdback_io *xbd_io = xbdi->xbdi_io; switch (xbd_io->xio_operation) { case BLKIF_OP_FLUSH_DISKCACHE: { int error; int force = 1; - struct xbdback_instance *xbdi = xbd_io->xio_xbdi; error = VOP_IOCTL(xbdi->xbdi_vp, DIOCCACHESYNC, &force, FWRITE, kauth_cred_get()); @@ -1457,13 +1532,9 @@ xbdback_do_io(struct work *wk, void *dum xbd_io->xio_operation, error); xbdback_pool_put(&xbdback_io_pool, xbd_io); xbdi_put(xbdi); - /* handle next IO */ - s = splbio(); xbdi->xbdi_io = NULL; xbdi->xbdi_cont = xbdback_co_main_incr; - xbdback_trampoline(xbdi, xbdi); - splx(s); - break; + return xbdi; } case BLKIF_OP_READ: case BLKIF_OP_WRITE: @@ -1476,16 +1547,16 @@ xbdback_do_io(struct work *wk, void *dum ((((bdata + xbd_io->xio_buf.b_bcount - 1) & ~PAGE_MASK) - (bdata & ~PAGE_MASK)) >> PAGE_SHIFT) + 1; if ((bdata & ~PAGE_MASK) != (xbd_io->xio_vaddr & ~PAGE_MASK)) { - printf("xbdback_do_io: vaddr %#" PRIxVADDR + printf("xbdback_co_do_io: vaddr %#" PRIxVADDR " bdata %#" PRIxVADDR "\n", xbd_io->xio_vaddr, bdata); - panic("xbdback_do_io: bdata page change"); + panic("xbdback_co_do_io: bdata page change"); } if (nsegs > xbd_io->xio_nrma) { - printf("xbdback_do_io: vaddr %#" PRIxVADDR + printf("xbdback_co_do_io: vaddr %#" PRIxVADDR " bcount %#x doesn't fit in %d pages\n", bdata, xbd_io->xio_buf.b_bcount, xbd_io->xio_nrma); - panic("xbdback_do_io: not enough pages"); + panic("xbdback_co_do_io: not enough pages"); } } #endif @@ -1495,22 +1566,29 @@ xbdback_do_io(struct work *wk, void *dum mutex_exit(xbd_io->xio_buf.b_vp->v_interlock); } bdev_strategy(&xbd_io->xio_buf); - break; + /* will call xbdback_iodone() asynchronously when done */ + xbdi->xbdi_io = NULL; + xbdi->xbdi_cont = xbdi->xbdi_cont_aux; + return xbdi; default: /* Should never happen */ - panic("xbdback_do_io: unsupported operation %d", + panic("xbdback_co_do_io: unsupported operation %d", xbd_io->xio_operation); } } -/* This gets reused by xbdback_io_error to report errors from other sources. 
*/ +/* + * Called from softint(9) context when an I/O is done: for each request, send + * back the associated reply to the domain. + * + * This gets reused by xbdback_io_error to report errors from other sources. + */ static void xbdback_iodone(struct buf *bp) { struct xbdback_io *xbd_io; struct xbdback_instance *xbdi; int errp; - int s; xbd_io = bp->b_private; xbdi = xbd_io->xio_xbdi; @@ -1554,7 +1632,8 @@ xbdback_iodone(struct buf *bp) ? BLKIF_RSP_ERROR : BLKIF_RSP_OKAY; - XENPRINTF(("xbdback_io domain %d: end request %" PRIu64 " error=%d\n", + XENPRINTF(("xbdback_io domain %d: end request %"PRIu64 + "error=%d\n", xbdi->xbdi_domid, xbd_req->rq_id, error)); xbdback_send_reply(xbdi, xbd_req->rq_id, xbd_req->rq_operation, error); @@ -1564,18 +1643,29 @@ xbdback_iodone(struct buf *bp) atomic_dec_uint(&xbdi->xbdi_pendingreqs); buf_destroy(&xbd_io->xio_buf); xbdback_pool_put(&xbdback_io_pool, xbd_io); - s = splbio(); - if (xbdi->xbdi_cont == NULL) { - /* check if there is more work to do */ - xbdi->xbdi_cont = xbdback_co_main; - xbdback_trampoline(xbdi, xbdi); - } - splx(s); + + xbdback_wakeup_thread(xbdi); +} + +/* + * Wake up the per xbdback instance thread. + */ +static void +xbdback_wakeup_thread(struct xbdback_instance *xbdi) +{ + + mutex_enter(&xbdi->xbdi_lock); + /* only set RUN state when we are WAITING for work */ + if (xbdi->xbdi_status == WAITING) + xbdi->xbdi_status = RUN; + mutex_exit(&xbdi->xbdi_lock); + + cv_broadcast(&xbdi->xbdi_cv); } /* * called once a request has completed. Place the reply in the ring and - * notify the guest OS + * notify the guest OS. */ static void xbdback_send_reply(struct xbdback_instance *xbdi, uint64_t id, @@ -1586,7 +1676,13 @@ xbdback_send_reply(struct xbdback_instan blkif_x86_64_response_t *resp64; int notify; - switch(xbdi->xbdi_proto) { + /* + * The ring can be accessed by the xbdback thread, xbdback_iodone() + * handler, or any handler that triggered the shm callback. So + * protect ring access via the xbdi_lock mutex. + */ + mutex_enter(&xbdi->xbdi_lock); + switch (xbdi->xbdi_proto) { case XBDIP_NATIVE: resp_n = RING_GET_RESPONSE(&xbdi->xbdi_ring.ring_n, xbdi->xbdi_ring.ring_n.rsp_prod_pvt); @@ -1611,6 +1707,8 @@ xbdback_send_reply(struct xbdback_instan } xbdi->xbdi_ring.ring_n.rsp_prod_pvt++; RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&xbdi->xbdi_ring.ring_n, notify); + mutex_exit(&xbdi->xbdi_lock); + if (notify) { XENPRINTF(("xbdback_send_reply notify %d\n", xbdi->xbdi_domid)); hypervisor_notify_via_evtchn(xbdi->xbdi_evtchn); @@ -1640,9 +1738,10 @@ xbdback_map_shm(struct xbdback_io *xbd_i xbdi = xbd_io->xio_xbdi; xbd_rq = SLIST_FIRST(&xbd_io->xio_rq)->car; + error = xen_shm_map(xbd_io->xio_nrma, xbdi->xbdi_domid, xbd_io->xio_gref, &xbd_io->xio_vaddr, xbd_io->xio_gh, - (xbd_rq->rq_operation == BLKIF_OP_WRITE) ? XSHM_RO: 0); + (xbd_rq->rq_operation == BLKIF_OP_WRITE) ? 
XSHM_RO : 0); switch(error) { case 0: @@ -1654,7 +1753,7 @@ xbdback_map_shm(struct xbdback_io *xbd_i printf("\n"); #endif xbd_io->xio_mapped = 1; - return (void *)xbd_io->xio_vaddr; + return xbdi; case ENOMEM: s = splvm(); if (!xbdback_shmcb) { @@ -1668,10 +1767,11 @@ xbdback_map_shm(struct xbdback_io *xbd_i } SIMPLEQ_INSERT_TAIL(&xbdback_shmq, xbdi, xbdi_on_hold); splx(s); + /* Put the thread to sleep until the callback is called */ + xbdi->xbdi_cont = xbdback_co_wait_shm_callback; return NULL; default: - printf("xbdback_map_shm: xen_shm error %d ", - error); + printf("xbdback_map_shm: xen_shm error %d ", error); xbdback_io_error(xbdi->xbdi_io, error); xbdi->xbdi_io = NULL; xbdi->xbdi_cont = xbdi->xbdi_cont_aux; @@ -1684,6 +1784,11 @@ xbdback_shm_callback(void *arg) { int error, s; + /* + * The shm callback may be executed at any level, including + * IPL_BIO and IPL_NET levels. Raise to the lowest priority level + * that can mask both. + */ s = splvm(); while(!SIMPLEQ_EMPTY(&xbdback_shmq)) { struct xbdback_instance *xbdi; @@ -1705,19 +1810,17 @@ xbdback_shm_callback(void *arg) splx(s); return -1; /* will try again later */ case 0: - xbd_io->xio_mapped = 1; SIMPLEQ_REMOVE_HEAD(&xbdback_shmq, xbdi_on_hold); - (void)splbio(); - xbdback_trampoline(xbdi, xbdi); + xbd_io->xio_mapped = 1; + xbdback_wakeup_thread(xbdi); break; default: SIMPLEQ_REMOVE_HEAD(&xbdback_shmq, xbdi_on_hold); - (void)splbio(); printf("xbdback_shm_callback: xen_shm error %d\n", error); - xbdi->xbdi_cont = xbdi->xbdi_cont_aux; xbdback_io_error(xbd_io, error); - xbdback_trampoline(xbdi, xbdi); + xbdi->xbdi_io = NULL; + xbdback_wakeup_thread(xbdi); break; } } @@ -1726,6 +1829,26 @@ xbdback_shm_callback(void *arg) return 0; } +/* + * Allows waiting for the shm callback to complete. + */ +static void * +xbdback_co_wait_shm_callback(struct xbdback_instance *xbdi, void *obj) +{ + + if (xbdi->xbdi_io == NULL || xbdi->xbdi_io->xio_mapped == 1) { + /* + * Only proceed to next step when the callback reported + * success or failure. + */ + xbdi->xbdi_cont = xbdi->xbdi_cont_aux; + return xbdi; + } else { + /* go back to sleep */ + return NULL; + } +} + /* unmap a request from our virtual address space (request is done) */ static void xbdback_unmap_shm(struct xbdback_io *xbd_io) @@ -1746,44 +1869,19 @@ xbdback_unmap_shm(struct xbdback_io *xbd xbd_io->xio_vaddr = -1; } -/* Obtain memory from a pool, in cooperation with the continuations. */ -static void *xbdback_pool_get(struct xbdback_pool *pp, +/* Obtain memory from a pool */ +static void * +xbdback_pool_get(struct xbdback_pool *pp, struct xbdback_instance *xbdi) { - int s; - void *item; - - item = pool_get(&pp->p, PR_NOWAIT); - if (item == NULL) { - if (ratecheck(&pp->last_warning, &xbdback_poolsleep_intvl)) - printf("xbdback_pool_get: %s is full", - pp->p.pr_wchan); - s = splvm(); - SIMPLEQ_INSERT_TAIL(&pp->q, xbdi, xbdi_on_hold); - splx(s); - } - return item; + return pool_cache_get(&pp->pc, PR_WAITOK); } -/* - * Restore memory to a pool... unless an xbdback instance had been - * waiting for it, in which case that gets the memory first. 
- */ -static void xbdback_pool_put(struct xbdback_pool *pp, void *item) +/* Restore memory to a pool */ +static void +xbdback_pool_put(struct xbdback_pool *pp, void *item) { - int s; - - s = splvm(); - if (SIMPLEQ_EMPTY(&pp->q)) { - splx(s); - pool_put(&pp->p, item); - } else { - struct xbdback_instance *xbdi = SIMPLEQ_FIRST(&pp->q); - SIMPLEQ_REMOVE_HEAD(&pp->q, xbdi_on_hold); - (void)splbio(); - xbdback_trampoline(xbdi, item); - splx(s); - } + pool_cache_put(&pp->pc, item); } /* @@ -1794,7 +1892,6 @@ static void xbdback_trampoline(struct xbdback_instance *xbdi, void *obj) { xbdback_cont_t cont; - KASSERT(curcpu()->ci_ilevel >= IPL_BIO); while(obj != NULL && xbdi->xbdi_cont != NULL) { cont = xbdi->xbdi_cont;