Am 24.11.2010 12:11, schrieb Stefan Hajnoczi: > This patch implements the read/write state machine. Operations are > fully asynchronous and multiple operations may be active at any time. > > Allocating writes lock tables to ensure metadata updates do not > interfere with each other. If two allocating writes need to update the > same L2 table they will run sequentially. If two allocating writes need > to update different L2 tables they will run in parallel. > > Signed-off-by: Stefan Hajnoczi <stefa...@linux.vnet.ibm.com> > --- > Makefile.objs | 1 + > block/qed-lock.c | 124 ++++++++++ > block/qed.c | 667 > +++++++++++++++++++++++++++++++++++++++++++++++++++++- > block/qed.h | 43 ++++ > trace-events | 10 + > 5 files changed, 843 insertions(+), 2 deletions(-) > create mode 100644 block/qed-lock.c
> diff --git a/block/qed.c b/block/qed.c > index cd1bead..1513763 100644 > --- a/block/qed.c > +++ b/block/qed.c > @@ -12,8 +12,26 @@ > * > */ > > +#include "trace.h" > #include "qed.h" > > +static void qed_aio_cancel(BlockDriverAIOCB *blockacb) > +{ > + QEDAIOCB *acb = (QEDAIOCB *)blockacb; > + bool finished = false; > + > + /* Wait for the request to finish */ > + acb->finished = &finished; > + while (!finished) { > + qemu_aio_wait(); > + } > +} Hm, you don't even try to cancel? I wonder how useful the individual bdrv_aio_cancel implementations actually are when nobody implements cancellation. It seems to be pretty hard to do it right. Maybe we should consider implementing a default bdrv_aio_cancel implementation in block.c that waits for completion? > +/** > + * Construct an iovec array for a given length > + * > + * @acb: I/O request > + * @len: Maximum number of bytes > + * > + * This function can be called several times to build subset iovec arrays of > + * acb->qiov. For example: > + * > + * acb->qiov->iov[] = {{0x100000, 1024}, > + * {0x200000, 1024}} > + * > + * qed_acb_build_qiov(acb, 512) => > + * {{0x100000, 512}} > + * > + * qed_acb_build_qiov(acb, 1024) => > + * {{0x100200, 512}, > + * {0x200000, 512}} > + * > + * qed_acb_build_qiov(acb, 512) => > + * {{0x200200, 512}} > + */ > +static void qed_acb_build_qiov(QEDAIOCB *acb, size_t len) > +{ > + struct iovec *iov_end = &acb->qiov->iov[acb->qiov->niov]; > + size_t iov_offset = acb->cur_iov_offset; > + struct iovec *iov = acb->cur_iov; > + > + while (iov != iov_end && len > 0) { > + size_t nbytes = MIN(iov->iov_len - iov_offset, len); > + > + qemu_iovec_add(&acb->cur_qiov, iov->iov_base + iov_offset, nbytes); > + iov_offset += nbytes; > + len -= nbytes; > + > + if (iov_offset >= iov->iov_len) { > + iov_offset = 0; > + iov++; > + } > + } > + > + /* Stash state for next time */ > + acb->cur_iov = iov; > + acb->cur_iov_offset = iov_offset; > +} Wouldn't it be much simpler to just store the offset into 
acb->qiov and to use qemu_iovec_copy to get the subset qiov? > +/** > + * Write data to the image file > + */ > +static void qed_aio_write_main(void *opaque, int ret) > +{ > + QEDAIOCB *acb = opaque; > + BDRVQEDState *s = acb_to_s(acb); > + uint64_t offset = acb->cur_cluster; > + BlockDriverCompletionFunc *next_fn; > + BlockDriverAIOCB *file_acb; > + > + trace_qed_aio_write_main(s, acb, ret, offset, acb->cur_qiov.size); Why does the trace use a different offset... > + > + if (ret) { > + qed_aio_complete(acb, ret); > + return; > + } > + > + if (acb->find_cluster_ret == QED_CLUSTER_FOUND) { > + next_fn = qed_aio_next_io; > + } else { > + if (s->bs->backing_hd) { > + next_fn = qed_aio_write_flush_before_l2_update; > + } else { > + next_fn = qed_aio_write_l2_update; > + } > + } > + > + offset += qed_offset_into_cluster(s, acb->cur_pos); ...than the write uses? I missed this line at first because offset is initialized above, so I didn't expect that this was only half of its initialization. > + BLKDBG_EVENT(s->bs->file, BLKDBG_WRITE_AIO); > + file_acb = bdrv_aio_writev(s->bs->file, offset / BDRV_SECTOR_SIZE, > + &acb->cur_qiov, > + acb->cur_qiov.size / BDRV_SECTOR_SIZE, > + next_fn, acb); > + if (!file_acb) { > + qed_aio_complete(acb, -EIO); > + } > +} > + > +/** > + * Populate back untouched region of new data cluster > + */ > +static void qed_aio_write_postfill(void *opaque, int ret) > +{ > + QEDAIOCB *acb = opaque; > + BDRVQEDState *s = acb_to_s(acb); > + uint64_t start = acb->cur_pos + acb->cur_qiov.size; > + uint64_t len = qed_start_of_cluster(s, start + s->header.cluster_size - > 1) - start; > + uint64_t offset = acb->cur_cluster + qed_offset_into_cluster(s, > acb->cur_pos) + acb->cur_qiov.size; This looks like more than 80 characters. Kevin