Il 18/10/2012 09:30, Juan Quintela ha scritto:
> This only moves the code (also from buffered_file.h to migration.h).
> Fix whitespace until checkpatch is happy.
While I agree with this patch, it is also a conflict magnet. My migration-in-a-coroutine cleanups will touch buffered_file.c, you're warned... Paolo > Signed-off-by: Juan Quintela <quint...@redhat.com> > --- > Makefile.objs | 2 +- > buffered_file.c | 244 > -------------------------------------------------------- > buffered_file.h | 22 ----- > migration.c | 218 +++++++++++++++++++++++++++++++++++++++++++++++++- > migration.h | 1 + > 5 files changed, 219 insertions(+), 268 deletions(-) > delete mode 100644 buffered_file.c > delete mode 100644 buffered_file.h > > diff --git a/Makefile.objs b/Makefile.objs > index 74b3542..3de8f27 100644 > --- a/Makefile.objs > +++ b/Makefile.objs > @@ -73,7 +73,7 @@ extra-obj-$(CONFIG_LINUX) += fsdev/ > > common-obj-y += tcg-runtime.o host-utils.o main-loop.o > common-obj-y += input.o > -common-obj-y += buffered_file.o migration.o migration-tcp.o > +common-obj-y += migration.o migration-tcp.o > common-obj-y += qemu-char.o #aio.o > common-obj-y += block-migration.o iohandler.o > common-obj-y += pflib.o > diff --git a/buffered_file.c b/buffered_file.c > deleted file mode 100644 > index c21f847..0000000 > --- a/buffered_file.c > +++ /dev/null > @@ -1,244 +0,0 @@ > -/* > - * QEMU buffered QEMUFile > - * > - * Copyright IBM, Corp. 2008 > - * > - * Authors: > - * Anthony Liguori <aligu...@us.ibm.com> > - * > - * This work is licensed under the terms of the GNU GPL, version 2. See > - * the COPYING file in the top-level directory. > - * > - * Contributions after 2012-01-13 are licensed under the terms of the > - * GNU GPL, version 2 or (at your option) any later version. 
> - */ > - > -#include "qemu-common.h" > -#include "hw/hw.h" > -#include "qemu-timer.h" > -#include "qemu-char.h" > -#include "buffered_file.h" > -#include "qemu-thread.h" > - > -//#define DEBUG_BUFFERED_FILE > - > -typedef struct QEMUFileBuffered > -{ > - MigrationState *migration_state; > - QEMUFile *file; > - size_t bytes_xfer; > - size_t xfer_limit; > - uint8_t *buffer; > - size_t buffer_size; > - size_t buffer_capacity; > - QemuThread thread; > -} QEMUFileBuffered; > - > -#ifdef DEBUG_BUFFERED_FILE > -#define DPRINTF(fmt, ...) \ > - do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0) > -#else > -#define DPRINTF(fmt, ...) \ > - do { } while (0) > -#endif > - > -static ssize_t buffered_flush(QEMUFileBuffered *s) > -{ > - size_t offset = 0; > - ssize_t ret = 0; > - > - DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size); > - > - while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) { > - > - ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset, > - s->buffer_size - offset); > - if (ret <= 0) { > - DPRINTF("error flushing data, %zd\n", ret); > - break; > - } else { > - DPRINTF("flushed %zd byte(s)\n", ret); > - offset += ret; > - s->bytes_xfer += ret; > - } > - } > - > - DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size); > - memmove(s->buffer, s->buffer + offset, s->buffer_size - offset); > - s->buffer_size -= offset; > - > - if (ret < 0) { > - return ret; > - } > - return offset; > -} > - > -static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t > pos, int size) > -{ > - QEMUFileBuffered *s = opaque; > - ssize_t error; > - > - DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); > - > - error = qemu_file_get_error(s->file); > - if (error) { > - DPRINTF("flush when error, bailing: %s\n", strerror(-error)); > - return error; > - } > - > - if (size <= 0) { > - return size; > - } > - > - if (size > (s->buffer_capacity - s->buffer_size)) { > - DPRINTF("increasing buffer capacity from %zu by 
%zu\n", > - s->buffer_capacity, size + 1024); > - > - s->buffer_capacity += size + 1024; > - > - s->buffer = g_realloc(s->buffer, s->buffer_capacity); > - } > - > - memcpy(s->buffer + s->buffer_size, buf, size); > - s->buffer_size += size; > - > - return size; > -} > - > -static int buffered_close(void *opaque) > -{ > - QEMUFileBuffered *s = opaque; > - ssize_t ret = 0; > - int ret2; > - > - DPRINTF("closing\n"); > - > - s->xfer_limit = INT_MAX; > - while (!qemu_file_get_error(s->file) && s->buffer_size) { > - ret = buffered_flush(s); > - if (ret < 0) { > - break; > - } > - } > - > - ret2 = migrate_fd_close(s->migration_state); > - if (ret >= 0) { > - ret = ret2; > - } > - ret = migrate_fd_close(s->migration_state); > - s->migration_state->complete = true; > - return ret; > -} > - > -/* > - * The meaning of the return values is: > - * 0: We can continue sending > - * 1: Time to stop > - * negative: There has been an error > - */ > -static int buffered_rate_limit(void *opaque) > -{ > - QEMUFileBuffered *s = opaque; > - int ret; > - > - ret = qemu_file_get_error(s->file); > - if (ret) { > - return ret; > - } > - > - if (s->bytes_xfer > s->xfer_limit) > - return 1; > - > - return 0; > -} > - > -static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate) > -{ > - QEMUFileBuffered *s = opaque; > - if (qemu_file_get_error(s->file)) { > - goto out; > - } > - if (new_rate > SIZE_MAX) { > - new_rate = SIZE_MAX; > - } > - > - s->xfer_limit = new_rate / 10; > - > -out: > - return s->xfer_limit; > -} > - > -static int64_t buffered_get_rate_limit(void *opaque) > -{ > - QEMUFileBuffered *s = opaque; > - > - return s->xfer_limit; > -} > - > -/* 100ms xfer_limit is the limit that we should write each 100ms */ > -#define BUFFER_DELAY 100 > - > -static void *buffered_file_thread(void *opaque) > -{ > - QEMUFileBuffered *s = opaque; > - int64_t initial_time = qemu_get_clock_ms(rt_clock); > - int64_t max_size = 0; > - bool last_round = false; > - > - while (true) { > - 
int64_t current_time = qemu_get_clock_ms(rt_clock); > - > - if (s->migration_state->complete) { > - break; > - } > - if (current_time >= initial_time + BUFFER_DELAY) { > - uint64_t transferred_bytes = s->bytes_xfer; > - uint64_t time_spent = current_time - initial_time; > - double bandwidth = transferred_bytes / time_spent; > - max_size = bandwidth * migrate_max_downtime() / 1000000; > - > - DPRINTF("transferred %" PRIu64 " time_spent %" PRIu64 > - " bandwidth %g max_size %" PRId64 "\n", > - transferred_bytes, time_spent, bandwidth, max_size); > - > - s->bytes_xfer = 0; > - initial_time = current_time; > - } > - if (!last_round && (s->bytes_xfer >= s->xfer_limit)) { > - /* usleep expects microseconds */ > - usleep((initial_time + BUFFER_DELAY - current_time)*1000); > - } > - buffered_flush(s); > - > - DPRINTF("file is ready\n"); > - if (s->bytes_xfer < s->xfer_limit) { > - DPRINTF("notifying client\n"); > - last_round = migrate_fd_put_ready(s->migration_state, max_size); > - } > - } > - > - g_free(s->buffer); > - g_free(s); > - return NULL; > -} > - > -void qemu_fopen_ops_buffered(MigrationState *migration_state) > -{ > - QEMUFileBuffered *s; > - > - s = g_malloc0(sizeof(*s)); > - > - s->migration_state = migration_state; > - s->xfer_limit = migration_state->bandwidth_limit / 10; > - s->migration_state->complete = false; > - > - s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, > - buffered_close, buffered_rate_limit, > - buffered_set_rate_limit, > - buffered_get_rate_limit); > - > - migration_state->file = s->file; > - > - qemu_thread_create(&s->thread, buffered_file_thread, s, > - QEMU_THREAD_DETACHED); > -} > diff --git a/buffered_file.h b/buffered_file.h > deleted file mode 100644 > index 8a246fd..0000000 > --- a/buffered_file.h > +++ /dev/null > @@ -1,22 +0,0 @@ > -/* > - * QEMU buffered QEMUFile > - * > - * Copyright IBM, Corp. 
2008 > - * > - * Authors: > - * Anthony Liguori <aligu...@us.ibm.com> > - * > - * This work is licensed under the terms of the GNU GPL, version 2. See > - * the COPYING file in the top-level directory. > - * > - */ > - > -#ifndef QEMU_BUFFERED_FILE_H > -#define QEMU_BUFFERED_FILE_H > - > -#include "hw/hw.h" > -#include "migration.h" > - > -void qemu_fopen_ops_buffered(MigrationState *migration_state); > - > -#endif > diff --git a/migration.c b/migration.c > index 8054a77..b1567f3 100644 > --- a/migration.c > +++ b/migration.c > @@ -16,7 +16,7 @@ > #include "qemu-common.h" > #include "migration.h" > #include "monitor.h" > -#include "buffered_file.h" > +#include "qemu-file.h" > #include "sysemu.h" > #include "block.h" > #include "qemu_socket.h" > @@ -569,3 +569,219 @@ int64_t migrate_xbzrle_cache_size(void) > > return s->xbzrle_cache_size; > } > + > +/* migration thread support */ > + > +typedef struct QEMUFileBuffered { > + MigrationState *migration_state; > + QEMUFile *file; > + size_t bytes_xfer; > + size_t xfer_limit; > + uint8_t *buffer; > + size_t buffer_size; > + size_t buffer_capacity; > + QemuThread thread; > +} QEMUFileBuffered; > + > +static ssize_t buffered_flush(QEMUFileBuffered *s) > +{ > + size_t offset = 0; > + ssize_t ret = 0; > + > + DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size); > + > + while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) { > + > + ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset, > + s->buffer_size - offset); > + if (ret <= 0) { > + DPRINTF("error flushing data, %zd\n", ret); > + break; > + } else { > + DPRINTF("flushed %zd byte(s)\n", ret); > + offset += ret; > + s->bytes_xfer += ret; > + } > + } > + > + DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size); > + memmove(s->buffer, s->buffer + offset, s->buffer_size - offset); > + s->buffer_size -= offset; > + > + if (ret < 0) { > + return ret; > + } > + return offset; > +} > + > +static int buffered_put_buffer(void *opaque, 
const uint8_t *buf, > + int64_t pos, int size) > +{ > + QEMUFileBuffered *s = opaque; > + ssize_t error; > + > + DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos); > + > + error = qemu_file_get_error(s->file); > + if (error) { > + DPRINTF("flush when error, bailing: %s\n", strerror(-error)); > + return error; > + } > + > + if (size <= 0) { > + return size; > + } > + > + if (size > (s->buffer_capacity - s->buffer_size)) { > + DPRINTF("increasing buffer capacity from %zu by %zu\n", > + s->buffer_capacity, size + 1024); > + > + s->buffer_capacity += size + 1024; > + > + s->buffer = g_realloc(s->buffer, s->buffer_capacity); > + } > + > + memcpy(s->buffer + s->buffer_size, buf, size); > + s->buffer_size += size; > + > + return size; > +} > + > +static int buffered_close(void *opaque) > +{ > + QEMUFileBuffered *s = opaque; > + ssize_t ret = 0; > + int ret2; > + > + DPRINTF("closing\n"); > + > + s->xfer_limit = INT_MAX; > + while (!qemu_file_get_error(s->file) && s->buffer_size) { > + ret = buffered_flush(s); > + if (ret < 0) { > + break; > + } > + } > + > + ret2 = migrate_fd_close(s->migration_state); > + if (ret >= 0) { > + ret = ret2; > + } > + ret = migrate_fd_close(s->migration_state); > + s->migration_state->complete = true; > + return ret; > +} > + > +/* > + * The meaning of the return values is: > + * 0: We can continue sending > + * 1: Time to stop > + * negative: There has been an error > + */ > +static int buffered_rate_limit(void *opaque) > +{ > + QEMUFileBuffered *s = opaque; > + int ret; > + > + ret = qemu_file_get_error(s->file); > + if (ret) { > + return ret; > + } > + > + if (s->bytes_xfer > s->xfer_limit) { > + return 1; > + } > + > + return 0; > +} > + > +static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate) > +{ > + QEMUFileBuffered *s = opaque; > + if (qemu_file_get_error(s->file)) { > + goto out; > + } > + if (new_rate > SIZE_MAX) { > + new_rate = SIZE_MAX; > + } > + > + s->xfer_limit = new_rate / 10; > + > +out: > + return 
s->xfer_limit; > +} > + > +static int64_t buffered_get_rate_limit(void *opaque) > +{ > + QEMUFileBuffered *s = opaque; > + > + return s->xfer_limit; > +} > + > +/* 100ms xfer_limit is the limit that we should write each 100ms */ > +#define BUFFER_DELAY 100 > + > +static void *buffered_file_thread(void *opaque) > +{ > + QEMUFileBuffered *s = opaque; > + int64_t initial_time = qemu_get_clock_ms(rt_clock); > + int64_t max_size = 0; > + bool last_round = false; > + > + while (true) { > + int64_t current_time = qemu_get_clock_ms(rt_clock); > + > + if (s->migration_state->complete) { > + break; > + } > + if (current_time >= initial_time + BUFFER_DELAY) { > + uint64_t transferred_bytes = s->bytes_xfer; > + uint64_t time_spent = current_time - initial_time; > + double bandwidth = transferred_bytes / time_spent; > + max_size = bandwidth * migrate_max_downtime() / 1000000; > + > + DPRINTF("transferred %" PRIu64 " time_spent %" PRIu64 > + " bandwidth %g max_size %" PRId64 "\n", > + transferred_bytes, time_spent, bandwidth, max_size); > + > + s->bytes_xfer = 0; > + initial_time = current_time; > + } > + if (!last_round && (s->bytes_xfer >= s->xfer_limit)) { > + /* usleep expects microseconds */ > + usleep((initial_time + BUFFER_DELAY - current_time)*1000); > + } > + buffered_flush(s); > + > + DPRINTF("file is ready\n"); > + if (s->bytes_xfer < s->xfer_limit) { > + DPRINTF("notifying client\n"); > + last_round = migrate_fd_put_ready(s->migration_state, max_size); > + } > + } > + > + g_free(s->buffer); > + g_free(s); > + return NULL; > +} > + > +void qemu_fopen_ops_buffered(MigrationState *migration_state) > +{ > + QEMUFileBuffered *s; > + > + s = g_malloc0(sizeof(*s)); > + > + s->migration_state = migration_state; > + s->xfer_limit = migration_state->bandwidth_limit / 10; > + s->migration_state->complete = false; > + > + s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL, > + buffered_close, buffered_rate_limit, > + buffered_set_rate_limit, > + buffered_get_rate_limit); > + 
> + migration_state->file = s->file; > + > + qemu_thread_create(&s->thread, buffered_file_thread, s, > + QEMU_THREAD_DETACHED); > +} > diff --git a/migration.h b/migration.h > index 1f2baed..40c8e9c 100644 > --- a/migration.h > +++ b/migration.h > @@ -129,4 +129,5 @@ int64_t migrate_xbzrle_cache_size(void); > > int64_t xbzrle_cache_resize(int64_t new_size); > > +void qemu_fopen_ops_buffered(MigrationState *migration_state); > #endif >