* Denis Plotnikov (dplotni...@virtuozzo.com) wrote:
> The patch adds the ability for qemu-file to write data
> asynchronously to improve write performance.
> Before, only synchronous writing was supported.
>
> Enabling of the asynchronous mode is managed by the new
> "enable_buffered" callback.
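Just so I'm sure I've read the flow right - a condensed, untested sketch of the buffered write path, using the names from the patch below (call flow only, not compilable on its own):

    /*
     * Sketch of the write path with buffered_mode on, as I read the patch;
     * copy_buf/flush_buffer/write_task_fn etc. are the patch's own names.
     */
    qemu_put_buffer(f, buf, size)
        -> copy_buf(f, buf, size, false)       /* memcpy into the current 1MB aligned buffer */
             -> flush_buffer(f) when buf_is_full(f):
                    aio_task_pool_start_task(f->pool, ...)  /* queue write_task_fn, async */
                    qemu_file_switch_current_buf(f)         /* take a buffer off free_buffers,
                                                               waiting in aio_task_pool_wait_slot()
                                                               if none is free yet */

    write_task_fn(task)
        -> f->ops->writev_buffer(f->opaque, {fb->buf, fb->buf_index}, 1, pos)  /* own file pos */
        -> QLIST_INSERT_HEAD(&f->free_buffers, fb, link)                       /* recycle buffer */

    qemu_fclose(f)
        -> flush_buffer(f)                     /* push out the partially filled last buffer */
        -> aio_task_pool_wait_all(f->pool)     /* drain all in-flight writes before closing */

i.e. writes keep landing in one aligned buffer while the other buffer(s) are written out asynchronously through the task pool.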
It's a bit invasive, isn't it - it changes a lot of functions in a lot of places!

The multifd code separated the control headers from the data onto separate fds - but that doesn't help your case.

Is there any chance you could do this by using the existing 'save_page' hook (that RDMA uses)? (I've sketched the rough shape of that hook at the very end of this mail.)

In the cover letter you mention direct qemu_fflush calls - have we got a few too many in some places that you think we can clean out?

Dave

> Signed-off-by: Denis Plotnikov <dplotni...@virtuozzo.com> > --- > include/qemu/typedefs.h | 1 + > migration/qemu-file.c | 351 > +++++++++++++++++++++++++++++++++++++++++++++--- > migration/qemu-file.h | 9 ++ > 3 files changed, 339 insertions(+), 22 deletions(-) > > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h > index 88dce54..9b388c8 100644 > --- a/include/qemu/typedefs.h > +++ b/include/qemu/typedefs.h > @@ -98,6 +98,7 @@ typedef struct QEMUBH QEMUBH; > typedef struct QemuConsole QemuConsole; > typedef struct QEMUFile QEMUFile; > typedef struct QEMUFileBuffer QEMUFileBuffer; > +typedef struct QEMUFileAioTask QEMUFileAioTask; > typedef struct QemuLockable QemuLockable; > typedef struct QemuMutex QemuMutex; > typedef struct QemuOpt QemuOpt; > diff --git a/migration/qemu-file.c b/migration/qemu-file.c > index 285c6ef..f42f949 100644 > --- a/migration/qemu-file.c > +++ b/migration/qemu-file.c > @@ -29,19 +29,25 @@ > #include "qemu-file.h" > #include "trace.h" > #include "qapi/error.h" > +#include "block/aio_task.h" > > -#define IO_BUF_SIZE 32768 > +#define IO_BUF_SIZE (1024 * 1024) > #define MAX_IOV_SIZE MIN(IOV_MAX, 64) > +#define IO_BUF_NUM 2 > +#define IO_BUF_ALIGNMENT 512 > > -QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, 512)); > +QEMU_BUILD_BUG_ON(!QEMU_IS_ALIGNED(IO_BUF_SIZE, IO_BUF_ALIGNMENT)); > +QEMU_BUILD_BUG_ON(IO_BUF_SIZE > INT_MAX); > +QEMU_BUILD_BUG_ON(IO_BUF_NUM <= 0); > > struct QEMUFileBuffer { > int buf_index; > - int buf_size; /* 0 when writing */ > + int buf_size; /* 0 when non-buffered writing */ > uint8_t *buf; > unsigned long *may_free; > struct iovec *iov; > unsigned int iovcnt; > + QLIST_ENTRY(QEMUFileBuffer) link; > }; > > struct QEMUFile { > @@ -60,6 +66,22 @@ struct QEMUFile { > bool shutdown; > /* currently used buffer */ > QEMUFileBuffer *current_buf; > + /* > + * with buffered_mode enabled all the data copied to 512 byte > + * aligned buffer, including iov data. Then the buffer is passed > + * to writev_buffer callback. 
> + */ > + bool buffered_mode; > + /* for async buffer writing */ > + AioTaskPool *pool; > + /* the list of free buffers, currently used on is NOT there */ > + QLIST_HEAD(, QEMUFileBuffer) free_buffers; > +}; > + > +struct QEMUFileAioTask { > + AioTask task; > + QEMUFile *f; > + QEMUFileBuffer *fb; > }; > > /* > @@ -115,10 +137,42 @@ QEMUFile *qemu_fopen_ops(void *opaque, const > QEMUFileOps *ops) > f->opaque = opaque; > f->ops = ops; > > - f->current_buf = g_new0(QEMUFileBuffer, 1); > - f->current_buf->buf = g_malloc(IO_BUF_SIZE); > - f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE); > - f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE); > + if (f->ops->enable_buffered) { > + f->buffered_mode = f->ops->enable_buffered(f->opaque); > + } > + > + if (f->buffered_mode && qemu_file_is_writable(f)) { > + int i; > + /* > + * in buffered_mode we don't use internal io vectors > + * and may_free bitmap, because we copy the data to be > + * written right away to the buffer > + */ > + f->pool = aio_task_pool_new(IO_BUF_NUM); > + > + /* allocate io buffers */ > + for (i = 0; i < IO_BUF_NUM; i++) { > + QEMUFileBuffer *fb = g_new0(QEMUFileBuffer, 1); > + > + fb->buf = qemu_memalign(IO_BUF_ALIGNMENT, IO_BUF_SIZE); > + fb->buf_size = IO_BUF_SIZE; > + > + /* > + * put the first buffer to the current buf and the rest > + * to the list of free buffers > + */ > + if (i == 0) { > + f->current_buf = fb; > + } else { > + QLIST_INSERT_HEAD(&f->free_buffers, fb, link); > + } > + } > + } else { > + f->current_buf = g_new0(QEMUFileBuffer, 1); > + f->current_buf->buf = g_malloc(IO_BUF_SIZE); > + f->current_buf->iov = g_new0(struct iovec, MAX_IOV_SIZE); > + f->current_buf->may_free = bitmap_new(MAX_IOV_SIZE); > + } > > return f; > } > @@ -190,6 +244,8 @@ static void qemu_iovec_release_ram(QEMUFile *f) > unsigned long idx; > QEMUFileBuffer *fb = f->current_buf; > > + assert(!f->buffered_mode); > + > /* Find and release all the contiguous memory ranges marked as may_free. 
> */ > idx = find_next_bit(fb->may_free, fb->iovcnt, 0); > if (idx >= fb->iovcnt) { > @@ -221,6 +277,147 @@ static void qemu_iovec_release_ram(QEMUFile *f) > bitmap_zero(fb->may_free, MAX_IOV_SIZE); > } > > +static void advance_buf_ptr(QEMUFile *f, size_t size) > +{ > + QEMUFileBuffer *fb = f->current_buf; > + /* must not advance to 0 */ > + assert(size); > + /* must not overflow buf_index (int) */ > + assert(fb->buf_index + size <= INT_MAX); > + /* must not exceed buf_size */ > + assert(fb->buf_index + size <= fb->buf_size); > + > + fb->buf_index += size; > +} > + > +static size_t get_buf_free_size(QEMUFile *f) > +{ > + QEMUFileBuffer *fb = f->current_buf; > + /* buf_index can't be greated than buf_size */ > + assert(fb->buf_size >= fb->buf_index); > + return fb->buf_size - fb->buf_index; > +} > + > +static size_t get_buf_used_size(QEMUFile *f) > +{ > + QEMUFileBuffer *fb = f->current_buf; > + return fb->buf_index; > +} > + > +static uint8_t *get_buf_ptr(QEMUFile *f) > +{ > + QEMUFileBuffer *fb = f->current_buf; > + /* protects from out of bound reading */ > + assert(fb->buf_index <= IO_BUF_SIZE); > + return fb->buf + fb->buf_index; > +} > + > +static bool buf_is_full(QEMUFile *f) > +{ > + return get_buf_free_size(f) == 0; > +} > + > +static void reset_buf(QEMUFile *f) > +{ > + QEMUFileBuffer *fb = f->current_buf; > + fb->buf_index = 0; > +} > + > +static int write_task_fn(AioTask *task) > +{ > + int ret; > + Error *local_error = NULL; > + QEMUFileAioTask *t = (QEMUFileAioTask *) task; > + QEMUFile *f = t->f; > + QEMUFileBuffer *fb = t->fb; > + uint64_t pos = f->pos; > + struct iovec v = (struct iovec) { > + .iov_base = fb->buf, > + .iov_len = fb->buf_index, > + }; > + > + assert(f->buffered_mode); > + > + /* > + * Increment file position. > + * This needs to be here before calling writev_buffer, because > + * writev_buffer is asynchronous and there could be more than one > + * writev_buffer started simultaniously. Each writev_buffer should > + * use its own file pos to write to. writev_buffer may write less > + * than buf_index bytes but we treat this situation as an error. > + * If error appeared, further file using is meaningless. > + * We expect that, the most of the time the full buffer is written, > + * (when buf_size == buf_index). The only case when the non-full > + * buffer is written (buf_size != buf_index) is file close, > + * when we need to flush the rest of the buffer content. > + */ > + f->pos += fb->buf_index; > + > + ret = f->ops->writev_buffer(f->opaque, &v, 1, pos, &local_error); > + > + /* return the just written buffer to the free list */ > + QLIST_INSERT_HEAD(&f->free_buffers, fb, link); > + > + /* check that we have written everything */ > + if (ret != fb->buf_index) { > + qemu_file_set_error_obj(f, ret < 0 ? ret : -EIO, local_error); > + } > + > + /* > + * always return 0 - don't use task error handling, relay on > + * qemu file error handling > + */ > + return 0; > +} > + > +static void qemu_file_switch_current_buf(QEMUFile *f) > +{ > + /* > + * if the list is empty, wait until some task returns a buffer > + * to the list of free buffers. 
> + */ > + if (QLIST_EMPTY(&f->free_buffers)) { > + aio_task_pool_wait_slot(f->pool); > + } > + > + /* > + * sanity check that the list isn't empty > + * if the free list was empty, we waited for a task complition, > + * and the pompleted task must return a buffer to a list of free buffers > + */ > + assert(!QLIST_EMPTY(&f->free_buffers)); > + > + /* set the current buffer for using from the free list */ > + f->current_buf = QLIST_FIRST(&f->free_buffers); > + reset_buf(f); > + > + QLIST_REMOVE(f->current_buf, link); > +} > + > +/** > + * Asynchronously flushes QEMUFile buffer > + * > + * This will flush all pending data. If data was only partially flushed, it > + * will set an error state. The function may return before the data actually > + * written. > + */ > +static void flush_buffer(QEMUFile *f) > +{ > + QEMUFileAioTask *t = g_new(QEMUFileAioTask, 1); > + > + *t = (QEMUFileAioTask) { > + .task.func = &write_task_fn, > + .f = f, > + .fb = f->current_buf, > + }; > + > + /* aio_task_pool should free t for us */ > + aio_task_pool_start_task(f->pool, (AioTask *) t); > + > + /* if no errors this will switch the buffer */ > + qemu_file_switch_current_buf(f); > +} > + > /** > * Flushes QEMUFile buffer > * > @@ -241,7 +438,13 @@ void qemu_fflush(QEMUFile *f) > if (f->shutdown) { > return; > } > + > + if (f->buffered_mode) { > + return; > + } > + > if (fb->iovcnt > 0) { > + /* this is non-buffered mode */ > expect = iov_size(fb->iov, fb->iovcnt); > ret = f->ops->writev_buffer(f->opaque, fb->iov, fb->iovcnt, f->pos, > &local_error); > @@ -378,6 +581,7 @@ static ssize_t qemu_fill_buffer(QEMUFile *f) > > void qemu_update_position(QEMUFile *f, size_t size) > { > + assert(!f->buffered_mode); > f->pos += size; > } > > @@ -392,7 +596,18 @@ void qemu_update_position(QEMUFile *f, size_t size) > int qemu_fclose(QEMUFile *f) > { > int ret; > - qemu_fflush(f); > + > + if (qemu_file_is_writable(f) && f->buffered_mode) { > + ret = qemu_file_get_error(f); > + if (!ret) { > + flush_buffer(f); > + } > + /* wait until all tasks are done */ > + aio_task_pool_wait_all(f->pool); > + } else { > + qemu_fflush(f); > + } > + > ret = qemu_file_get_error(f); > > if (f->ops->close) { > @@ -408,16 +623,77 @@ int qemu_fclose(QEMUFile *f) > ret = f->last_error; > } > error_free(f->last_error_obj); > - g_free(f->current_buf->buf); > - g_free(f->current_buf->iov); > - g_free(f->current_buf->may_free); > - g_free(f->current_buf); > + > + if (f->buffered_mode) { > + QEMUFileBuffer *fb, *next; > + /* > + * put the current back to the free buffers list > + * to destroy all the buffers in one loop > + */ > + QLIST_INSERT_HEAD(&f->free_buffers, f->current_buf, link); > + > + /* destroy all the buffers */ > + QLIST_FOREACH_SAFE(fb, &f->free_buffers, link, next) { > + QLIST_REMOVE(fb, link); > + /* looks like qemu_vfree pairs with qemu_memalign */ > + qemu_vfree(fb->buf); > + g_free(fb); > + } > + g_free(f->pool); > + } else { > + g_free(f->current_buf->buf); > + g_free(f->current_buf->iov); > + g_free(f->current_buf->may_free); > + g_free(f->current_buf); > + } > + > g_free(f); > trace_qemu_file_fclose(); > return ret; > } > > /* > + * Copy an external buffer to the intenal current buffer. 
> + */ > +static void copy_buf(QEMUFile *f, const uint8_t *buf, size_t size, > + bool may_free) > +{ > + size_t data_size = size; > + const uint8_t *src_ptr = buf; > + > + assert(f->buffered_mode); > + assert(size <= INT_MAX); > + > + while (data_size > 0) { > + size_t chunk_size; > + > + if (buf_is_full(f)) { > + flush_buffer(f); > + if (qemu_file_get_error(f)) { > + return; > + } > + } > + > + chunk_size = MIN(get_buf_free_size(f), data_size); > + > + memcpy(get_buf_ptr(f), src_ptr, chunk_size); > + > + advance_buf_ptr(f, chunk_size); > + > + src_ptr += chunk_size; > + data_size -= chunk_size; > + f->bytes_xfer += chunk_size; > + } > + > + if (may_free) { > + if (qemu_madvise((void *) buf, size, QEMU_MADV_DONTNEED) < 0) { > + error_report("migrate: madvise DONTNEED failed %p %zd: %s", > + buf, size, strerror(errno)); > + } > + } > +} > + > +/* > * Add buf to iovec. Do flush if iovec is full. > * > * Return values: > @@ -454,6 +730,9 @@ static int add_to_iovec(QEMUFile *f, const uint8_t *buf, > size_t size, > static void add_buf_to_iovec(QEMUFile *f, size_t len) > { > QEMUFileBuffer *fb = f->current_buf; > + > + assert(!f->buffered_mode); > + > if (!add_to_iovec(f, fb->buf + fb->buf_index, len, false)) { > fb->buf_index += len; > if (fb->buf_index == IO_BUF_SIZE) { > @@ -469,8 +748,12 @@ void qemu_put_buffer_async(QEMUFile *f, const uint8_t > *buf, size_t size, > return; > } > > - f->bytes_xfer += size; > - add_to_iovec(f, buf, size, may_free); > + if (f->buffered_mode) { > + copy_buf(f, buf, size, may_free); > + } else { > + f->bytes_xfer += size; > + add_to_iovec(f, buf, size, may_free); > + } > } > > void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, size_t size) > @@ -482,6 +765,11 @@ void qemu_put_buffer(QEMUFile *f, const uint8_t *buf, > size_t size) > return; > } > > + if (f->buffered_mode) { > + copy_buf(f, buf, size, false); > + return; > + } > + > while (size > 0) { > l = IO_BUF_SIZE - fb->buf_index; > if (l > size) { > @@ -506,15 +794,21 @@ void qemu_put_byte(QEMUFile *f, int v) > return; > } > > - fb->buf[fb->buf_index] = v; > - f->bytes_xfer++; > - add_buf_to_iovec(f, 1); > + if (f->buffered_mode) { > + copy_buf(f, (const uint8_t *) &v, 1, false); > + } else { > + fb->buf[fb->buf_index] = v; > + add_buf_to_iovec(f, 1); > + f->bytes_xfer++; > + } > } > > void qemu_file_skip(QEMUFile *f, int size) > { > QEMUFileBuffer *fb = f->current_buf; > > + assert(!f->buffered_mode); > + > if (fb->buf_index + size <= fb->buf_size) { > fb->buf_index += size; > } > @@ -672,10 +966,14 @@ int64_t qemu_ftell_fast(QEMUFile *f) > { > int64_t ret = f->pos; > int i; > - QEMUFileBuffer *fb = f->current_buf; > > - for (i = 0; i < fb->iovcnt; i++) { > - ret += fb->iov[i].iov_len; > + if (f->buffered_mode) { > + ret += get_buf_used_size(f); > + } else { > + QEMUFileBuffer *fb = f->current_buf; > + for (i = 0; i < fb->iovcnt; i++) { > + ret += fb->iov[i].iov_len; > + } > } > > return ret; > @@ -683,8 +981,12 @@ int64_t qemu_ftell_fast(QEMUFile *f) > > int64_t qemu_ftell(QEMUFile *f) > { > - qemu_fflush(f); > - return f->pos; > + if (f->buffered_mode) { > + return qemu_ftell_fast(f); > + } else { > + qemu_fflush(f); > + return f->pos; > + } > } > > int qemu_file_rate_limit(QEMUFile *f) > @@ -803,6 +1105,8 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream > *stream, > QEMUFileBuffer *fb = f->current_buf; > ssize_t blen = IO_BUF_SIZE - fb->buf_index - sizeof(int32_t); > > + assert(!f->buffered_mode); > + > if (blen < compressBound(size)) { > return -1; > } > @@ -827,6 +1131,9 @@ int 
qemu_put_qemu_file(QEMUFile *f_des, QEMUFile *f_src) > int len = 0; > QEMUFileBuffer *fb_src = f_src->current_buf; > > + assert(!f_des->buffered_mode); > + assert(!f_src->buffered_mode); > + > if (fb_src->buf_index > 0) { > len = fb_src->buf_index; > qemu_put_buffer(f_des, fb_src->buf, fb_src->buf_index); > diff --git a/migration/qemu-file.h b/migration/qemu-file.h > index a9b6d6c..08655d2 100644 > --- a/migration/qemu-file.h > +++ b/migration/qemu-file.h > @@ -103,6 +103,14 @@ typedef QEMUFile *(QEMURetPathFunc)(void *opaque); > typedef int (QEMUFileShutdownFunc)(void *opaque, bool rd, bool wr, > Error **errp); > > +/* > + * Enables or disables the buffered mode > + * Existing blocking reads/writes must be woken > + * Returns true if the buffered mode has to be enabled, > + * false if it has to be disabled. > + */ > +typedef bool (QEMUFileEnableBufferedFunc)(void *opaque); > + > typedef struct QEMUFileOps { > QEMUFileGetBufferFunc *get_buffer; > QEMUFileCloseFunc *close; > @@ -110,6 +118,7 @@ typedef struct QEMUFileOps { > QEMUFileWritevBufferFunc *writev_buffer; > QEMURetPathFunc *get_return_path; > QEMUFileShutdownFunc *shut_down; > + QEMUFileEnableBufferedFunc *enable_buffered; > } QEMUFileOps; > > typedef struct QEMUFileHooks { > -- > 1.8.3.1 > -- Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK
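P.S. For reference, the 'save_page' hook I mentioned above is the QEMUFileHooks override that RDMA wires up; quoting its shape from memory here (it's not part of this patch, so double-check against migration/qemu-file.h in the tree):

    /* Existing RAM-page save override, as used by RDMA - signature from memory. */
    typedef size_t (QEMURamSaveFunc)(QEMUFile *f, void *opaque,
                                     ram_addr_t block_offset,
                                     ram_addr_t offset,
                                     size_t size,
                                     uint64_t *bytes_sent);

    /*
     * ram.c goes through ram_control_save_page() first and only falls back to
     * the normal qemu_put_buffer() path when the hook reports not-supported,
     * which is why I was wondering whether the buffering/async writing could
     * live behind that hook rather than inside qemu-file itself.
     */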