The receiver keeps running the migration loop until the migration connection is lost; at that point it starts up as the backup.
The receiver does not load the vm state as soon as the migration begins. Instead, it prefetches one complete set of migration data into a buffer, then loads the vm state from that buffer afterwards. Signed-off-by: Jules Wang <junqing.w...@cs2c.com.cn> --- include/migration/qemu-file.h | 1 + include/sysemu/sysemu.h | 2 + migration.c | 22 ++++-- savevm.c | 158 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 173 insertions(+), 10 deletions(-) diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h index 0f757fb..f01ff10 100644 --- a/include/migration/qemu-file.h +++ b/include/migration/qemu-file.h @@ -92,6 +92,7 @@ typedef struct QEMUFileOps { QEMURamHookFunc *after_ram_iterate; QEMURamHookFunc *hook_ram_load; QEMURamSaveFunc *save_page; + QEMUFileGetBufferFunc *get_prefetch_buffer; } QEMUFileOps; QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops); diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h index 31d5e3f..e94193c 100644 --- a/include/sysemu/sysemu.h +++ b/include/sysemu/sysemu.h @@ -87,6 +87,8 @@ void qemu_savevm_state_complete(QEMUFile *f, void qemu_savevm_state_cancel(void); uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size); int qemu_loadvm_state(QEMUFile *f); +int qemu_loadvm_state_ft(QEMUFile *f); +bool is_ft_migration(QEMUFile *f); /* SLIRP */ void do_info_slirp(Monitor *mon); diff --git a/migration.c b/migration.c index 28acd05..e0734a7 100644 --- a/migration.c +++ b/migration.c @@ -19,6 +19,7 @@ #include "monitor/monitor.h" #include "migration/qemu-file.h" #include "sysemu/sysemu.h" +#include "sysemu/cpus.h" #include "block/block.h" #include "qemu/sockets.h" #include "migration/block.h" @@ -101,13 +102,24 @@ static void process_incoming_migration_co(void *opaque) { QEMUFile *f = opaque; int ret; + int count = 0; - ret = qemu_loadvm_state(f); - qemu_fclose(f); - if (ret < 0) { - fprintf(stderr, "load of migration failed\n"); - exit(EXIT_FAILURE); + if (is_ft_migration(f)) { + while (qemu_loadvm_state_ft(f) 
>= 0) { + count++; + DPRINTF("incoming count %d\r", count); + } + qemu_fclose(f); + DPRINTF("ft connection lost, launching self..\n"); + } else { + ret = qemu_loadvm_state(f); + qemu_fclose(f); + if (ret < 0) { + fprintf(stderr, "load of migration failed\n"); + exit(EXIT_FAILURE); + } } + cpu_synchronize_all_post_init(); qemu_announce_self(); DPRINTF("successfully loaded vm state\n"); diff --git a/savevm.c b/savevm.c index e75d5d4..611fda2 100644 --- a/savevm.c +++ b/savevm.c @@ -52,6 +52,8 @@ #define ARP_PTYPE_IP 0x0800 #define ARP_OP_REQUEST_REV 0x3 +#define PREFETCH_BUFFER_SIZE 0x010000 + static int announce_self_create(uint8_t *buf, uint8_t *mac_addr) { @@ -135,6 +137,10 @@ struct QEMUFile { unsigned int iovcnt; int last_error; + + uint8_t *prefetch_buf; + uint64_t prefetch_buf_index; + uint64_t prefetch_buf_size; }; typedef struct QEMUFileStdio @@ -193,6 +199,25 @@ static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size) return len; } +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFile *f = opaque; + + if (f->prefetch_buf_size - pos <= 0) { + return 0; + } + + if (f->prefetch_buf_size - pos < size) { + size = f->prefetch_buf_size - pos; + } + + memcpy(buf, f->prefetch_buf + pos, size); + + return size; +} + + static int socket_close(void *opaque) { QEMUFileSocket *s = opaque; @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode) static const QEMUFileOps socket_read_ops = { .get_fd = socket_get_fd, .get_buffer = socket_get_buffer, + .get_prefetch_buffer = socket_get_prefetch_buffer, .close = socket_close }; @@ -746,6 +772,8 @@ int qemu_fclose(QEMUFile *f) if (f->last_error) { ret = f->last_error; } + + g_free(f->prefetch_buf); g_free(f); return ret; } @@ -829,6 +857,14 @@ void qemu_put_byte(QEMUFile *f, int v) static void qemu_file_skip(QEMUFile *f, int size) { + if (f->prefetch_buf_index + size <= f->prefetch_buf_size) { + f->prefetch_buf_index += size; + return; + } 
else { + size -= f->prefetch_buf_size - f->prefetch_buf_index; + f->prefetch_buf_index = f->prefetch_buf_size; + } + if (f->buf_index + size <= f->buf_size) { f->buf_index += size; } @@ -838,6 +874,23 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset) { int pending; int index; + int done; + + if (f->ops->get_prefetch_buffer) { + if (f->prefetch_buf_index + offset < f->prefetch_buf_size) { + done = f->ops->get_prefetch_buffer(f, + buf, + f->prefetch_buf_index + offset, + size); + if (done == size) { + return size; + } + size -= done; + buf += done; + } else { + offset -= f->prefetch_buf_size - f->prefetch_buf_index; + } + } assert(!qemu_file_is_writable(f)); @@ -882,7 +935,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size) static int qemu_peek_byte(QEMUFile *f, int offset) { - int index = f->buf_index + offset; + int index; + + if (f->prefetch_buf_index + offset < f->prefetch_buf_size) { + return f->prefetch_buf[f->prefetch_buf_index + offset]; + } else { + offset -= f->prefetch_buf_size - f->prefetch_buf_index; + } + + index = f->buf_index + offset; assert(!qemu_file_is_writable(f)); @@ -896,6 +957,16 @@ static int qemu_peek_byte(QEMUFile *f, int offset) return f->buf[index]; } +static unsigned int qemu_peek_be32(QEMUFile *f, int offset) +{ + unsigned int v; + v = qemu_peek_byte(f, offset) << 24; + v |= qemu_peek_byte(f, offset + 1) << 16; + v |= qemu_peek_byte(f, offset + 2) << 8; + v |= qemu_peek_byte(f, offset + 3); + return v; +} + int qemu_get_byte(QEMUFile *f) { int result; @@ -983,7 +1054,6 @@ uint64_t qemu_get_be64(QEMUFile *f) return v; } - /* timer */ void timer_put(QEMUFile *f, QEMUTimer *ts) @@ -2200,6 +2270,11 @@ static void vmstate_subsection_save(QEMUFile *f, const VMStateDescription *vmsd, } } +bool is_ft_migration(QEMUFile *f) +{ + return (qemu_peek_be32(f, 0) == QEMU_VM_FILE_MAGIC_FT); +} + typedef struct LoadStateEntry { QLIST_ENTRY(LoadStateEntry) entry; SaveStateEntry *se; @@ -2221,8 +2296,9 @@ int 
qemu_loadvm_state(QEMUFile *f) } v = qemu_get_be32(f); - if (v != QEMU_VM_FILE_MAGIC) + if (v != QEMU_VM_FILE_MAGIC && v != QEMU_VM_FILE_MAGIC_FT) { return -EINVAL; + } v = qemu_get_be32(f); if (v == QEMU_VM_FILE_VERSION_COMPAT) { @@ -2309,8 +2385,6 @@ int qemu_loadvm_state(QEMUFile *f) } } - cpu_synchronize_all_post_init(); - ret = 0; out: @@ -2326,6 +2400,79 @@ out: return ret; } +int qemu_loadvm_state_ft(QEMUFile *f) +{ + int ret = 0; + int i = 0; + int done = 0; + uint64_t size = 0; + uint64_t offset = 0; + uint8_t *prefetch_buf = NULL; + uint8_t *buf = NULL; + + uint64_t max_mem = last_ram_offset() * 1.5; + uint64_t eof = htobe64((uint64_t)QEMU_VM_EOF_MAGIC << 32 | + QEMU_VM_FILE_MAGIC_FT); + + if (!f->ops->get_prefetch_buffer) { + fprintf(stderr, "Fault tolerant is not supported by this protocol.\n"); + return -EINVAL; + } + + size = PREFETCH_BUFFER_SIZE; + prefetch_buf = g_malloc(size); + + while (true) { + if (offset + TARGET_PAGE_SIZE >= size) { + if (size*2 > max_mem) { + fprintf(stderr, "qemu_loadvm_state_ft: warning:" \ + "Prefetch buffer becomes too large.\n" \ + "Fault tolerant is unstable when you see this,\n" \ + "please increase the bandwidth or increase " \ + "the max down time.\n"); + break; + } + size = size * 2; + buf = g_try_realloc(prefetch_buf, size); + if (!buf) { + error_report("qemu_loadvm_state_ft: out of memory.\n"); + g_free(prefetch_buf); + return -ENOMEM; + } + + prefetch_buf = buf; + } + + done = qemu_get_buffer(f, prefetch_buf + offset, TARGET_PAGE_SIZE); + + ret = qemu_file_get_error(f); + if (ret != 0) { + g_free(prefetch_buf); + return ret; + } + + buf = prefetch_buf + offset; + offset += done; + for (i = -7; i < done; i++) { + if (memcmp(buf + i, &eof, 8) == 0) { + goto out; + } + } + } + out: + g_free(f->prefetch_buf); + f->prefetch_buf_size = offset; + f->prefetch_buf_index = 0; + f->prefetch_buf = prefetch_buf; + + ret = qemu_loadvm_state(f); + + /* Skip magic number */ + qemu_get_be32(f); + + return ret; +} + static 
BlockDriverState *find_vmstate_bs(void) { BlockDriverState *bs = NULL; @@ -2437,6 +2584,7 @@ void do_savevm(Monitor *mon, const QDict *qdict) goto the_end; } ret = qemu_savevm_state(f); + cpu_synchronize_all_post_init(); vm_state_size = qemu_ftell(f); qemu_fclose(f); if (ret < 0) { -- 1.8.0.1