Re: [Qemu-devel] [PATCH v9 1/4] block: add the block queue support

2011-11-01 Thread Zhi Yong Wu
On Mon, Oct 31, 2011 at 9:35 PM, Stefan Hajnoczi stefa...@gmail.com wrote:
 On Fri, Oct 28, 2011 at 11:02 AM, Zhi Yong Wu wu...@linux.vnet.ibm.com 
 wrote:
 +static void bdrv_io_limits_skip_set(void *opaque,
 +                                    BlockAPIType co_type,
 +                                    bool cb_skip,
 +                                    bool limit_skip) {
 +    RwCo *rwco;
 +    BlockDriverAIOCBCoroutine *aioco;
 +
 +    if (co_type == BDRV_API_SYNC) {
 +        rwco = opaque;
 +        rwco-limit_skip = limit_skip;
 +    } else if (co_type == BDRV_API_ASYNC) {
 +        aioco = opaque;
 +        aioco-cb_skip = cb_skip;
 +        aioco-limit_skip = limit_skip;
 +    } else {
 +        abort();
 +    }
 +}

I have sent out v10. It discard the queue and request defined by us,
and rebase it to CoQueue, and let Coroutine represent one I/O request.
The code logic is now much simpler.

 The main question I have about this series is why have different cases
 for sync, aio, and coroutines?  Perhaps I'm missing something but this
 should all be much simpler.

 All read/write requests are processed in a coroutine
 (bdrv_co_do_readv()/bdrv_co_do_writev()).  That is the place to
 perform I/O throttling.  Throttling should not be aware of sync, aio,
 vs coroutines.

 Since all requests have coroutines you could use CoQueue and the
 actual queue waiting code in bdrv_co_do_readv()/bdrv_co_do_writev()
 becomes:

 if (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
    qemu_co_queue_wait(bs-throttled_reqs);

    /* Wait until this request is allowed to start */
    while (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
        /* Re-inserting at the head of the CoQueue is equivalent to
         * the queue-flushing/queue-limit_exceeded behavior in your
         * patch.
         */
        qemu_co_queue_wait_insert_head(bs-throttled_reqs);
    }
 }

 I think block/blk-queue.h isn't needed if you use the existing CoQueue
 structure.

 This is the main point I want to raise, just a few minor comments
 below which may not be relevant if you can drop BlockQueue.

 +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
 +{
 +    int ret;
 +
 +    BlockDriverState *bs = request-common.bs;
 +    assert(bs);
 +
 +    if (bs-io_limits_enabled) {

 I'm not sure why the BlockQueue needs to reach into BlockDriverState.
 Now BlockDriverState and BlockQueue know about and depend on each
 other.  It's usually nicer to keep the relationship unidirectional, if
 possible.

 +        ret = request-handler(request-common.bs, request-sector_num,
 +                               request-nb_sectors, request-qiov,
 +                               request, request-co_type);
 +    } else {
 +        if (request-co_type == BDRV_API_CO) {
 +            qemu_coroutine_enter(request-co, request-cocb);
 +        } else {
 +            printf(%s, req: %p\n, __func__, (void *)request);

 Debug output should be removed.

 +            bdrv_io_limits_issue_request(request, request-co_type);
 +        }
 +
 +        ret = BDRV_BLKQ_DEQ_PASS;
 +    }
 +
 +    return ret;
 +}
 +
 +void qemu_block_queue_submit(BlockQueue *queue, BlockDriverCompletionFunc 
 *cb)
 +{
 +    BlockQueueAIOCB *request;
 +    queue-flushing = true;
 +
 +    /*QTAILQ_FOREACH_SAFE(request, queue-requests, entry, next) {*/

 Commented out code should be removed.

 +    while (!QTAILQ_EMPTY(queue-requests)) {
 +        int ret = 0;
 +
 +        request = QTAILQ_FIRST(queue-requests);
 +        QTAILQ_REMOVE(queue-requests, request, entry);
 +        queue-limit_exceeded = false;
 +        ret = qemu_block_queue_handler(request);
 +        if (ret == -EIO) {
 +            cb(request, -EIO);
 +            break;
 +        } else if (ret == BDRV_BLKQ_ENQ_AGAIN) {
 +            QTAILQ_INSERT_HEAD(queue-requests, request, entry);
 +            break;
 +        } else if (ret == BDRV_BLKQ_DEQ_PASS) {
 +            cb(request, 0);
 +        }

 What if ret is not -EIO, BDRV_BLKQ_ENQ_AGAIN, or BDRV_BLKQ_DEQ_PASS?
 I think the -EIO case should be the default that calls cb(request,
 ret).

 +    }
 +
 +    printf(%s, leave\n, __func__);

 Debug code should be removed.

 Stefan





-- 
Regards,

Zhi Yong Wu



Re: [Qemu-devel] [PATCH v9 1/4] block: add the block queue support

2011-10-31 Thread Stefan Hajnoczi
On Fri, Oct 28, 2011 at 11:02 AM, Zhi Yong Wu wu...@linux.vnet.ibm.com wrote:
 +static void bdrv_io_limits_skip_set(void *opaque,
 +                                    BlockAPIType co_type,
 +                                    bool cb_skip,
 +                                    bool limit_skip) {
 +    RwCo *rwco;
 +    BlockDriverAIOCBCoroutine *aioco;
 +
 +    if (co_type == BDRV_API_SYNC) {
 +        rwco = opaque;
 +        rwco-limit_skip = limit_skip;
 +    } else if (co_type == BDRV_API_ASYNC) {
 +        aioco = opaque;
 +        aioco-cb_skip = cb_skip;
 +        aioco-limit_skip = limit_skip;
 +    } else {
 +        abort();
 +    }
 +}

The main question I have about this series is why have different cases
for sync, aio, and coroutines?  Perhaps I'm missing something but this
should all be much simpler.

All read/write requests are processed in a coroutine
(bdrv_co_do_readv()/bdrv_co_do_writev()).  That is the place to
perform I/O throttling.  Throttling should not be aware of sync, aio,
vs coroutines.

Since all requests have coroutines you could use CoQueue and the
actual queue waiting code in bdrv_co_do_readv()/bdrv_co_do_writev()
becomes:

if (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
qemu_co_queue_wait(bs-throttled_reqs);

/* Wait until this request is allowed to start */
while (bdrv_exceeds_io_limit(bs, sector_num, qiov, is_write)) {
/* Re-inserting at the head of the CoQueue is equivalent to
 * the queue-flushing/queue-limit_exceeded behavior in your
 * patch.
 */
qemu_co_queue_wait_insert_head(bs-throttled_reqs);
}
}

I think block/blk-queue.h isn't needed if you use the existing CoQueue
structure.

This is the main point I want to raise, just a few minor comments
below which may not be relevant if you can drop BlockQueue.

 +static int qemu_block_queue_handler(BlockQueueAIOCB *request)
 +{
 +    int ret;
 +
 +    BlockDriverState *bs = request-common.bs;
 +    assert(bs);
 +
 +    if (bs-io_limits_enabled) {

I'm not sure why the BlockQueue needs to reach into BlockDriverState.
Now BlockDriverState and BlockQueue know about and depend on each
other.  It's usually nicer to keep the relationship unidirectional, if
possible.

 +        ret = request-handler(request-common.bs, request-sector_num,
 +                               request-nb_sectors, request-qiov,
 +                               request, request-co_type);
 +    } else {
 +        if (request-co_type == BDRV_API_CO) {
 +            qemu_coroutine_enter(request-co, request-cocb);
 +        } else {
 +            printf(%s, req: %p\n, __func__, (void *)request);

Debug output should be removed.

 +            bdrv_io_limits_issue_request(request, request-co_type);
 +        }
 +
 +        ret = BDRV_BLKQ_DEQ_PASS;
 +    }
 +
 +    return ret;
 +}
 +
 +void qemu_block_queue_submit(BlockQueue *queue, BlockDriverCompletionFunc 
 *cb)
 +{
 +    BlockQueueAIOCB *request;
 +    queue-flushing = true;
 +
 +    /*QTAILQ_FOREACH_SAFE(request, queue-requests, entry, next) {*/

Commented out code should be removed.

 +    while (!QTAILQ_EMPTY(queue-requests)) {
 +        int ret = 0;
 +
 +        request = QTAILQ_FIRST(queue-requests);
 +        QTAILQ_REMOVE(queue-requests, request, entry);
 +        queue-limit_exceeded = false;
 +        ret = qemu_block_queue_handler(request);
 +        if (ret == -EIO) {
 +            cb(request, -EIO);
 +            break;
 +        } else if (ret == BDRV_BLKQ_ENQ_AGAIN) {
 +            QTAILQ_INSERT_HEAD(queue-requests, request, entry);
 +            break;
 +        } else if (ret == BDRV_BLKQ_DEQ_PASS) {
 +            cb(request, 0);
 +        }

What if ret is not -EIO, BDRV_BLKQ_ENQ_AGAIN, or BDRV_BLKQ_DEQ_PASS?
I think the -EIO case should be the default that calls cb(request,
ret).

 +    }
 +
 +    printf(%s, leave\n, __func__);

Debug code should be removed.

Stefan



[Qemu-devel] [PATCH v9 1/4] block: add the block queue support

2011-10-28 Thread Zhi Yong Wu
Signed-off-by: Zhi Yong Wu wu...@linux.vnet.ibm.com
---
 Makefile.objs |2 +-
 block.c   |   71 +--
 block.h   |   20 +
 block/blk-queue.c |  201 +
 block/blk-queue.h |   63 +
 block_int.h   |   23 ++
 6 files changed, 371 insertions(+), 9 deletions(-)
 create mode 100644 block/blk-queue.c
 create mode 100644 block/blk-queue.h

diff --git a/Makefile.objs b/Makefile.objs
index 01587c8..98891b3 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -33,7 +33,7 @@ block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o 
dmg.o bochs.o vpc.o vv
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o 
qcow2-cache.o
 block-nested-y += qed.o qed-gencb.o qed-l2-cache.o qed-table.o qed-cluster.o
 block-nested-y += qed-check.o
-block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o
+block-nested-y += parallels.o nbd.o blkdebug.o sheepdog.o blkverify.o 
blk-queue.o
 block-nested-$(CONFIG_WIN32) += raw-win32.o
 block-nested-$(CONFIG_POSIX) += raw-posix.o
 block-nested-$(CONFIG_CURL) += curl.o
diff --git a/block.c b/block.c
index 70aab63..40ab277 100644
--- a/block.c
+++ b/block.c
@@ -1026,6 +1026,7 @@ typedef struct RwCo {
 QEMUIOVector *qiov;
 bool is_write;
 int ret;
+bool limit_skip;
 } RwCo;
 
 static void coroutine_fn bdrv_rw_co_entry(void *opaque)
@@ -1060,6 +1061,7 @@ static int bdrv_rw_co(BlockDriverState *bs, int64_t 
sector_num, uint8_t *buf,
 .qiov = qiov,
 .is_write = is_write,
 .ret = NOT_DONE,
+.limit_skip = false,
 };
 
 qemu_iovec_init_external(qiov, iov, 1);
@@ -1242,6 +1244,64 @@ int bdrv_pwrite_sync(BlockDriverState *bs, int64_t 
offset,
 return 0;
 }
 
+typedef struct BlockDriverAIOCBCoroutine {
+BlockDriverAIOCB common;
+BlockRequest req;
+bool is_write;
+QEMUBH *bh;
+bool cb_skip;
+bool limit_skip;
+void *blkq_acb;
+} BlockDriverAIOCBCoroutine;
+
+/* block I/O throttling */
+typedef struct CoroutineCB {
+BlockDriverState *bs;
+int64_t sector_num;
+int nb_sectors;
+QEMUIOVector *qiov;
+} CoroutineCB;
+
+static void bdrv_io_limits_skip_set(void *opaque,
+BlockAPIType co_type,
+bool cb_skip,
+bool limit_skip) {
+RwCo *rwco;
+BlockDriverAIOCBCoroutine *aioco;
+
+if (co_type == BDRV_API_SYNC) {
+rwco = opaque;
+rwco-limit_skip = limit_skip;
+} else if (co_type == BDRV_API_ASYNC) {
+aioco = opaque;
+aioco-cb_skip = cb_skip;
+aioco-limit_skip = limit_skip;
+} else {
+abort();
+}
+}
+
+void bdrv_io_limits_issue_request(void *opaque,
+  BlockAPIType co_type) {
+BlockQueueAIOCB *blkq_acb = opaque;
+
+if (blkq_acb-co_type == BDRV_API_CO) {
+qemu_coroutine_enter(blkq_acb-co, blkq_acb-cocb);
+} else {
+CoroutineEntry *entry = NULL;
+if (co_type == BDRV_API_SYNC) {
+entry = bdrv_rw_co_entry;
+} else if (co_type == BDRV_API_ASYNC) {
+entry = bdrv_co_do_rw;
+}
+
+bdrv_io_limits_skip_set(blkq_acb-cocb,
+co_type, false, true);
+Coroutine *co = qemu_coroutine_create(entry);
+qemu_coroutine_enter(co, blkq_acb-cocb);
+}
+}
+
 /*
  * Handle a read request in coroutine context
  */
@@ -2650,14 +2710,6 @@ static BlockDriverAIOCB 
*bdrv_aio_writev_em(BlockDriverState *bs,
 return bdrv_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
 }
 
-
-typedef struct BlockDriverAIOCBCoroutine {
-BlockDriverAIOCB common;
-BlockRequest req;
-bool is_write;
-QEMUBH* bh;
-} BlockDriverAIOCBCoroutine;
-
 static void bdrv_aio_co_cancel_em(BlockDriverAIOCB *blockacb)
 {
 qemu_aio_flush();
@@ -2711,6 +2763,9 @@ static BlockDriverAIOCB 
*bdrv_co_aio_rw_vector(BlockDriverState *bs,
 acb-req.nb_sectors = nb_sectors;
 acb-req.qiov = qiov;
 acb-is_write = is_write;
+acb-blkq_acb = NULL;
+acb-cb_skip  = false;
+acb-limit_skip = false;
 
 co = qemu_coroutine_create(bdrv_co_do_rw);
 qemu_coroutine_enter(co, acb);
diff --git a/block.h b/block.h
index 5a042c9..b7fbc40 100644
--- a/block.h
+++ b/block.h
@@ -82,6 +82,17 @@ typedef enum {
 BDRV_IOS_MAX
 } BlockIOStatus;
 
+/* disk I/O throttling */
+typedef enum {
+BDRV_BLKQ_ENQ_FIRST, BDRV_BLKQ_ENQ_AGAIN, BDRV_BLKQ_DEQ_PASS,
+BDRV_BLKQ_PASS, BDRV_IO_MAX
+} BlockQueueRetType;
+
+typedef enum {
+BDRV_API_SYNC, BDRV_API_ASYNC, BDRV_API_CO,
+BDRV_API_MAX
+} BlockAPIType;
+
 void bdrv_iostatus_enable(BlockDriverState *bs);
 void bdrv_iostatus_reset(BlockDriverState *bs);
 void bdrv_iostatus_disable(BlockDriverState *bs);
@@ -94,6 +105,8 @@ void bdrv_info(Monitor *mon, QObject **ret_data);
 void bdrv_stats_print(Monitor *mon,