Am 14.01.21 um 20:19 schrieb Jason Dillaman: > On Sun, Dec 27, 2020 at 11:42 AM Peter Lieven <p...@kamp.de> wrote: >> Signed-off-by: Peter Lieven <p...@kamp.de> >> --- >> block/rbd.c | 247 ++++++++++++++++++---------------------------------- >> 1 file changed, 84 insertions(+), 163 deletions(-) >> >> diff --git a/block/rbd.c b/block/rbd.c >> index 27b232f4d8..2d77d0007f 100644 >> --- a/block/rbd.c >> +++ b/block/rbd.c >> @@ -66,22 +66,6 @@ typedef enum { >> RBD_AIO_FLUSH >> } RBDAIOCmd; >> >> -typedef struct RBDAIOCB { >> - BlockAIOCB common; >> - int64_t ret; >> - QEMUIOVector *qiov; >> - RBDAIOCmd cmd; >> - int error; >> - struct BDRVRBDState *s; >> -} RBDAIOCB; >> - >> -typedef struct RADOSCB { >> - RBDAIOCB *acb; >> - struct BDRVRBDState *s; >> - int64_t size; >> - int64_t ret; >> -} RADOSCB; >> - >> typedef struct BDRVRBDState { >> rados_t cluster; >> rados_ioctx_t io_ctx; >> @@ -94,6 +78,13 @@ typedef struct BDRVRBDState { >> AioContext *aio_context; >> } BDRVRBDState; >> >> +typedef struct RBDTask { >> + BDRVRBDState *s; >> + Coroutine *co; >> + bool complete; >> + int64_t ret; >> +} RBDTask; >> + >> static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx, >> BlockdevOptionsRbd *opts, bool cache, >> const char *keypairs, const char *secretid, >> @@ -316,13 +307,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const >> char *keypairs_json, >> return ret; >> } >> >> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs) >> -{ >> - RBDAIOCB *acb = rcb->acb; >> - iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0, >> - acb->qiov->size - offs); >> -} >> - >> /* FIXME Deprecate and remove keypairs or make it available in QMP. */ >> static int qemu_rbd_do_create(BlockdevCreateOptions *options, >> const char *keypairs, const char >> *password_secret, >> @@ -440,46 +424,6 @@ exit: >> return ret; >> } >> >> -/* >> - * This aio completion is being called from rbd_finish_bh() and runs in qemu >> - * BH context. >> - */ >> -static void qemu_rbd_complete_aio(RADOSCB *rcb) >> -{ >> - RBDAIOCB *acb = rcb->acb; >> - int64_t r; >> - >> - r = rcb->ret; >> - >> - if (acb->cmd != RBD_AIO_READ) { >> - if (r < 0) { >> - acb->ret = r; >> - acb->error = 1; >> - } else if (!acb->error) { >> - acb->ret = rcb->size; >> - } >> - } else { >> - if (r < 0) { >> - qemu_rbd_memset(rcb, 0); >> - acb->ret = r; >> - acb->error = 1; >> - } else if (r < rcb->size) { >> - qemu_rbd_memset(rcb, r); >> - if (!acb->error) { >> - acb->ret = rcb->size; >> - } >> - } else if (!acb->error) { >> - acb->ret = r; >> - } >> - } >> - >> - g_free(rcb); >> - >> - acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret)); >> - >> - qemu_aio_unref(acb); >> -} >> - >> static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp) >> { >> const char **vals; >> @@ -817,88 +761,49 @@ static int qemu_rbd_resize(BlockDriverState *bs, >> uint64_t size) >> return 0; >> } >> >> -static const AIOCBInfo rbd_aiocb_info = { >> - .aiocb_size = sizeof(RBDAIOCB), >> -}; >> - >> -static void rbd_finish_bh(void *opaque) >> +static void qemu_rbd_finish_bh(void *opaque) >> { >> - RADOSCB *rcb = opaque; >> - qemu_rbd_complete_aio(rcb); >> + RBDTask *task = opaque; >> + task->complete = 1; >> + aio_co_wake(task->co); >> } >> >> -/* >> - * This is the callback function for rbd_aio_read and _write >> - * >> - * Note: this function is being called from a non qemu thread so >> - * we need to be careful about what we do here. Generally we only >> - * schedule a BH, and do the rest of the io completion handling >> - * from rbd_finish_bh() which runs in a qemu context. >> - */ >> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb) >> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task) >> { >> - RBDAIOCB *acb = rcb->acb; >> - >> - rcb->ret = rbd_aio_get_return_value(c); >> + task->ret = rbd_aio_get_return_value(c); >> rbd_aio_release(c); >> - >> - replay_bh_schedule_oneshot_event(acb->s->aio_context, rbd_finish_bh, >> rcb); >> + aio_bh_schedule_oneshot(task->s->aio_context, qemu_rbd_finish_bh, task); >> } >> >> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, >> - int64_t off, >> - QEMUIOVector *qiov, >> - int64_t size, >> - BlockCompletionFunc *cb, >> - void *opaque, >> - RBDAIOCmd cmd) >> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs, >> + uint64_t offset, >> + uint64_t bytes, >> + QEMUIOVector *qiov, >> + int flags, >> + RBDAIOCmd cmd) >> { >> - RBDAIOCB *acb; >> - RADOSCB *rcb = NULL; >> - rbd_completion_t c; >> - int r; >> - >> BDRVRBDState *s = bs->opaque; >> + RBDTask task = { .s = s, .co = qemu_coroutine_self() }; >> + rbd_completion_t c; >> + int r, ret = -EIO; >> >> - acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque); >> - acb->cmd = cmd; >> - acb->qiov = qiov; >> - assert(!qiov || qiov->size == size); >> - >> - rcb = g_new(RADOSCB, 1); >> - >> - acb->ret = 0; >> - acb->error = 0; >> - acb->s = s; >> + assert(!qiov || qiov->size == bytes); >> >> - rcb->acb = acb; >> - rcb->s = acb->s; >> - rcb->size = size; >> - r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, >> &c); >> + r = rbd_aio_create_completion(&task, >> + (rbd_callback_t) qemu_rbd_completion_cb, >> &c); >> if (r < 0) { >> goto failed; >> } >> >> switch (cmd) { >> - case RBD_AIO_WRITE: >> - /* >> - * RBD APIs don't allow us to write more than actual size, so in >> order >> - * to support growing images, we resize the image before write >> - * operations that exceed the current size. >> - */ >> - if (off + size > s->image_size) { >> - r = qemu_rbd_resize(bs, off + size); > I realize this is inherited code that is being refactored, but is it > really possible for QEMU to issue IOs outside the block extents? This > also has the same issue as the previous commit comment where the > "image_size" is only initialized at image open time and therefore this > path will keep getting hit if IOs are issued beyond the original block > device extents.
This is for the case that you came to the idea that putting e.g. a qcow2 file on rbd which actually can grow during runtime. If you put a raw image on an rbd image the only way to grow is with block_resize (which invoces truncate) or with qemu-img resize... I personally was unsure if it makes sense at all to supprt anything except raw on rbd, but it was supported before so I left this in. > >> - if (r < 0) { >> - goto failed_completion; >> - } >> - } >> - r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c); >> - break; >> case RBD_AIO_READ: >> - r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c); >> + r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c); >> + break; >> + case RBD_AIO_WRITE: >> + r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c); >> break; >> case RBD_AIO_DISCARD: >> - r = rbd_aio_discard(s->image, off, size, c); >> + r = rbd_aio_discard(s->image, offset, bytes, c); >> break; >> case RBD_AIO_FLUSH: >> r = rbd_aio_flush(s->image, c); >> @@ -908,44 +813,71 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs, >> } >> >> if (r < 0) { >> + task.complete = 1; >> + task.ret = r; >> + } >> + >> + while (!task.complete) { >> + qemu_coroutine_yield(); >> + } >> + >> + if (task.ret < 0) { >> + error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %" >> + PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, >> offset, >> + bytes, flags, task.ret, strerror(-task.ret)); >> goto failed_completion; >> } >> - return &acb->common; >> + >> + /* zero pad short reads */ >> + if (cmd == RBD_AIO_READ && task.ret < qiov->size) { >> + qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret); >> + } >> + >> + return 0; >> >> failed_completion: >> rbd_aio_release(c); > Wasn't this already released in "qemu_rbd_completion_cb"? You are right, in case of task.ret < 0 we need to go to "failed" and not "failed_completion". If the I/O is successful we leave the coroutine with "return 0" just above the label. Peter