[Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch

2014-11-06 Thread Ming Lei
In the enqueue path we must not complete requests, otherwise
"Co-routine re-entered recursively" can be triggered, so this
patch fixes the issue with the following ideas:

- for -EAGAIN or partial completion, retry the submission by
scheduling a BH from the following completion callback
- for partial completion, also update the I/O queue
- for other failures, return the failure when in the enqueue path,
otherwise abort all queued I/O

Signed-off-by: Ming Lei 
---
 block/linux-aio.c |  101 +
 1 file changed, 79 insertions(+), 22 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index d92513b..f66e8ad 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -38,11 +38,19 @@ struct qemu_laiocb {
 QLIST_ENTRY(qemu_laiocb) node;
 };
 
+/*
+ * TODO: support batching I/O from multiple bs in the same
+ * AIO context; one important use case is multi-lun SCSI,
+ * so in the future the I/O queue should be per AIO context.
+ */
 typedef struct {
 struct iocb *iocbs[MAX_QUEUED_IO];
 int plugged;
 unsigned int size;
 unsigned int idx;
+
+/* handle -EAGAIN and partial completion */
+QEMUBH *retry;
 } LaioQueue;
 
 struct qemu_laio_state {
@@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque)
 }
 }
 
+static void qemu_laio_start_retry(struct qemu_laio_state *s)
+{
+if (s->io_q.idx)
+qemu_bh_schedule(s->io_q.retry);
+}
+
 static void qemu_laio_completion_cb(EventNotifier *e)
 {
 struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
@@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
 if (event_notifier_test_and_clear(&s->e)) {
 qemu_bh_schedule(s->completion_bh);
 }
+qemu_laio_start_retry(s);
 }
 
 static void laio_cancel(BlockAIOCB *blockacb)
@@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb)
 }
 
 laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
+
+/* check if there are requests in io queue */
+qemu_laio_start_retry(laiocb->ctx);
 }
 
 static const AIOCBInfo laio_aiocb_info = {
@@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q)
 io_q->plugged = 0;
 }
 
-static int ioq_submit(struct qemu_laio_state *s)
+static void abort_queue(struct qemu_laio_state *s)
+{
+int i;
+for (i = 0; i < s->io_q.idx; i++) {
+struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
+  struct qemu_laiocb,
+  iocb);
+laiocb->ret = -EIO;
+qemu_laio_process_completion(s, laiocb);
+}
+}
+
+static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
 {
 int ret, i = 0;
 int len = s->io_q.idx;
+int j = 0;
 
-do {
-ret = io_submit(s->ctx, len, s->io_q.iocbs);
-} while (i++ < 3 && ret == -EAGAIN);
+if (!len) {
+return 0;
+}
 
-/* empty io queue */
-s->io_q.idx = 0;
+ret = io_submit(s->ctx, len, s->io_q.iocbs);
+if (ret == -EAGAIN) { /* retry in following completion cb */
+return 0;
+} else if (ret < 0) {
+if (enqueue) {
+return ret;
+}
 
-if (ret < 0) {
-i = 0;
-} else {
-i = ret;
+/* in non-queue path, all IOs have to be completed */
+abort_queue(s);
+ret = len;
+} else if (ret == 0) {
+goto out;
 }
 
-for (; i < len; i++) {
-struct qemu_laiocb *laiocb =
-container_of(s->io_q.iocbs[i], struct qemu_laiocb, iocb);
-
-laiocb->ret = (ret < 0) ? ret : -EIO;
-qemu_laio_process_completion(s, laiocb);
+for (i = ret; i < len; i++) {
+s->io_q.iocbs[j++] = s->io_q.iocbs[i];
 }
+
+ out:
+/*
+ * update io queue, for partial completion, retry will be
+ * started automatically in following completion cb.
+ */
+s->io_q.idx -= ret;
+
 return ret;
 }
 
-static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
+static void ioq_submit_retry(void *opaque)
+{
+struct qemu_laio_state *s = opaque;
+ioq_submit(s, false);
+}
+
+static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
 {
 unsigned int idx = s->io_q.idx;
 
+if (unlikely(idx == s->io_q.size)) {
+return -1;
+}
+
 s->io_q.iocbs[idx++] = iocb;
 s->io_q.idx = idx;
 
-/* submit immediately if queue is full */
-if (idx == s->io_q.size) {
-ioq_submit(s);
+/* submit immediately if queue depth is above 2/3 */
+if (idx > s->io_q.size * 2 / 3) {
+return ioq_submit(s, true);
 }
+
+return 0;
 }
 
 void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
@@ -237,7 +290,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, 
bool unplug)
 }
 
 if (s->io_q.idx > 0) {
-ret = ioq_submit(s);
+ret = ioq_submit(s, false);
 }
 
 return ret;
@@ -281,7 +334,9 @@ BlockAIOCB *laio_submit(BlockD

Re: [Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch

2014-11-22 Thread Ming Lei
On Tue, Nov 18, 2014 at 10:18 PM, Paolo Bonzini  wrote:
>
>
>> @@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque)
>>  }
>>  }
>>
>> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
>> +{
>> +if (s->io_q.idx)
>> +qemu_bh_schedule(s->io_q.retry);
>> +}
>> +
>>  static void qemu_laio_completion_cb(EventNotifier *e)
>>  {
>>  struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
>> @@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
>>  if (event_notifier_test_and_clear(&s->e)) {
>>  qemu_bh_schedule(s->completion_bh);
>>  }
>> +qemu_laio_start_retry(s);
>
> I think you do not even need two bottom halves.  Just call ioq_submit
> from completion_bh instead, after the call to io_getevents.

Yes, that would save one BH; the patch was actually written before
the completion BH existed. :-)
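
For concreteness, an untested sketch of folding the retry into the
existing completion BH (the body of qemu_laio_completion_bh isn't shown
in this patch, so the io_getevents() loop below is only a placeholder):

static void qemu_laio_completion_bh(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    /* ... existing io_getevents() loop processing completed requests ... */

    /*
     * Completed events free slots in the kernel ring, so flush whatever
     * is still queued here instead of scheduling a separate retry BH.
     */
    if (s->io_q.idx) {
        ioq_submit(s, false);
    }
}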

>
>>  }
>>
>>  static void laio_cancel(BlockAIOCB *blockacb)
>> @@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb)
>>  }
>>
>>  laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
>> +
>> +/* check if there are requests in io queue */
>> +qemu_laio_start_retry(laiocb->ctx);
>>  }
>>
>>  static const AIOCBInfo laio_aiocb_info = {
>> @@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q)
>>  io_q->plugged = 0;
>>  }
>>
>> -static int ioq_submit(struct qemu_laio_state *s)
>> +static void abort_queue(struct qemu_laio_state *s)
>> +{
>> +int i;
>> +for (i = 0; i < s->io_q.idx; i++) {
>> +struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
>> +  struct qemu_laiocb,
>> +  iocb);
>> +laiocb->ret = -EIO;
>> +qemu_laio_process_completion(s, laiocb);
>> +}
>> +}
>> +
>> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>>  {
>>  int ret, i = 0;
>>  int len = s->io_q.idx;
>> +int j = 0;
>>
>> -do {
>> -ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> -} while (i++ < 3 && ret == -EAGAIN);
>> +if (!len) {
>> +return 0;
>> +}
>>
>> -/* empty io queue */
>> -s->io_q.idx = 0;
>> +ret = io_submit(s->ctx, len, s->io_q.iocbs);
>> +if (ret == -EAGAIN) { /* retry in following completion cb */
>> +return 0;
>> +} else if (ret < 0) {
>> +if (enqueue) {
>> +return ret;
>> +}
>>
>> -if (ret < 0) {
>> -i = 0;
>> -} else {
>> -i = ret;
>> +/* in non-queue path, all IOs have to be completed */
>> +abort_queue(s);
>> +ret = len;
>> +} else if (ret == 0) {
>> +goto out;
>
> No need for goto; just move the "for" loop inside this conditional.  Or
> better, just use memmove.  That is:
>
> if (ret < 0) {
> if (ret == -EAGAIN) {
> return 0;
> }
> if (enqueue) {
> return ret;
> }
>
> abort_queue(s);
> ret = len;
> }
>
> if (ret > 0) {
> memmove(...)
> s->io_q.idx -= ret;
> }
> return ret;

The above is better.
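
Something like this, then (untested, just to make sure I read the
suggestion right; abort_queue() and the field names are from the patch
above):

static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
{
    int ret;
    int len = s->io_q.idx;

    if (!len) {
        return 0;
    }

    ret = io_submit(s->ctx, len, s->io_q.iocbs);
    if (ret < 0) {
        if (ret == -EAGAIN) {
            /* retry in the following completion cb */
            return 0;
        }
        if (enqueue) {
            return ret;
        }

        /* in the non-enqueue path, all I/Os have to be completed */
        abort_queue(s);
        ret = len;
    }

    if (ret > 0) {
        /* keep the unsubmitted tail at the front of the queue */
        memmove(s->io_q.iocbs, s->io_q.iocbs + ret,
                (len - ret) * sizeof(struct iocb *));
        s->io_q.idx -= ret;
    }

    return ret;
}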

>> + * update io queue, for partial completion, retry will be
>> + * started automatically in following completion cb.
>> + */
>> +s->io_q.idx -= ret;
>> +
>>  return ret;
>>  }
>>
>> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> +static void ioq_submit_retry(void *opaque)
>> +{
>> +struct qemu_laio_state *s = opaque;
>> +ioq_submit(s, false);
>> +}
>> +
>> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>>  {
>>  unsigned int idx = s->io_q.idx;
>>
>> +if (unlikely(idx == s->io_q.size)) {
>> +return -1;
>
> return -EAGAIN?

It means the io queue is full, so the code has to fail the current
request.

Thanks,
Ming Lei



Re: [Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch

2014-11-24 Thread Paolo Bonzini


On 22/11/2014 13:16, Ming Lei wrote:
> > > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> > >  {
> > >  unsigned int idx = s->io_q.idx;
> > >
> > > +if (unlikely(idx == s->io_q.size)) {
> > > +return -1;
> >
> > return -EAGAIN?
>
> It means the io queue is full, so the code has to fail the current
> request.

Right, but "-1" is "-EPERM" which doesn't make much sense.  I think the
right thing to do is to return EAGAIN, and let the owner figure it out.
 For example, a SCSI device might return a "BUSY" status code.
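
i.e. something along these lines (sketch only, on top of the ioq_enqueue
hunk above):

static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
{
    unsigned int idx = s->io_q.idx;

    if (unlikely(idx == s->io_q.size)) {
        /*
         * Queue full: report a retryable condition instead of a bare -1,
         * which callers would otherwise read as -EPERM.
         */
        return -EAGAIN;
    }

    s->io_q.iocbs[idx++] = iocb;
    s->io_q.idx = idx;

    /* submit immediately if queue depth is above 2/3 */
    if (idx > s->io_q.size * 2 / 3) {
        return ioq_submit(s, true);
    }

    return 0;
}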

Paolo



Re: [Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch

2014-11-24 Thread Ming Lei
On Mon, Nov 24, 2014 at 5:47 PM, Paolo Bonzini  wrote:
>
>
> On 22/11/2014 13:16, Ming Lei wrote:
>> > > +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>> > >  {
>> > >  unsigned int idx = s->io_q.idx;
>> > >
>> > > +if (unlikely(idx == s->io_q.size)) {
>> > > +return -1;
>> >
>> > return -EAGAIN?
>>
>> It means the io queue is full, so the code has to fail the current
>> request.
>
> Right, but "-1" is "-EPERM" which doesn't make much sense.  I think the
> right thing to do is to return EAGAIN, and let the owner figure it out.
>  For example, a SCSI device might return a "BUSY" status code.

We can do that, but laio_submit() doesn't return the code to callers.


Thanks,
Ming Lei



Re: [Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch

2014-11-24 Thread Paolo Bonzini


On 24/11/2014 11:02, Ming Lei wrote:
>> > Right, but "-1" is "-EPERM" which doesn't make much sense.  I think the
>> > right thing to do is to return EAGAIN, and let the owner figure it out.
>> >  For example, a SCSI device might return a "BUSY" status code.
> We can do that, but laio_submit() doesn't return the code to callers.

That can be fixed later.  But not returning a sensible errno only makes
things more complicated later on.
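
For instance, a later change could be as small as this (hypothetical
sketch; the 'ret' local is not in today's laio_submit()):

    } else {
        ret = ioq_enqueue(s, iocbs);
        if (ret < 0) {
            /*
             * For now this just fails the request, but keeping a real
             * errno in 'ret' (-EAGAIN here) lets a later change hand it
             * back so e.g. a SCSI device can report BUSY instead of a
             * hard error.
             */
            goto out_free_aiocb;
        }
    }
    return &laiocb->common;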

Paolo



Re: [Qemu-devel] [PATCH v3 1/3] linux-aio: fix submit aio as a batch

2014-11-18 Thread Paolo Bonzini


> @@ -137,6 +145,12 @@ static void qemu_laio_completion_bh(void *opaque)
>  }
>  }
>  
> +static void qemu_laio_start_retry(struct qemu_laio_state *s)
> +{
> +if (s->io_q.idx)
> +qemu_bh_schedule(s->io_q.retry);
> +}
> +
>  static void qemu_laio_completion_cb(EventNotifier *e)
>  {
>  struct qemu_laio_state *s = container_of(e, struct qemu_laio_state, e);
> @@ -144,6 +158,7 @@ static void qemu_laio_completion_cb(EventNotifier *e)
>  if (event_notifier_test_and_clear(&s->e)) {
>  qemu_bh_schedule(s->completion_bh);
>  }
> +qemu_laio_start_retry(s);

I think you do not even need two bottom halves.  Just call ioq_submit
from completion_bh instead, after the call to io_getevents.

>  }
>  
>  static void laio_cancel(BlockAIOCB *blockacb)
> @@ -163,6 +178,9 @@ static void laio_cancel(BlockAIOCB *blockacb)
>  }
>  
>  laiocb->common.cb(laiocb->common.opaque, laiocb->ret);
> +
> +/* check if there are requests in io queue */
> +qemu_laio_start_retry(laiocb->ctx);
>  }
>  
>  static const AIOCBInfo laio_aiocb_info = {
> @@ -177,45 +195,80 @@ static void ioq_init(LaioQueue *io_q)
>  io_q->plugged = 0;
>  }
>  
> -static int ioq_submit(struct qemu_laio_state *s)
> +static void abort_queue(struct qemu_laio_state *s)
> +{
> +int i;
> +for (i = 0; i < s->io_q.idx; i++) {
> +struct qemu_laiocb *laiocb = container_of(s->io_q.iocbs[i],
> +  struct qemu_laiocb,
> +  iocb);
> +laiocb->ret = -EIO;
> +qemu_laio_process_completion(s, laiocb);
> +}
> +}
> +
> +static int ioq_submit(struct qemu_laio_state *s, bool enqueue)
>  {
>  int ret, i = 0;
>  int len = s->io_q.idx;
> +int j = 0;
>  
> -do {
> -ret = io_submit(s->ctx, len, s->io_q.iocbs);
> -} while (i++ < 3 && ret == -EAGAIN);
> +if (!len) {
> +return 0;
> +}
>  
> -/* empty io queue */
> -s->io_q.idx = 0;
> +ret = io_submit(s->ctx, len, s->io_q.iocbs);
> +if (ret == -EAGAIN) { /* retry in following completion cb */
> +return 0;
> +} else if (ret < 0) {
> +if (enqueue) {
> +return ret;
> +}
>  
> -if (ret < 0) {
> -i = 0;
> -} else {
> -i = ret;
> +/* in non-queue path, all IOs have to be completed */
> +abort_queue(s);
> +ret = len;
> +} else if (ret == 0) {
> +goto out;

No need for goto; just move the "for" loop inside this conditional.  Or
better, just use memmove.  That is:

if (ret < 0) {
if (ret == -EAGAIN) {
return 0;
}
if (enqueue) {
return ret;
}

abort_queue(s);
ret = len;
}

if (ret > 0) {
memmove(...)
s->io_q.idx -= ret;
}
return ret;

> + * update io queue, for partial completion, retry will be
> + * started automatically in following completion cb.
> + */
> +s->io_q.idx -= ret;
> +
>  return ret;
>  }
>  
> -static void ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
> +static void ioq_submit_retry(void *opaque)
> +{
> +struct qemu_laio_state *s = opaque;
> +ioq_submit(s, false);
> +}
> +
> +static int ioq_enqueue(struct qemu_laio_state *s, struct iocb *iocb)
>  {
>  unsigned int idx = s->io_q.idx;
>  
> +if (unlikely(idx == s->io_q.size)) {
> +return -1;

return -EAGAIN?

Thanks,

Paolo

> +}
> +
>  s->io_q.iocbs[idx++] = iocb;
>  s->io_q.idx = idx;
>  
> -/* submit immediately if queue is full */
> -if (idx == s->io_q.size) {
> -ioq_submit(s);
> +/* submit immediately if queue depth is above 2/3 */
> +if (idx > s->io_q.size * 2 / 3) {
> +return ioq_submit(s, true);
>  }
> +
> +return 0;
>  }
>  
>  void laio_io_plug(BlockDriverState *bs, void *aio_ctx)
> @@ -237,7 +290,7 @@ int laio_io_unplug(BlockDriverState *bs, void *aio_ctx, 
> bool unplug)
>  }
>  
>  if (s->io_q.idx > 0) {
> -ret = ioq_submit(s);
> +ret = ioq_submit(s, false);
>  }
>  
>  return ret;
> @@ -281,7 +334,9 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, void 
> *aio_ctx, int fd,
>  goto out_free_aiocb;
>  }
>  } else {
> -ioq_enqueue(s, iocbs);
> +if (ioq_enqueue(s, iocbs) < 0) {
> +goto out_free_aiocb;
> +}
>  }
>  return &laiocb->common;
>  
> @@ -296,12 +351,14 @@ void laio_detach_aio_context(void *s_, AioContext 
> *old_context)
>  
>  aio_set_event_notifier(old_context, &s->e, NULL);
>  qemu_bh_delete(s->completion_bh);
> +qemu_bh_delete(s->io_q.retry);
>  }
>  
>  void laio_attach_aio_context(void *s_, AioContext *new_context)
>  {
>  struct qemu_laio_state *s = s_;
>  
> +s->io_q.retry = aio_bh_new(new_context, ioq_submit_retry, s);
>  s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_b