Add the MigrationCompressOps and MigrationDecompressOps structures so that the compression method used by multi-threaded compression migration is configurable.
Signed-off-by: Zeyu Jin <jinz...@huawei.com> Signed-off-by: Ying Fang <fangyi...@huawei.com> --- migration/migration.c | 9 ++ migration/migration.h | 1 + migration/ram.c | 273 +++++++++++++++++++++++++++++------------- 3 files changed, 203 insertions(+), 80 deletions(-) diff --git a/migration/migration.c b/migration/migration.c index bfbe48cc74..c11f137a8d 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -2356,6 +2356,15 @@ int migrate_decompress_threads(void) return s->parameters.decompress_threads; } +CompressMethod migrate_compress_method(void) +{ + MigrationState *s; + + s = migrate_get_current(); + + return s->parameters.compress_method; +} + bool migrate_dirty_bitmaps(void) { MigrationState *s; diff --git a/migration/migration.h b/migration/migration.h index d096b77f74..e22b2ef840 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -339,6 +339,7 @@ int migrate_compress_level(void); int migrate_compress_threads(void); int migrate_compress_wait_thread(void); int migrate_decompress_threads(void); +CompressMethod migrate_compress_method(void); bool migrate_use_events(void); bool migrate_postcopy_blocktime(void); diff --git a/migration/ram.c b/migration/ram.c index 1818a56314..6f7fab7d4f 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -419,8 +419,11 @@ struct CompressParam { ram_addr_t offset; /* internally used fields */ - z_stream stream; uint8_t *originbuf; + + /* for zlib compression */ + z_stream stream; + }; typedef struct CompressParam CompressParam; @@ -432,12 +435,29 @@ struct DecompressParam { void *des; uint8_t *compbuf; int len; + + /* for zlib compression */ z_stream stream; }; typedef struct DecompressParam DecompressParam; +typedef struct { + int (*save_setup)(CompressParam *param); + void (*save_cleanup)(CompressParam *param); + ssize_t (*compress_data)(CompressParam *param, size_t size); +} MigrationCompressOps; + +typedef struct { + int (*load_setup)(DecompressParam *param); + void 
(*load_cleanup)(DecompressParam *param); + int (*decompress_data)(DecompressParam *param, uint8_t *dest, size_t size); + int (*check_len)(int len); +} MigrationDecompressOps; + static CompressParam *comp_param; static QemuThread *compress_threads; +static MigrationCompressOps *compress_ops; +static MigrationDecompressOps *decompress_ops; /* comp_done_cond is used to wake up the migration thread when * one of the compression threads has finished the compression. * comp_done_lock is used to co-work with comp_done_cond. @@ -455,6 +475,157 @@ static QemuCond decomp_done_cond; static bool do_compress_ram_page(CompressParam *param, RAMBlock *block); +static int zlib_save_setup(CompressParam *param) +{ + if (deflateInit(&param->stream, + migrate_compress_level()) != Z_OK) { + return -1; + } + + return 0; +} + +static ssize_t zlib_compress_data(CompressParam *param, size_t size) +{ + int err; + uint8_t *dest = NULL; + z_stream *stream = &param->stream; + uint8_t *p = param->originbuf; + QEMUFile *f = f = param->file; + ssize_t blen = qemu_put_compress_start(f, &dest); + + if (blen < compressBound(size)) { + return -1; + } + + err = deflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = size; + stream->next_in = p; + stream->avail_out = blen; + stream->next_out = dest; + + err = deflate(stream, Z_FINISH); + if (err != Z_STREAM_END) { + return -1; + } + + blen = stream->next_out - dest; + if (blen < 0) { + return -1; + } + + qemu_put_compress_end(f, blen); + return blen + sizeof(int32_t); +} + +static void zlib_save_cleanup(CompressParam *param) +{ + deflateEnd(&param->stream); +} + +static int zlib_load_setup(DecompressParam *param) +{ + if (inflateInit(&param->stream) != Z_OK) { + return -1; + } + + return 0; +} + +static int +zlib_decompress_data(DecompressParam *param, uint8_t *dest, size_t size) +{ + int err; + + z_stream *stream = &param->stream; + + err = inflateReset(stream); + if (err != Z_OK) { + return -1; + } + + stream->avail_in = param->len; + 
stream->next_in = param->compbuf; + stream->avail_out = size; + stream->next_out = dest; + + err = inflate(stream, Z_NO_FLUSH); + if (err != Z_STREAM_END) { + return -1; + } + + return stream->total_out; +} + +static void zlib_load_cleanup(DecompressParam *param) +{ + inflateEnd(&param->stream); +} + +static int zlib_check_len(int len) +{ + return len < 0 || len > compressBound(TARGET_PAGE_SIZE); +} + +static int set_compress_ops(void) +{ + compress_ops = g_new0(MigrationCompressOps, 1); + + switch (migrate_compress_method()) { + case COMPRESS_METHOD_ZLIB: + compress_ops->save_setup = zlib_save_setup; + compress_ops->save_cleanup = zlib_save_cleanup; + compress_ops->compress_data = zlib_compress_data; + break; + default: + return -1; + } + + return 0; +} + +static int set_decompress_ops(void) +{ + decompress_ops = g_new0(MigrationDecompressOps, 1); + + switch (migrate_compress_method()) { + case COMPRESS_METHOD_ZLIB: + decompress_ops->load_setup = zlib_load_setup; + decompress_ops->load_cleanup = zlib_load_cleanup; + decompress_ops->decompress_data = zlib_decompress_data; + decompress_ops->check_len = zlib_check_len; + break; + default: + return -1; + } + + return 0; +} + +static void clean_compress_ops(void) +{ + compress_ops->save_setup = NULL; + compress_ops->save_cleanup = NULL; + compress_ops->compress_data = NULL; + + g_free(compress_ops); + compress_ops = NULL; +} + +static void clean_decompress_ops(void) +{ + decompress_ops->load_setup = NULL; + decompress_ops->load_cleanup = NULL; + decompress_ops->decompress_data = NULL; + + g_free(decompress_ops); + decompress_ops = NULL; +} + static void *do_data_compress(void *opaque) { CompressParam *param = opaque; @@ -511,7 +682,7 @@ static void compress_threads_save_cleanup(void) qemu_thread_join(compress_threads + i); qemu_mutex_destroy(&comp_param[i].mutex); qemu_cond_destroy(&comp_param[i].cond); - deflateEnd(&comp_param[i].stream); + compress_ops->save_cleanup(&comp_param[i]); g_free(comp_param[i].originbuf); 
qemu_fclose(comp_param[i].file); comp_param[i].file = NULL; @@ -522,6 +693,7 @@ static void compress_threads_save_cleanup(void) g_free(comp_param); compress_threads = NULL; comp_param = NULL; + clean_compress_ops(); } static int compress_threads_save_setup(void) @@ -531,6 +703,12 @@ static int compress_threads_save_setup(void) if (!migrate_use_compression()) { return 0; } + + if (set_compress_ops() < 0) { + clean_compress_ops(); + return -1; + } + thread_count = migrate_compress_threads(); compress_threads = g_new0(QemuThread, thread_count); comp_param = g_new0(CompressParam, thread_count); @@ -542,8 +720,7 @@ static int compress_threads_save_setup(void) goto exit; } - if (deflateInit(&comp_param[i].stream, - migrate_compress_level()) != Z_OK) { + if (compress_ops->save_setup(&comp_param[i]) < 0) { g_free(comp_param[i].originbuf); goto exit; } @@ -1209,50 +1386,6 @@ static int ram_save_multifd_page(RAMState *rs, RAMBlock *block, return 1; } -/* - * Compress size bytes of data start at p and store the compressed - * data to the buffer of f. - * - * Since the file is dummy file with empty_ops, return -1 if f has no space to - * save the compressed data. 
- */ -static ssize_t qemu_put_compression_data(CompressParam *param, size_t size) -{ - int err; - uint8_t *dest = NULL; - z_stream *stream = &param->stream; - uint8_t *p = param->originbuf; - QEMUFile *f = f = param->file; - ssize_t blen = qemu_put_compress_start(f, &dest); - - if (blen < compressBound(size)) { - return -1; - } - - err = deflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = size; - stream->next_in = p; - stream->avail_out = blen; - stream->next_out = dest; - - err = deflate(stream, Z_FINISH); - if (err != Z_STREAM_END) { - return -1; - } - - blen = stream->next_out - dest; - if (blen < 0) { - return -1; - } - - qemu_put_compress_end(f, blen); - return blen + sizeof(int32_t); -} - static bool do_compress_ram_page(CompressParam *param, RAMBlock *block) { RAMState *rs = ram_state; @@ -1275,7 +1408,7 @@ static bool do_compress_ram_page(CompressParam *param, RAMBlock *block) * decompression */ memcpy(param->originbuf, p, TARGET_PAGE_SIZE); - ret = qemu_put_compression_data(param, TARGET_PAGE_SIZE); + ret = compress_ops->compress_data(param, TARGET_PAGE_SIZE); if (ret < 0) { qemu_file_set_error(migrate_get_current()->to_dst_file, ret); error_report("compressed data failed!"); @@ -2864,32 +2997,6 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size) } } -/* return the size after decompression, or negative value on error */ -static int -qemu_uncompress_data(DecompressParam *param, uint8_t *dest, size_t pagesize) -{ - int err; - - z_stream *stream = &param->stream; - - err = inflateReset(stream); - if (err != Z_OK) { - return -1; - } - - stream->avail_in = param->len; - stream->next_in = param->compbuf; - stream->avail_out = pagesize; - stream->next_out = dest; - - err = inflate(stream, Z_NO_FLUSH); - if (err != Z_STREAM_END) { - return -1; - } - - return stream->total_out; -} - static void *do_data_decompress(void *opaque) { DecompressParam *param = opaque; @@ -2903,7 +3010,7 @@ static void *do_data_decompress(void 
*opaque) param->des = 0; qemu_mutex_unlock(&param->mutex); - ret = qemu_uncompress_data(param, des, TARGET_PAGE_SIZE); + ret = decompress_ops->decompress_data(param, des, TARGET_PAGE_SIZE); if (ret < 0 && migrate_get_current()->decompress_error_check) { error_report("decompress data failed"); qemu_file_set_error(decomp_file, ret); @@ -2973,7 +3080,7 @@ static void compress_threads_load_cleanup(void) qemu_thread_join(decompress_threads + i); qemu_mutex_destroy(&decomp_param[i].mutex); qemu_cond_destroy(&decomp_param[i].cond); - inflateEnd(&decomp_param[i].stream); + decompress_ops->load_cleanup(&decomp_param[i]); g_free(decomp_param[i].compbuf); decomp_param[i].compbuf = NULL; } @@ -2982,6 +3089,7 @@ static void compress_threads_load_cleanup(void) decompress_threads = NULL; decomp_param = NULL; decomp_file = NULL; + clean_decompress_ops(); } static int compress_threads_load_setup(QEMUFile *f) @@ -2992,6 +3100,11 @@ static int compress_threads_load_setup(QEMUFile *f) return 0; } + if (set_decompress_ops() < 0) { + clean_decompress_ops(); + return -1; + } + thread_count = migrate_decompress_threads(); decompress_threads = g_new0(QemuThread, thread_count); decomp_param = g_new0(DecompressParam, thread_count); @@ -2999,7 +3112,7 @@ static int compress_threads_load_setup(QEMUFile *f) qemu_cond_init(&decomp_done_cond); decomp_file = f; for (i = 0; i < thread_count; i++) { - if (inflateInit(&decomp_param[i].stream) != Z_OK) { + if (decompress_ops->load_setup(&decomp_param[i]) < 0) { goto exit; } @@ -3335,7 +3448,7 @@ static int ram_load_postcopy(QEMUFile *f) case RAM_SAVE_FLAG_COMPRESS_PAGE: all_zero = false; len = qemu_get_be32(f); - if (len < 0 || len > compressBound(TARGET_PAGE_SIZE)) { + if (decompress_ops->check_len(len)) { error_report("Invalid compressed data length: %d", len); ret = -EINVAL; break; @@ -3602,7 +3715,7 @@ static int ram_load_precopy(QEMUFile *f) case RAM_SAVE_FLAG_COMPRESS_PAGE: len = qemu_get_be32(f); - if (len < 0 || len > 
compressBound(TARGET_PAGE_SIZE)) { + if (decompress_ops->check_len(len)) { error_report("Invalid compressed data length: %d", len); ret = -EINVAL; break; -- 2.27.0