Module Name:    src
Committed By:   jym
Date:           Thu Nov 24 01:47:18 UTC 2011

Modified Files:
        src/sys/arch/xen/xen: xbdback_xenbus.c

Log Message:
Deep rework of the xbdback(4) driver; it now uses a thread per instance
instead of running continuations directly from shm callbacks or interrupt
handlers. The whole CPS design remains, but is adapted to cope with
the thread model.
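
Roughly, the model is now a per-instance kernel thread that sleeps on a
condvar and, when woken, drives the continuation chain through the
trampoline. The fragment below is only a minimal, hypothetical sketch of
that shape; every identifier in it is made up for illustration, the real
code is in the diff further down.

/*
 * Minimal sketch of a per-instance thread running a continuation (CPS)
 * chain -- illustrative only, names are invented and simplified.
 */
#include <sys/param.h>
#include <sys/condvar.h>
#include <sys/kthread.h>
#include <sys/mutex.h>

struct instance;
typedef void *(*cont_t)(struct instance *, void *);

enum { WAITING, RUN, DISCONNECTING, DISCONNECTED };

struct instance {
	kmutex_t	lock;	/* protects status */
	kcondvar_t	cv;	/* thread sleeps here while WAITING */
	int		status;
	cont_t		cont;	/* next continuation, NULL when idle */
};

/* first continuation; a real driver would fetch ring requests here */
static void *
co_main(struct instance *xi, void *obj)
{
	xi->cont = NULL;	/* nothing more to do in this sketch */
	return obj;
}

/* run continuations until one blocks (returns NULL) or the chain ends */
static void
trampoline(struct instance *xi, void *obj)
{
	while (obj != NULL && xi->cont != NULL) {
		cont_t cont = xi->cont;
		xi->cont = NULL;
		obj = cont(xi, obj);
	}
}

static void
instance_thread(void *arg)
{
	struct instance *xi = arg;

	for (;;) {
		mutex_enter(&xi->lock);
		while (xi->status == WAITING)
			cv_wait(&xi->cv, &xi->lock);
		if (xi->status == DISCONNECTING) {
			xi->status = DISCONNECTED;
			cv_signal(&xi->cv);
			mutex_exit(&xi->lock);
			kthread_exit(0);
		}
		xi->status = WAITING;	/* consume the RUN wakeup */
		mutex_exit(&xi->lock);

		if (xi->cont == NULL)
			xi->cont = co_main;
		trampoline(xi, xi);	/* run until done or blocked */
	}
}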

This patch allows scheduling away I/O requests of domains that behave
abnormally, or even destroying them if there is a need to (without thrashing
dom0 with lots of error messages at IPL_BIO).
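
The error throttling relies on ratecheck(9). A small sketch, with a made-up
helper and per-instance timestamp, of how such rate-limited reporting can
look:

/*
 * Sketch of rate-limited error reporting: complain at most once per
 * interval instead of flooding the dom0 console. Names are illustrative.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/time.h>

static struct timeval err_intvl = { 1, 0 };	/* one report per second max */

static void
report_bad_request(struct timeval *lasterr, const char *name, int op)
{
	if (ratecheck(lasterr, &err_intvl))
		printf("%s: unknown operation %d\n", name, op);
	/* the offending request still gets an error response on the ring */
}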

I took this opportunity to make the driver MPSAFE, so multiple instances
can run concurrently. Moved from home-grown pool(9) queues to
pool_cache(9), and reworked the callback mechanism so that it delegates
I/O processing to the instance thread instead of handling it itself through
the continuation trampoline.
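
For illustration, a minimal pool_cache(9) sketch using the dynamic
pool_cache_init() variant and an invented item type; the driver itself
embeds its caches in struct xbdback_pool and uses pool_cache_bootstrap(),
as visible in the diff:

/*
 * Sketch of the pool(9) -> pool_cache(9) move; pool_cache(9) provides
 * per-CPU caching, which helps once several MPSAFE instances allocate
 * concurrently. "struct foo_req" is a made-up item type.
 */
#include <sys/param.h>
#include <sys/intr.h>
#include <sys/pool.h>

struct foo_req { int dummy; };

static pool_cache_t foo_req_cache;

static void
foo_pool_init(void)
{
	foo_req_cache = pool_cache_init(sizeof(struct foo_req), 0, 0, 0,
	    "foorp", NULL, IPL_SOFTBIO, NULL, NULL, NULL);
}

static struct foo_req *
foo_req_get(void)
{
	/* PR_WAITOK: the per-instance thread may sleep here */
	return pool_cache_get(foo_req_cache, PR_WAITOK);
}

static void
foo_req_put(struct foo_req *req)
{
	pool_cache_put(foo_req_cache, req);
}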

This one fixes the potential DoS many have seen in a dom0 when trying to
suspend a NetBSD domU with a corrupted I/O ring.

Benchmarks (build.sh release runs and bonnie++) do not show any
performance regression; the "new" driver is on par with the "old" one.

ok bouyer@.


To generate a diff of this commit:
cvs rdiff -u -r1.51 -r1.52 src/sys/arch/xen/xen/xbdback_xenbus.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/sys/arch/xen/xen/xbdback_xenbus.c
diff -u src/sys/arch/xen/xen/xbdback_xenbus.c:1.51 src/sys/arch/xen/xen/xbdback_xenbus.c:1.52
--- src/sys/arch/xen/xen/xbdback_xenbus.c:1.51	Mon Nov 14 21:34:50 2011
+++ src/sys/arch/xen/xen/xbdback_xenbus.c	Thu Nov 24 01:47:18 2011
@@ -1,4 +1,4 @@
-/*      $NetBSD: xbdback_xenbus.c,v 1.51 2011/11/14 21:34:50 christos Exp $      */
+/*      $NetBSD: xbdback_xenbus.c,v 1.52 2011/11/24 01:47:18 jym Exp $      */
 
 /*
  * Copyright (c) 2006 Manuel Bouyer.
@@ -26,23 +26,27 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: xbdback_xenbus.c,v 1.51 2011/11/14 21:34:50 christos Exp $");
+__KERNEL_RCSID(0, "$NetBSD: xbdback_xenbus.c,v 1.52 2011/11/24 01:47:18 jym Exp $");
 
-#include <sys/types.h>
-#include <sys/param.h>
-#include <sys/systm.h>
-#include <sys/malloc.h>
-#include <sys/queue.h>
-#include <sys/kernel.h>
 #include <sys/atomic.h>
+#include <sys/buf.h>
+#include <sys/condvar.h>
 #include <sys/conf.h>
 #include <sys/disk.h>
 #include <sys/device.h>
 #include <sys/fcntl.h>
-#include <sys/vnode.h>
 #include <sys/kauth.h>
-#include <sys/workqueue.h>
-#include <sys/buf.h>
+#include <sys/kernel.h>
+#include <sys/kmem.h>
+#include <sys/kthread.h>
+#include <sys/malloc.h>
+#include <sys/mutex.h>
+#include <sys/param.h>
+#include <sys/queue.h>
+#include <sys/systm.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/vnode.h>
 
 #include <xen/xen.h>
 #include <xen/xen_shm.h>
@@ -75,39 +79,54 @@ struct xbdback_io;
 struct xbdback_fragment;
 struct xbdback_instance;
 
-/* state of a xbdback instance */
-typedef enum {CONNECTED, DISCONNECTING, DISCONNECTED} xbdback_state_t;
+/*
+ * status of a xbdback instance:
+ * WAITING: xbdback instance is connected, waiting for requests
+ * RUN: xbdi thread must be woken up, I/Os have to be processed
+ * DISCONNECTING: the instance is closing, no more I/Os can be scheduled
+ * DISCONNECTED: no I/Os, no ring, the thread should terminate.
+ */
+typedef enum {WAITING, RUN, DISCONNECTING, DISCONNECTED} xbdback_state_t;
 
 /*
- * Since there are a variety of conditions that can block our I/O
- * processing, which isn't allowed to suspend its thread's execution,
- * such things will be done in a sort of continuation-passing style.
- * 
- * Return value is NULL to indicate that execution has blocked; if
- * it's finished, set xbdi->xbdi_cont (see below) to NULL and the return
- * doesn't matter.  Otherwise it's passed as the second parameter to
- * the new value of xbdi->xbdi_cont.
+ * Each xbdback instance is managed by a single thread that handles all
+ * the I/O processing. As there are a variety of conditions that can block,
+ * everything will be done in a sort of continuation-passing style.
+ *
+ * When the execution has to block to delay processing, for example to
+ * allow system to recover because of memory shortage (via shared memory
+ * callback), the return value of a continuation can be set to NULL. In that
+ * case, the thread will go back to sleeping and wait for the proper
+ * condition before it starts processing requests again from where it left.
+ * Continuation state is "stored" in the xbdback instance (xbdi_cont and
+ * xbdi_cont_aux), and should only be manipulated by the instance thread.
  *
+ * As xbdback(4) has to handle different sort of asynchronous events (Xen
+ * event channels, biointr() soft interrupts, xenbus commands), the xbdi_lock
+ * mutex is used to protect specific elements of the xbdback instance from
+ * concurrent access: thread status and ring access (when pushing responses).
+ * 
  * Here's how the call graph is supposed to be for a single I/O:
+ *
  * xbdback_co_main()
  *        |
- *        |         --> xbdback_co_cache_doflush() or NULL
- *        |         |                    
- *        |         -- xbdback_co_cache_flush2() <- xbdback_co_flush_done() <--
- *        |                                       |                           |
- *        |               |-> xbdback_co_cache_flush() -> xbdback_co_flush() --
+ *        |               --> xbdback_co_cache_doflush() or NULL
+ *        |               |
+ *        |               - xbdback_co_cache_flush2() <- xbdback_co_do_io() <-
+ *        |                                            |                     |
+ *        |               |-> xbdback_co_cache_flush() -> xbdback_co_map_io()-
  * xbdback_co_main_loop()-|
- *        |               |->  xbdback_co_main_done()  -> xbdback_co_flush() --
- *        |                                       |                           |
- *        |           -- xbdback_co_main_done2() <- xbdback_co_flush_done() <--
- *        |           |
- *        |           --> xbdback_co_main() or NULL
+ *        |               |-> xbdback_co_main_done() ---> xbdback_co_map_io()-
+ *        |                                           |                      |
+ *        |               -- xbdback_co_main_done2() <-- xbdback_co_do_io() <-
+ *        |               |
+ *        |               --> xbdback_co_main() or NULL
  *        |
  *     xbdback_co_io() -> xbdback_co_main_incr() -> xbdback_co_main_loop()
  *        |
- *     xbdback_co_io_gotreq()--+---------> xbdback_co_flush() --
- *        |                    |                               |
- *  -> xbdback_co_io_loop()----|  <- xbdback_co_flush_done() <--
+ *     xbdback_co_io_gotreq()--+--> xbdback_co_map_io() ---
+ *        |                    |                          |
+ *  -> xbdback_co_io_loop()----|  <- xbdback_co_do_io() <--
  *  |     |     |     |
  *  |     |     |     |----------> xbdback_co_io_gotio()
  *  |     |     |                         |
@@ -131,21 +150,24 @@ enum xbdi_proto {
 	XBDIP_64
 };
 
-
 /* we keep the xbdback instances in a linked list */
 struct xbdback_instance {
 	SLIST_ENTRY(xbdback_instance) next;
 	struct xenbus_device *xbdi_xbusd; /* our xenstore entry */
 	struct xenbus_watch xbdi_watch; /* to watch our store */
-	domid_t xbdi_domid;		/* attached to this domain */
+	domid_t xbdi_domid;	/* attached to this domain */
 	uint32_t xbdi_handle;	/* domain-specific handle */
-	xbdback_state_t xbdi_status;
+	char xbdi_name[16];	/* name of this instance */
+	/* mutex that protects concurrent access to the xbdback instance */
+	kmutex_t xbdi_lock;
+	kcondvar_t xbdi_cv;	/* wait channel for thread work */
+	xbdback_state_t xbdi_status; /* thread's status */
 	/* backing device parameters */
 	dev_t xbdi_dev;
 	const struct bdevsw *xbdi_bdevsw; /* pointer to the device's bdevsw */
 	struct vnode *xbdi_vp;
 	uint64_t xbdi_size;
-	int xbdi_ro; /* is device read-only ? */
+	bool xbdi_ro; /* is device read-only ? */
 	/* parameters for the communication */
 	unsigned int xbdi_evtchn;
 	/* private parameters for communication */
@@ -176,6 +198,11 @@ struct xbdback_instance {
 	/* other state */
 	int xbdi_same_page; /* are we merging two segments on the same page? */
 	uint xbdi_pendingreqs; /* number of I/O in fly */
+	int xbdi_errps; /* errors per second */
+	struct timeval xbdi_lasterr_time;    /* error time tracking */
+#ifdef DEBUG
+	struct timeval xbdi_lastfragio_time; /* fragmented I/O tracking */
+#endif
 };
 /* Manipulation of the above reference count. */
 #define xbdi_get(xbdip) atomic_inc_uint(&(xbdip)->xbdi_refcnt)
@@ -208,7 +235,6 @@ struct xbdback_request {
  * can be coalesced.
  */
 struct xbdback_io {
-	struct work xio_work;
 	/* The instance pointer is duplicated for convenience. */
 	struct xbdback_instance *xio_xbdi; /* our xbd instance */
 	uint8_t xio_operation;
@@ -252,18 +278,21 @@ struct xbdback_fragment {
 };
 
 /*
- * Wrap our pools with a chain of xbdback_instances whose I/O
- * processing has blocked for want of memory from that pool.
+ * Pools to manage the chain of block requests and I/Os fragments
+ * submitted by frontend.
  */
 struct xbdback_pool {
-	struct pool p;
-	SIMPLEQ_HEAD(xbdback_iqueue, xbdback_instance) q;
+	struct pool_cache pc;
 	struct timeval last_warning;
 } xbdback_request_pool, xbdback_io_pool, xbdback_fragment_pool;
+
+SIMPLEQ_HEAD(xbdback_iqueue, xbdback_instance);
 static struct xbdback_iqueue xbdback_shmq;
 static int xbdback_shmcb; /* have we already registered a callback? */
 
-struct timeval xbdback_poolsleep_intvl = { 5, 0 };
+/* Interval between reports of I/O errors from frontend */
+struct timeval xbdback_err_intvl = { 1, 0 };
+
 #ifdef DEBUG
 struct timeval xbdback_fragio_intvl = { 60, 0 };
 #endif
@@ -274,6 +303,9 @@ static void xbdback_frontend_changed(voi
 static void xbdback_backend_changed(struct xenbus_watch *,
     const char **, unsigned int);
 static int  xbdback_evthandler(void *);
+
+static int  xbdback_connect(struct xbdback_instance *);
+static int  xbdback_disconnect(struct xbdback_instance *);
 static void xbdback_finish_disconnect(struct xbdback_instance *);
 
 static struct xbdback_instance *xbdif_lookup(domid_t, uint32_t);
@@ -287,7 +319,6 @@ static void *xbdback_co_main_done2(struc
 static void *xbdback_co_cache_flush(struct xbdback_instance *, void *);
 static void *xbdback_co_cache_flush2(struct xbdback_instance *, void *);
 static void *xbdback_co_cache_doflush(struct xbdback_instance *, void *);
-static void *xbdback_co_cache_doflush_wait(struct xbdback_instance *, void *);
 
 static void *xbdback_co_io(struct xbdback_instance *, void *);
 static void *xbdback_co_io_gotreq(struct xbdback_instance *, void *);
@@ -297,12 +328,13 @@ static void *xbdback_co_io_gotio2(struct
 static void *xbdback_co_io_gotfrag(struct xbdback_instance *, void *);
 static void *xbdback_co_io_gotfrag2(struct xbdback_instance *, void *);
 
-static void *xbdback_co_flush(struct xbdback_instance *, void *);
-static void *xbdback_co_flush_done(struct xbdback_instance *, void *);
+static void *xbdback_co_map_io(struct xbdback_instance *, void *);
+static void *xbdback_co_do_io(struct xbdback_instance *, void *);
+
+static void *xbdback_co_wait_shm_callback(struct xbdback_instance *, void *);
 
 static int  xbdback_shm_callback(void *);
 static void xbdback_io_error(struct xbdback_io *, int);
-static void xbdback_do_io(struct work *, void *);
 static void xbdback_iodone(struct buf *);
 static void xbdback_send_reply(struct xbdback_instance *, uint64_t , int , int);
 
@@ -312,6 +344,8 @@ static void xbdback_unmap_shm(struct xbd
 static void *xbdback_pool_get(struct xbdback_pool *,
 			      struct xbdback_instance *);
 static void xbdback_pool_put(struct xbdback_pool *, void *);
+static void xbdback_thread(void *);
+static void xbdback_wakeup_thread(struct xbdback_instance *);
 static void xbdback_trampoline(struct xbdback_instance *, void *);
 
 static struct xenbus_backend_driver xbd_backend_driver = {
@@ -319,8 +353,6 @@ static struct xenbus_backend_driver xbd_
 	.xbakd_type = "vbd"
 };
 
-struct workqueue *xbdback_workqueue;
-
 void
 xbdbackattach(int n)
 {
@@ -333,27 +365,26 @@ xbdbackattach(int n)
 	SLIST_INIT(&xbdback_instances);
 	SIMPLEQ_INIT(&xbdback_shmq);
 	xbdback_shmcb = 0;
-	pool_init(&xbdback_request_pool.p, sizeof(struct xbdback_request),
-	    0, 0, 0, "xbbrp", NULL, IPL_BIO);
-	SIMPLEQ_INIT(&xbdback_request_pool.q);
-	pool_init(&xbdback_io_pool.p, sizeof(struct xbdback_io),
-	    0, 0, 0, "xbbip", NULL, IPL_BIO);
-	SIMPLEQ_INIT(&xbdback_io_pool.q);
-	pool_init(&xbdback_fragment_pool.p, sizeof(struct xbdback_fragment),
-	    0, 0, 0, "xbbfp", NULL, IPL_BIO);
-	SIMPLEQ_INIT(&xbdback_fragment_pool.q);
+
+	pool_cache_bootstrap(&xbdback_request_pool.pc,
+	    sizeof(struct xbdback_request), 0, 0, 0, "xbbrp", NULL,
+	    IPL_SOFTBIO, NULL, NULL, NULL);
+	pool_cache_bootstrap(&xbdback_io_pool.pc,
+	    sizeof(struct xbdback_io), 0, 0, 0, "xbbip", NULL,
+	    IPL_SOFTBIO, NULL, NULL, NULL);
+	pool_cache_bootstrap(&xbdback_fragment_pool.pc,
+	    sizeof(struct xbdback_fragment), 0, 0, 0, "xbbfp", NULL,
+	    IPL_SOFTBIO, NULL, NULL, NULL);
+
 	/* we allocate enough to handle a whole ring at once */
-	if (pool_prime(&xbdback_request_pool.p, BLKIF_RING_SIZE) != 0)
+	if (pool_prime(&xbdback_request_pool.pc.pc_pool, BLKIF_RING_SIZE) != 0)
 		printf("xbdback: failed to prime request pool\n");
-	if (pool_prime(&xbdback_io_pool.p, BLKIF_RING_SIZE) != 0)
+	if (pool_prime(&xbdback_io_pool.pc.pc_pool, BLKIF_RING_SIZE) != 0)
 		printf("xbdback: failed to prime io pool\n");
-	if (pool_prime(&xbdback_fragment_pool.p,
+	if (pool_prime(&xbdback_fragment_pool.pc.pc_pool,
             BLKIF_MAX_SEGMENTS_PER_REQUEST * BLKIF_RING_SIZE) != 0)
 		printf("xbdback: failed to prime fragment pool\n");
 
-	if (workqueue_create(&xbdback_workqueue, "xbdbackd",
-	    xbdback_do_io, NULL, PRI_BIO, IPL_BIO, 0))
-		printf("xbdback: failed to init workqueue\n");
 	xenbus_backend_register(&xbd_backend_driver);
 }
 
@@ -396,23 +427,28 @@ xbdback_xenbus_create(struct xenbus_devi
 	if (xbdif_lookup(domid, handle) != NULL) {
 		return EEXIST;
 	}
-	xbdi = malloc(sizeof(struct xbdback_instance), M_DEVBUF,
-	    M_NOWAIT | M_ZERO);
-	if (xbdi == NULL) {
-		return ENOMEM;
-	}
+	xbdi = kmem_zalloc(sizeof(*xbdi), KM_SLEEP);
+
 	xbdi->xbdi_domid = domid;
 	xbdi->xbdi_handle = handle;
+	snprintf(xbdi->xbdi_name, sizeof(xbdi->xbdi_name), "xbdb%di%d",
+	    xbdi->xbdi_domid, xbdi->xbdi_handle);
+
+	/* initialize status and reference counter */
 	xbdi->xbdi_status = DISCONNECTED;
-	xbdi->xbdi_refcnt = 1;
+	xbdi_get(xbdi);
+
+	mutex_init(&xbdi->xbdi_lock, MUTEX_DEFAULT, IPL_BIO);
+	cv_init(&xbdi->xbdi_cv, xbdi->xbdi_name);
 	SLIST_INSERT_HEAD(&xbdback_instances, xbdi, next);
+
 	xbusd->xbusd_u.b.b_cookie = xbdi;	
 	xbusd->xbusd_u.b.b_detach = xbdback_xenbus_destroy;
 	xbusd->xbusd_otherend_changed = xbdback_frontend_changed;
 	xbdi->xbdi_xbusd = xbusd;
 
 	error = xenbus_watch_path2(xbusd, xbusd->xbusd_path, "physical-device",
-	    &xbdi->xbdi_watch,  xbdback_backend_changed);
+	    &xbdi->xbdi_watch, xbdback_backend_changed);
 	if (error) {
 		printf("failed to watch on %s/physical-device: %d\n",
 		    xbusd->xbusd_path, error);
@@ -429,7 +465,7 @@ xbdback_xenbus_create(struct xenbus_devi
 fail2:
 	unregister_xenbus_watch(&xbdi->xbdi_watch);
 fail:
-	free(xbdi, M_DEVBUF);
+	kmem_free(xbdi, sizeof(*xbdi));
 	return error;
 }
 
@@ -439,22 +475,12 @@ xbdback_xenbus_destroy(void *arg)
 	struct xbdback_instance *xbdi = arg;
 	struct xenbus_device *xbusd = xbdi->xbdi_xbusd;
 	struct gnttab_unmap_grant_ref ungrop;
-	int err, s;
+	int err;
 
 	XENPRINTF(("xbdback_xenbus_destroy state %d\n", xbdi->xbdi_status));
 
-	if (xbdi->xbdi_status != DISCONNECTED) {
-		hypervisor_mask_event(xbdi->xbdi_evtchn);
-		event_remove_handler(xbdi->xbdi_evtchn, xbdback_evthandler,
-		    xbdi);
-		xbdi->xbdi_status = DISCONNECTING;
-		s = splbio();
-		xbdi_put(xbdi);
-		while (xbdi->xbdi_status != DISCONNECTED) {
-			tsleep(&xbdi->xbdi_status, PRIBIO, "xbddis", 0);
-		}
-		splx(s);
-	}
+	xbdback_disconnect(xbdi);
+
 	/* unregister watch */
 	if (xbdi->xbdi_watch.node) {
 		unregister_xenbus_watch(&xbdi->xbdi_watch);
@@ -487,7 +513,7 @@ xbdback_xenbus_destroy(void *arg)
 		vn_close(xbdi->xbdi_vp, FREAD, NOCRED);
 	}
 	SLIST_REMOVE(&xbdback_instances, xbdi, xbdback_instance, next);
-	free(xbdi, M_DEVBUF);
+	kmem_free(xbdi, sizeof(*xbdi));
 	return 0;
 }
 
@@ -500,7 +526,6 @@ xbdback_connect(struct xbdback_instance 
 	evtchn_op_t evop;
 	u_long ring_ref, revtchn;
 	char *xsproto;
-	char evname[16];
 	const char *proto;
 	struct xenbus_device *xbusd = xbdi->xbdi_xbusd;
 
@@ -602,17 +627,20 @@ xbdback_connect(struct xbdback_instance 
 	}
 	xbdi->xbdi_evtchn = evop.u.bind_interdomain.local_port;
 
-	snprintf(evname, sizeof(evname), "xbdback%di%d",
-	    xbdi->xbdi_domid, xbdi->xbdi_handle);
 	event_set_handler(xbdi->xbdi_evtchn, xbdback_evthandler,
-	    xbdi, IPL_BIO, evname);
+	    xbdi, IPL_BIO, xbdi->xbdi_name);
 	aprint_verbose("xbd backend domain %d handle %#x (%d) "
 	    "using event channel %d, protocol %s\n", xbdi->xbdi_domid,
 	    xbdi->xbdi_handle, xbdi->xbdi_handle, xbdi->xbdi_evtchn, proto);
+
+	/* enable the xbdback event handler machinery */
+	xbdi->xbdi_status = WAITING;
 	hypervisor_enable_event(xbdi->xbdi_evtchn);
 	hypervisor_notify_via_evtchn(xbdi->xbdi_evtchn);
-	xbdi->xbdi_status = CONNECTED;
-	return 0;
+
+	if (kthread_create(IPL_NONE, KTHREAD_MPSAFE, NULL,
+	    xbdback_thread, xbdi, NULL, xbdi->xbdi_name) == 0)
+		return 0;
 
 err2:
 	/* unmap ring */
@@ -631,21 +659,27 @@ err:
 	return -1;
 }
 
+/*
+ * Signal a xbdback thread to disconnect. Done in 'xenwatch' thread context.
+ */
 static int
 xbdback_disconnect(struct xbdback_instance *xbdi)
 {
-	int s;
 	
 	hypervisor_mask_event(xbdi->xbdi_evtchn);
 	event_remove_handler(xbdi->xbdi_evtchn, xbdback_evthandler,
 	    xbdi);
+
+	/* signal thread that we want to disconnect, then wait for it */
+	mutex_enter(&xbdi->xbdi_lock);
 	xbdi->xbdi_status = DISCONNECTING;
-	s = splbio();
-	xbdi_put(xbdi);
-	while (xbdi->xbdi_status != DISCONNECTED) {
-		tsleep(&xbdi->xbdi_status, PRIBIO, "xbddis", 0);
-	}
-	splx(s);
+	cv_signal(&xbdi->xbdi_cv);
+
+	while (xbdi->xbdi_status != DISCONNECTED)
+		cv_wait(&xbdi->xbdi_cv, &xbdi->xbdi_lock);
+
+	mutex_exit(&xbdi->xbdi_lock);
+
 	xenbus_switch_state(xbdi->xbdi_xbusd, NULL, XenbusStateClosing);
 
 	return 0;
@@ -663,7 +697,7 @@ xbdback_frontend_changed(void *arg, Xenb
 		break;
 	case XenbusStateInitialised:
 	case XenbusStateConnected:
-		if (xbdi->xbdi_status == CONNECTED)
+		if (xbdi->xbdi_status == WAITING || xbdi->xbdi_status == RUN)
 			break;
 		xbdback_connect(xbdi);
 		break;
@@ -704,14 +738,16 @@ xbdback_backend_changed(struct xenbus_wa
 	if (err)
 		return;
 	/*
-	 * we can also fire up after having openned the device, don't try
+	 * we can also fire up after having opened the device, don't try
 	 * to do it twice.
 	 */
 	if (xbdi->xbdi_vp != NULL) {
-		if (xbdi->xbdi_status == CONNECTED && xbdi->xbdi_dev != dev) {
-			printf("xbdback %s: changing physical device from "
-			    "0x%" PRIx64 " to 0x%lx not supported\n",
-			    xbusd->xbusd_path, xbdi->xbdi_dev, dev);
+		if (xbdi->xbdi_status == WAITING || xbdi->xbdi_status == RUN) {
+			if (xbdi->xbdi_dev != dev) {
+				printf("xbdback %s: changing physical device "
+				    "from %#"PRIx64" to %#lx not supported\n",
+				    xbusd->xbusd_path, xbdi->xbdi_dev, dev);
+			}
 		}
 		return;
 	}
@@ -723,9 +759,9 @@ xbdback_backend_changed(struct xenbus_wa
 		return;
 	}
 	if (mode[0] == 'w')
-		xbdi->xbdi_ro = 0;
+		xbdi->xbdi_ro = false;
 	else
-		xbdi->xbdi_ro = 1;
+		xbdi->xbdi_ro = true;
 	major = major(xbdi->xbdi_dev);
 	devname = devsw_blk2name(major);
 	if (devname == NULL) {
@@ -830,14 +866,18 @@ abort:
 	xenbus_transaction_end(xbt, 1);
 }
 
-
-static void xbdback_finish_disconnect(struct xbdback_instance *xbdi)
+/*
+ * Used by a xbdi thread to signal that it is now disconnected.
+ */
+static void
+xbdback_finish_disconnect(struct xbdback_instance *xbdi)
 {
+	KASSERT(mutex_owned(&xbdi->xbdi_lock));
 	KASSERT(xbdi->xbdi_status == DISCONNECTING);
 
 	xbdi->xbdi_status = DISCONNECTED;
-	wakeup(&xbdi->xbdi_status);
 
+	cv_signal(&xbdi->xbdi_cv);
 }
 
 static struct xbdback_instance *
@@ -860,18 +900,64 @@ xbdback_evthandler(void *arg)
 	XENPRINTF(("xbdback_evthandler domain %d: cont %p\n",
 	    xbdi->xbdi_domid, xbdi->xbdi_cont));
 
-	if (xbdi->xbdi_cont == NULL) {
-		xbdi->xbdi_cont = xbdback_co_main;
-		xbdback_trampoline(xbdi, xbdi);
-	}
+	xbdback_wakeup_thread(xbdi);
 
 	return 1;
 }
 
+/*
+ * Main thread routine for one xbdback instance. Woken up by
+ * xbdback_evthandler when a domain has I/O work scheduled in a I/O ring.
+ */
+static void
+xbdback_thread(void *arg)
+{
+	struct xbdback_instance *xbdi = arg;
+
+	for (;;) {
+		mutex_enter(&xbdi->xbdi_lock);
+		switch (xbdi->xbdi_status) {
+		case WAITING:
+			cv_wait(&xbdi->xbdi_cv, &xbdi->xbdi_lock);
+			mutex_exit(&xbdi->xbdi_lock);
+			break;
+		case RUN:
+			xbdi->xbdi_status = WAITING; /* reset state */
+			mutex_exit(&xbdi->xbdi_lock);
+
+			if (xbdi->xbdi_cont == NULL) {
+				xbdi->xbdi_cont = xbdback_co_main;
+			}
+
+			xbdback_trampoline(xbdi, xbdi);
+			break;
+		case DISCONNECTING:
+			if (xbdi->xbdi_pendingreqs > 0) {
+				/* there are pending I/Os. Wait for them. */
+				cv_wait(&xbdi->xbdi_cv, &xbdi->xbdi_lock);
+				mutex_exit(&xbdi->xbdi_lock);
+				break;
+			}
+			
+			/* All I/Os should have been processed by now,
+			 * xbdi_refcnt should drop to 0 */
+			xbdi_put(xbdi);
+			KASSERT(xbdi->xbdi_refcnt == 0);
+			mutex_exit(&xbdi->xbdi_lock);
+			kthread_exit(0);
+			break;
+		default:
+			panic("%s: invalid state %d",
+			    xbdi->xbdi_name, xbdi->xbdi_status);
+		}
+	}
+}
+
 static void *
 xbdback_co_main(struct xbdback_instance *xbdi, void *obj)
 {
 	(void)obj;
+
 	xbdi->xbdi_req_prod = xbdi->xbdi_ring.ring_n.sring->req_prod;
 	xen_rmb(); /* ensure we see all requests up to req_prod */
 	/*
@@ -885,15 +971,18 @@ xbdback_co_main(struct xbdback_instance 
 /*
  * Fetch a blkif request from the ring, and pass control to the appropriate
  * continuation.
+ * If someone asked for disconnection, do not fetch any more request from
+ * the ring.
  */
 static void *
 xbdback_co_main_loop(struct xbdback_instance *xbdi, void *obj) 
 {
-	blkif_request_t *req = &xbdi->xbdi_xen_req;
+	blkif_request_t *req;
 	blkif_x86_32_request_t *req32;
 	blkif_x86_64_request_t *req64;
 
 	(void)obj;
+	req = &xbdi->xbdi_xen_req;
 	if (xbdi->xbdi_ring.ring_n.req_cons != xbdi->xbdi_req_prod) {
 		switch(xbdi->xbdi_proto) {
 		case XBDIP_NATIVE:
@@ -937,8 +1026,11 @@ xbdback_co_main_loop(struct xbdback_inst
 			xbdi->xbdi_cont = xbdback_co_cache_flush;
 			break;
 		default:
-			printf("xbdback_evthandler domain %d: unknown "
-			    "operation %d\n", xbdi->xbdi_domid, req->operation);
+			if (ratecheck(&xbdi->xbdi_lasterr_time,
+			    &xbdback_err_intvl)) {
+				printf("%s: unknown operation %d\n",
+				    xbdi->xbdi_name, req->operation);
+			}
 			xbdback_send_reply(xbdi, req->id, req->operation,
 			    BLKIF_RSP_ERROR);
 			xbdi->xbdi_cont = xbdback_co_main_incr;
@@ -951,8 +1043,8 @@ xbdback_co_main_loop(struct xbdback_inst
 }
 
 /*
- * Increment consumer index and move on to the next request. In case index
- * leads to ring overflow, bail out.
+ * Increment consumer index and move on to the next request. In case
+ * we want to disconnect, leave continuation now.
  */
 static void *
 xbdback_co_main_incr(struct xbdback_instance *xbdi, void *obj)
@@ -961,11 +1053,24 @@ xbdback_co_main_incr(struct xbdback_inst
 	blkif_back_ring_t *ring = &xbdi->xbdi_ring.ring_n;
 
 	ring->req_cons++;
-	if (RING_REQUEST_CONS_OVERFLOW(ring, ring->req_cons))
+
+	/*
+	 * Do not bother with locking here when checking for xbdi_status: if
+	 * we get a transient state, we will get the right value at
+	 * the next increment.
+	 */
+	if (xbdi->xbdi_status == DISCONNECTING)
 		xbdi->xbdi_cont = NULL;
 	else
 		xbdi->xbdi_cont = xbdback_co_main_loop;
 
+	/*
+	 * Each time the thread processes a full ring of requests, give
+	 * a chance to other threads to process I/Os too
+	 */
+	if ((ring->req_cons % BLKIF_RING_SIZE) == 0)
+		yield();
+
 	return xbdi;
 }
 
@@ -980,7 +1085,7 @@ xbdback_co_main_done(struct xbdback_inst
 	if (xbdi->xbdi_io != NULL) {
 		KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ ||
 		    xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE);
-		xbdi->xbdi_cont = xbdback_co_flush;
+		xbdi->xbdi_cont = xbdback_co_map_io;
 		xbdi->xbdi_cont_aux = xbdback_co_main_done2;
 	} else {
 		xbdi->xbdi_cont = xbdback_co_main_done2;
@@ -1002,6 +1107,7 @@ xbdback_co_main_done2(struct xbdback_ins
 		xbdi->xbdi_cont = xbdback_co_main;
 	else
 		xbdi->xbdi_cont = NULL;
+
 	return xbdi;
 }
 
@@ -1012,14 +1118,14 @@ static void *
 xbdback_co_cache_flush(struct xbdback_instance *xbdi, void *obj)
 {
 	(void)obj;
-	KASSERT(curcpu()->ci_ilevel >= IPL_BIO);
+
 	XENPRINTF(("xbdback_co_cache_flush %p %p\n", xbdi, obj));
 	if (xbdi->xbdi_io != NULL) {
 		/* Some I/Os are required for this instance. Process them. */
 		KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ ||
 		    xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE);
 		KASSERT(xbdi->xbdi_pendingreqs > 0);
-		xbdi->xbdi_cont = xbdback_co_flush;
+		xbdi->xbdi_cont = xbdback_co_map_io;
 		xbdi->xbdi_cont_aux = xbdback_co_cache_flush2;
 	} else {
 		xbdi->xbdi_cont = xbdback_co_cache_flush2;
@@ -1045,7 +1151,7 @@ xbdback_co_cache_flush2(struct xbdback_i
 	return xbdback_pool_get(&xbdback_io_pool, xbdi);
 }
 
-/* Enqueue the flush work */
+/* Start the flush work */
 static void *
 xbdback_co_cache_doflush(struct xbdback_instance *xbdi, void *obj)
 {
@@ -1056,25 +1162,8 @@ xbdback_co_cache_doflush(struct xbdback_
 	xbd_io->xio_xbdi = xbdi;
 	xbd_io->xio_operation = xbdi->xbdi_xen_req.operation;
 	xbd_io->xio_flush_id = xbdi->xbdi_xen_req.id;
-	workqueue_enqueue(xbdback_workqueue, &xbdi->xbdi_io->xio_work, NULL);
-	/*
-	 * xbdback_do_io() will advance req pointer and restart processing.
-	 * Note that we could probably set xbdi->xbdi_io to NULL and
-	 * let the processing continue, but we really want to wait
-	 * for the flush to complete before doing any more work.
-	 */
-	xbdi->xbdi_cont = xbdback_co_cache_doflush_wait;
-	return NULL;
-}
-
-/* wait for the flush work to complete */
-static void *
-xbdback_co_cache_doflush_wait(struct xbdback_instance *xbdi, void *obj)
-{
-	(void)obj;
-	/* abort the continuation loop; xbdback_do_io() will restart it */
-	xbdi->xbdi_cont = xbdback_co_cache_doflush_wait;
-	return NULL;
+	xbdi->xbdi_cont = xbdback_co_do_io;
+	return xbdi;
 }
 
 /*
@@ -1095,8 +1184,12 @@ xbdback_co_io(struct xbdback_instance *x
 	req = &xbdi->xbdi_xen_req;
 	if (req->nr_segments < 1 ||
 	    req->nr_segments > BLKIF_MAX_SEGMENTS_PER_REQUEST) {
-		printf("xbdback_io domain %d: %d segments\n",
-		       xbdi->xbdi_domid, xbdi->xbdi_xen_req.nr_segments);
+		if (ratecheck(&xbdi->xbdi_lasterr_time,
+		    &xbdback_err_intvl)) {
+			printf("%s: invalid number of segments: %d\n",
+			       xbdi->xbdi_name,
+			       xbdi->xbdi_xen_req.nr_segments);
+		}
 		error = EINVAL;
 		goto end;
 	}
@@ -1181,10 +1274,10 @@ xbdback_co_io_gotreq(struct xbdback_inst
 			    xbdi->xbdi_domid));
 			xbdi->xbdi_next_sector =
 			    xbdi->xbdi_xen_req.sector_number;
-			xbdi->xbdi_cont_aux = xbdi->xbdi_cont; 
 			KASSERT(xbdi->xbdi_io->xio_operation == BLKIF_OP_READ ||
 			    xbdi->xbdi_io->xio_operation == BLKIF_OP_WRITE);
-			xbdi->xbdi_cont = xbdback_co_flush;
+			xbdi->xbdi_cont_aux = xbdback_co_io_loop;
+			xbdi->xbdi_cont = xbdback_co_map_io;
 		}
 	} else {
 		xbdi->xbdi_next_sector = xbdi->xbdi_xen_req.sector_number;
@@ -1196,8 +1289,6 @@ xbdback_co_io_gotreq(struct xbdback_inst
 static void *
 xbdback_co_io_loop(struct xbdback_instance *xbdi, void *obj)
 {
-	struct xbdback_io *xio;
-
 	(void)obj;
 	KASSERT(xbdi->xbdi_req->rq_operation == BLKIF_OP_READ ||
 	    xbdi->xbdi_req->rq_operation == BLKIF_OP_WRITE);
@@ -1239,25 +1330,26 @@ xbdback_co_io_loop(struct xbdback_instan
 #endif
 			    ) {
 #ifdef DEBUG
-				static struct timeval gluetimer;
-				if (ratecheck(&gluetimer,
-					      &xbdback_fragio_intvl))
-					printf("xbdback: domain %d sending"
+				if (ratecheck(&xbdi->xbdi_lastfragio_time,
+				    &xbdback_fragio_intvl))
+					printf("%s: domain is sending"
 					    " excessively fragmented I/O\n",
-					    xbdi->xbdi_domid);
+					    xbdi->xbdi_name);
 #endif
-				printf("xbdback_io: would maybe glue same page sec %d (%d->%d)\n", xbdi->xbdi_segno, this_fs, this_ls);
-				panic("notyet!");
+				printf("xbdback_io: would maybe glue "
+				    "same page sec %d (%d->%d)\n",
+				    xbdi->xbdi_segno, this_fs, this_ls);
 				XENPRINTF(("xbdback_io domain %d: glue same "
 				    "page", xbdi->xbdi_domid));
+				panic("notyet!");
 				xbdi->xbdi_same_page = 1;
 			} else {
-				xbdi->xbdi_cont_aux = xbdback_co_io_loop;
 				KASSERT(xbdi->xbdi_io->xio_operation ==
 				     BLKIF_OP_READ ||
 				    xbdi->xbdi_io->xio_operation ==
 				     BLKIF_OP_WRITE);
-				xbdi->xbdi_cont = xbdback_co_flush;
+				xbdi->xbdi_cont_aux = xbdback_co_io_loop;
+				xbdi->xbdi_cont = xbdback_co_map_io;
 				return xbdi;
 			}
 		} else
@@ -1265,8 +1357,7 @@ xbdback_co_io_loop(struct xbdback_instan
 
 		if (xbdi->xbdi_io == NULL) {
 			xbdi->xbdi_cont = xbdback_co_io_gotio;
-			xio = xbdback_pool_get(&xbdback_io_pool, xbdi);
-			return xio;
+			return xbdback_pool_get(&xbdback_io_pool, xbdi);
 		} else {
 			xbdi->xbdi_cont = xbdback_co_io_gotio2;
 		}
@@ -1274,7 +1365,7 @@ xbdback_co_io_loop(struct xbdback_instan
 		/* done with the loop over segments; get next request */
 		xbdi->xbdi_cont = xbdback_co_main_incr;
 	}
-	return xbdi;			
+	return xbdi;
 }
 
 /* Prepare an I/O buffer for a xbdback instance */
@@ -1285,7 +1376,6 @@ xbdback_co_io_gotio(struct xbdback_insta
 	vaddr_t start_offset; /* start offset in vm area */
 	int buf_flags;
 
-	KASSERT(curcpu()->ci_ilevel >= IPL_BIO);
 	xbdi_get(xbdi);
 	atomic_inc_uint(&xbdi->xbdi_pendingreqs);
 	
@@ -1392,31 +1482,19 @@ xbdback_co_io_gotfrag2(struct xbdback_in
 }
 
 /*
- * Map the different I/O requests in backend's VA space, then schedule
- * the I/O work.
+ * Map the different I/O requests in backend's VA space.
  */
 static void *
-xbdback_co_flush(struct xbdback_instance *xbdi, void *obj)
+xbdback_co_map_io(struct xbdback_instance *xbdi, void *obj)
 {
 	(void)obj;
 	XENPRINTF(("xbdback_io domain %d: flush sect %ld size %d ptr 0x%lx\n",
 	    xbdi->xbdi_domid, (long)xbdi->xbdi_io->xio_buf.b_blkno,
 	    (int)xbdi->xbdi_io->xio_buf.b_bcount, (long)xbdi->xbdi_io));
-	xbdi->xbdi_cont = xbdback_co_flush_done;
+	xbdi->xbdi_cont = xbdback_co_do_io;
 	return xbdback_map_shm(xbdi->xbdi_io);
 }
 
-/* Transfer all I/O work to the workqueue */
-static void *
-xbdback_co_flush_done(struct xbdback_instance *xbdi, void *obj)
-{
-	(void)obj;
-	workqueue_enqueue(xbdback_workqueue, &xbdi->xbdi_io->xio_work, NULL);
-	xbdi->xbdi_io = NULL;
-	xbdi->xbdi_cont = xbdi->xbdi_cont_aux;
-	return xbdi;
-}
-
 static void
 xbdback_io_error(struct xbdback_io *xbd_io, int error)
 {
@@ -1425,22 +1503,19 @@ xbdback_io_error(struct xbdback_io *xbd_
 }
 
 /*
- * Main xbdback workqueue routine: performs I/O on behalf of backend. Has
- * thread context.
+ * Main xbdback I/O routine. It can either perform a flush operation or
+ * schedule a read/write operation.
  */
-static void
-xbdback_do_io(struct work *wk, void *dummy)
+static void *
+xbdback_co_do_io(struct xbdback_instance *xbdi, void *obj)
 {
-	struct xbdback_io *xbd_io = (void *)wk;
-	int s;
-	KASSERT(&xbd_io->xio_work == wk);
+	struct xbdback_io *xbd_io = xbdi->xbdi_io;
 
 	switch (xbd_io->xio_operation) {
 	case BLKIF_OP_FLUSH_DISKCACHE:
 	{
 		int error;
 		int force = 1;
-		struct xbdback_instance *xbdi = xbd_io->xio_xbdi;
 
 		error = VOP_IOCTL(xbdi->xbdi_vp, DIOCCACHESYNC, &force, FWRITE,
 		    kauth_cred_get());
@@ -1457,13 +1532,9 @@ xbdback_do_io(struct work *wk, void *dum
 		    xbd_io->xio_operation, error);
 		xbdback_pool_put(&xbdback_io_pool, xbd_io);
 		xbdi_put(xbdi);
-		/* handle next IO */
-		s = splbio();
 		xbdi->xbdi_io = NULL;
 		xbdi->xbdi_cont = xbdback_co_main_incr;
-		xbdback_trampoline(xbdi, xbdi);
-		splx(s);
-		break;
+		return xbdi;
 	}
 	case BLKIF_OP_READ:
 	case BLKIF_OP_WRITE:
@@ -1476,16 +1547,16 @@ xbdback_do_io(struct work *wk, void *dum
 		    ((((bdata + xbd_io->xio_buf.b_bcount - 1) & ~PAGE_MASK) -
 		    (bdata & ~PAGE_MASK)) >> PAGE_SHIFT) + 1;
 		if ((bdata & ~PAGE_MASK) != (xbd_io->xio_vaddr & ~PAGE_MASK)) {
-			printf("xbdback_do_io: vaddr %#" PRIxVADDR
+			printf("xbdback_co_do_io: vaddr %#" PRIxVADDR
 			    " bdata %#" PRIxVADDR "\n",
 			    xbd_io->xio_vaddr, bdata);
-			panic("xbdback_do_io: bdata page change");
+			panic("xbdback_co_do_io: bdata page change");
 		}
 		if (nsegs > xbd_io->xio_nrma) {
-			printf("xbdback_do_io: vaddr %#" PRIxVADDR
+			printf("xbdback_co_do_io: vaddr %#" PRIxVADDR
 			    " bcount %#x doesn't fit in %d pages\n",
 			    bdata, xbd_io->xio_buf.b_bcount, xbd_io->xio_nrma);
-			panic("xbdback_do_io: not enough pages");
+			panic("xbdback_co_do_io: not enough pages");
 		}
 		}
 #endif
@@ -1495,22 +1566,29 @@ xbdback_do_io(struct work *wk, void *dum
 			mutex_exit(xbd_io->xio_buf.b_vp->v_interlock);
 		}
 		bdev_strategy(&xbd_io->xio_buf);
-		break;
+		/* will call xbdback_iodone() asynchronously when done */
+		xbdi->xbdi_io = NULL;
+		xbdi->xbdi_cont = xbdi->xbdi_cont_aux;
+		return xbdi;
 	default:
 		/* Should never happen */
-		panic("xbdback_do_io: unsupported operation %d",
+		panic("xbdback_co_do_io: unsupported operation %d",
 		    xbd_io->xio_operation);
 	}
 }
 
-/* This gets reused by xbdback_io_error to report errors from other sources. */
+/*
+ * Called from softint(9) context when an I/O is done: for each request, send
+ * back the associated reply to the domain.
+ *
+ * This gets reused by xbdback_io_error to report errors from other sources.
+ */
 static void
 xbdback_iodone(struct buf *bp)
 {
 	struct xbdback_io *xbd_io;
 	struct xbdback_instance *xbdi;
 	int errp;
-	int s;
 
 	xbd_io = bp->b_private;
 	xbdi = xbd_io->xio_xbdi;
@@ -1554,7 +1632,8 @@ xbdback_iodone(struct buf *bp)
 		    ? BLKIF_RSP_ERROR
 		    : BLKIF_RSP_OKAY;
 
-		XENPRINTF(("xbdback_io domain %d: end request %" PRIu64 " error=%d\n",
+		XENPRINTF(("xbdback_io domain %d: end request %"PRIu64
+		    "error=%d\n",
 		    xbdi->xbdi_domid, xbd_req->rq_id, error));
 		xbdback_send_reply(xbdi, xbd_req->rq_id,
 		    xbd_req->rq_operation, error);
@@ -1564,18 +1643,29 @@ xbdback_iodone(struct buf *bp)
 	atomic_dec_uint(&xbdi->xbdi_pendingreqs);
 	buf_destroy(&xbd_io->xio_buf);
 	xbdback_pool_put(&xbdback_io_pool, xbd_io);
-	s = splbio();
-	if (xbdi->xbdi_cont == NULL) {
-		/* check if there is more work to do */
-		xbdi->xbdi_cont = xbdback_co_main;
-		xbdback_trampoline(xbdi, xbdi);
-	}
-	splx(s);
+
+	xbdback_wakeup_thread(xbdi);
+}
+
+/*
+ * Wake up the per xbdback instance thread.
+ */
+static void
+xbdback_wakeup_thread(struct xbdback_instance *xbdi)
+{
+
+	mutex_enter(&xbdi->xbdi_lock);
+	/* only set RUN state when we are WAITING for work */
+	if (xbdi->xbdi_status == WAITING)
+	       xbdi->xbdi_status = RUN;
+	mutex_exit(&xbdi->xbdi_lock);
+
+	cv_broadcast(&xbdi->xbdi_cv);
 }
 
 /*
  * called once a request has completed. Place the reply in the ring and
- * notify the guest OS
+ * notify the guest OS.
  */
 static void
 xbdback_send_reply(struct xbdback_instance *xbdi, uint64_t id,
@@ -1586,7 +1676,13 @@ xbdback_send_reply(struct xbdback_instan
 	blkif_x86_64_response_t *resp64;
 	int notify;
 
-	switch(xbdi->xbdi_proto) {
+	/*
+	 * The ring can be accessed by the xbdback thread, xbdback_iodone()
+	 * handler, or any handler that triggered the shm callback. So
+	 * protect ring access via the xbdi_lock mutex.
+	 */
+	mutex_enter(&xbdi->xbdi_lock);
+	switch (xbdi->xbdi_proto) {
 	case XBDIP_NATIVE:
 		resp_n = RING_GET_RESPONSE(&xbdi->xbdi_ring.ring_n,
 		    xbdi->xbdi_ring.ring_n.rsp_prod_pvt);
@@ -1611,6 +1707,8 @@ xbdback_send_reply(struct xbdback_instan
 	}
 	xbdi->xbdi_ring.ring_n.rsp_prod_pvt++;
 	RING_PUSH_RESPONSES_AND_CHECK_NOTIFY(&xbdi->xbdi_ring.ring_n, notify);
+	mutex_exit(&xbdi->xbdi_lock);
+
 	if (notify) {
 		XENPRINTF(("xbdback_send_reply notify %d\n", xbdi->xbdi_domid));
 		hypervisor_notify_via_evtchn(xbdi->xbdi_evtchn);
@@ -1640,9 +1738,10 @@ xbdback_map_shm(struct xbdback_io *xbd_i
 
 	xbdi = xbd_io->xio_xbdi;
 	xbd_rq = SLIST_FIRST(&xbd_io->xio_rq)->car;
+
 	error = xen_shm_map(xbd_io->xio_nrma, xbdi->xbdi_domid,
 	    xbd_io->xio_gref, &xbd_io->xio_vaddr, xbd_io->xio_gh, 
-	    (xbd_rq->rq_operation == BLKIF_OP_WRITE) ? XSHM_RO: 0);
+	    (xbd_rq->rq_operation == BLKIF_OP_WRITE) ? XSHM_RO : 0);
 
 	switch(error) {
 	case 0:
@@ -1654,7 +1753,7 @@ xbdback_map_shm(struct xbdback_io *xbd_i
 		printf("\n");
 #endif
 		xbd_io->xio_mapped = 1;
-		return (void *)xbd_io->xio_vaddr;
+		return xbdi;
 	case ENOMEM:
 		s = splvm();
 		if (!xbdback_shmcb) {
@@ -1668,10 +1767,11 @@ xbdback_map_shm(struct xbdback_io *xbd_i
 		}
 		SIMPLEQ_INSERT_TAIL(&xbdback_shmq, xbdi, xbdi_on_hold);
 		splx(s);
+		/* Put the thread to sleep until the callback is called */
+		xbdi->xbdi_cont = xbdback_co_wait_shm_callback;
 		return NULL;
 	default:
-		printf("xbdback_map_shm: xen_shm error %d ",
-		       error);
+		printf("xbdback_map_shm: xen_shm error %d ", error);
 		xbdback_io_error(xbdi->xbdi_io, error);
 		xbdi->xbdi_io = NULL;
 		xbdi->xbdi_cont = xbdi->xbdi_cont_aux;
@@ -1684,6 +1784,11 @@ xbdback_shm_callback(void *arg)
 {
         int error, s;
 
+	/*
+	 * The shm callback may be executed at any level, including
+	 * IPL_BIO and IPL_NET levels. Raise to the lowest priority level
+	 * that can mask both.
+	 */
 	s = splvm();
 	while(!SIMPLEQ_EMPTY(&xbdback_shmq)) {
 		struct xbdback_instance *xbdi;
@@ -1705,19 +1810,17 @@ xbdback_shm_callback(void *arg)
 			splx(s);
 			return -1; /* will try again later */
 		case 0:
-			xbd_io->xio_mapped = 1;
 			SIMPLEQ_REMOVE_HEAD(&xbdback_shmq, xbdi_on_hold);
-			(void)splbio();
-			xbdback_trampoline(xbdi, xbdi);
+			xbd_io->xio_mapped = 1;
+			xbdback_wakeup_thread(xbdi);
 			break;
 		default:
 			SIMPLEQ_REMOVE_HEAD(&xbdback_shmq, xbdi_on_hold);
-			(void)splbio();
 			printf("xbdback_shm_callback: xen_shm error %d\n",
 			       error);
-			xbdi->xbdi_cont = xbdi->xbdi_cont_aux;
 			xbdback_io_error(xbd_io, error);
-			xbdback_trampoline(xbdi, xbdi);
+			xbdi->xbdi_io = NULL;
+			xbdback_wakeup_thread(xbdi);
 			break;
 		}
 	}
@@ -1726,6 +1829,26 @@ xbdback_shm_callback(void *arg)
 	return 0;
 }
 
+/*
+ * Allows waiting for the shm callback to complete.
+ */
+static void *
+xbdback_co_wait_shm_callback(struct xbdback_instance *xbdi, void *obj)
+{
+
+	if (xbdi->xbdi_io == NULL || xbdi->xbdi_io->xio_mapped == 1) {
+		/*
+		 * Only proceed to next step when the callback reported
+		 * success or failure.
+		 */
+		xbdi->xbdi_cont = xbdi->xbdi_cont_aux;
+		return xbdi;
+	} else {
+		/* go back to sleep */
+		return NULL;
+	}
+}
+
 /* unmap a request from our virtual address space (request is done) */
 static void
 xbdback_unmap_shm(struct xbdback_io *xbd_io)
@@ -1746,44 +1869,19 @@ xbdback_unmap_shm(struct xbdback_io *xbd
 	xbd_io->xio_vaddr = -1;
 }
 
-/* Obtain memory from a pool, in cooperation with the continuations. */
-static void *xbdback_pool_get(struct xbdback_pool *pp,
+/* Obtain memory from a pool */
+static void *
+xbdback_pool_get(struct xbdback_pool *pp,
 			      struct xbdback_instance *xbdi)
 {
-	int s;
-	void *item;
-
-	item = pool_get(&pp->p, PR_NOWAIT);
-	if (item == NULL) {
-		if (ratecheck(&pp->last_warning, &xbdback_poolsleep_intvl))
-			printf("xbdback_pool_get: %s is full",
-			       pp->p.pr_wchan);
-		s = splvm();
-		SIMPLEQ_INSERT_TAIL(&pp->q, xbdi, xbdi_on_hold);
-		splx(s);
-	}
-	return item;
+	return pool_cache_get(&pp->pc, PR_WAITOK);
 }
 
-/*
- * Restore memory to a pool... unless an xbdback instance had been
- * waiting for it, in which case that gets the memory first.
- */
-static void xbdback_pool_put(struct xbdback_pool *pp, void *item)
+/* Restore memory to a pool */
+static void
+xbdback_pool_put(struct xbdback_pool *pp, void *item)
 {
-	int s;
-	
-	s = splvm();
-	if (SIMPLEQ_EMPTY(&pp->q)) {
-		splx(s);
-		pool_put(&pp->p, item);
-	} else {
-		struct xbdback_instance *xbdi = SIMPLEQ_FIRST(&pp->q);
-		SIMPLEQ_REMOVE_HEAD(&pp->q, xbdi_on_hold);
-		(void)splbio();
-		xbdback_trampoline(xbdi, item);
-		splx(s);
-	}
+	pool_cache_put(&pp->pc, item);
 }
 
 /*
@@ -1794,7 +1892,6 @@ static void
 xbdback_trampoline(struct xbdback_instance *xbdi, void *obj)
 {
 	xbdback_cont_t cont;
-	KASSERT(curcpu()->ci_ilevel >= IPL_BIO);
 
 	while(obj != NULL && xbdi->xbdi_cont != NULL) {
 		cont = xbdi->xbdi_cont;
