Re: [PATCH v2 11/13] ram.c: Move core decompression code into its own file

2023-04-28 Thread Juan Quintela
Lukas Straub  wrote:
> No functional changes intended.
>
> Signed-off-by: Lukas Straub 
> Reviewed-by: Philippe Mathieu-Daudé 

Reviewed-by: Juan Quintela 




[PATCH v2 11/13] ram.c: Move core decompression code into its own file

2023-04-20 Thread Lukas Straub
No functional changes intended.

Signed-off-by: Lukas Straub 
Reviewed-by: Philippe Mathieu-Daudé 
---
 migration/ram-compress.c | 203 ++
 migration/ram-compress.h |   5 +
 migration/ram.c  | 204 ---
 3 files changed, 208 insertions(+), 204 deletions(-)

diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index 77902a1d65..f75b8c3079 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -47,6 +47,24 @@ static QemuThread *compress_threads;
 static QemuMutex comp_done_lock;
 static QemuCond comp_done_cond;

+struct DecompressParam {
+bool done;
+bool quit;
+QemuMutex mutex;
+QemuCond cond;
+void *des;
+uint8_t *compbuf;
+int len;
+z_stream stream;
+};
+typedef struct DecompressParam DecompressParam;
+
+static QEMUFile *decomp_file;
+static DecompressParam *decomp_param;
+static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
+
 static CompressResult do_compress_ram_page(QEMUFile *f, z_stream *stream,
RAMBlock *block, ram_addr_t offset,
uint8_t *source_buf);
@@ -271,3 +289,188 @@ retry:

 return pages;
 }
+
+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+ const uint8_t *source, size_t source_len)
+{
+int err;
+
+err = inflateReset(stream);
+if (err != Z_OK) {
+return -1;
+}
+
+stream->avail_in = source_len;
+stream->next_in = (uint8_t *)source;
+stream->avail_out = dest_len;
+stream->next_out = dest;
+
+err = inflate(stream, Z_NO_FLUSH);
+if (err != Z_STREAM_END) {
+return -1;
+}
+
+return stream->total_out;
+}
+
+static void *do_data_decompress(void *opaque)
+{
+DecompressParam *param = opaque;
+unsigned long pagesize;
+uint8_t *des;
+int len, ret;
+
+qemu_mutex_lock(&param->mutex);
+while (!param->quit) {
+if (param->des) {
+des = param->des;
+len = param->len;
+param->des = 0;
+qemu_mutex_unlock(&param->mutex);
+
+pagesize = TARGET_PAGE_SIZE;
+
+ret = qemu_uncompress_data(&param->stream, des, pagesize,
+   param->compbuf, len);
+if (ret < 0 && migrate_get_current()->decompress_error_check) {
+error_report("decompress data failed");
+qemu_file_set_error(decomp_file, ret);
+}
+
+qemu_mutex_lock(&decomp_done_lock);
+param->done = true;
+qemu_cond_signal(&decomp_done_cond);
+qemu_mutex_unlock(&decomp_done_lock);
+
+qemu_mutex_lock(&param->mutex);
+} else {
+qemu_cond_wait(&param->cond, &param->mutex);
+}
+}
+qemu_mutex_unlock(&param->mutex);
+
+return NULL;
+}
+
+int wait_for_decompress_done(void)
+{
+int idx, thread_count;
+
+if (!migrate_use_compression()) {
+return 0;
+}
+
+thread_count = migrate_decompress_threads();
+qemu_mutex_lock(&decomp_done_lock);
+for (idx = 0; idx < thread_count; idx++) {
+while (!decomp_param[idx].done) {
+qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
+}
+}
+qemu_mutex_unlock(&decomp_done_lock);
+return qemu_file_get_error(decomp_file);
+}
+
+void compress_threads_load_cleanup(void)
+{
+int i, thread_count;
+
+if (!migrate_use_compression()) {
+return;
+}
+thread_count = migrate_decompress_threads();
+for (i = 0; i < thread_count; i++) {
+/*
+ * we use it as a indicator which shows if the thread is
+ * properly init'd or not
+ */
+if (!decomp_param[i].compbuf) {
+break;
+}
+
+qemu_mutex_lock(&decomp_param[i].mutex);
+decomp_param[i].quit = true;
+qemu_cond_signal(&decomp_param[i].cond);
+qemu_mutex_unlock(&decomp_param[i].mutex);
+}
+for (i = 0; i < thread_count; i++) {
+if (!decomp_param[i].compbuf) {
+break;
+}
+
+qemu_thread_join(decompress_threads + i);
+qemu_mutex_destroy(&decomp_param[i].mutex);
+qemu_cond_destroy(&decomp_param[i].cond);
+inflateEnd(&decomp_param[i].stream);
+g_free(decomp_param[i].compbuf);
+decomp_param[i].compbuf = NULL;
+}
+g_free(decompress_threads);
+g_free(decomp_param);
+decompress_threads = NULL;
+decomp_param = NULL;
+decomp_file = NULL;
+}
+
+int compress_threads_load_setup(QEMUFile *f)
+{
+int i, thread_count;
+
+if (!migrate_use_compression()) {
+return 0;
+}
+
+thread_count = migrate_decompress_threads();
+decompress_threads = g_new0(QemuThread, thread_count);
+decomp_param = g_new0(DecompressParam, thread_count);
+qemu_mutex_init(&decomp_done_lock);
+qemu_cond_init(&decomp_done_cond);
+