Re: [Qemu-devel] [v3 10/13] migration: Add the core code of multi-thread decompression

2015-01-23 Thread Dr. David Alan Gilbert
* Liang Li (liang.z...@intel.com) wrote:
> Signed-off-by: Liang Li 
> Signed-off-by: Yang Zhang 
> ---
>  arch_init.c | 48 ++--
>  1 file changed, 46 insertions(+), 2 deletions(-)
> 
> diff --git a/arch_init.c b/arch_init.c
> index 14bc486..7103f4f 100644
> --- a/arch_init.c
> +++ b/arch_init.c
> @@ -24,6 +24,7 @@
>  #include 
>  #include 
>  #include 
> +#include 
>  #ifndef _WIN32
>  #include 
>  #include 
> @@ -820,6 +821,14 @@ static inline void start_compression(compress_param 
> *param)
>  qemu_mutex_unlock(¶m->mutex);
>  }
>  
> +static inline void start_decompression(decompress_param *param)
> +{
> +qemu_mutex_lock(¶m->mutex);
> +param->state = START;
> +qemu_cond_signal(¶m->cond);
> +qemu_mutex_unlock(¶m->mutex);
> +}
> +
>  
>  static uint64_t bytes_transferred;
>  
> @@ -1351,8 +1360,24 @@ void ram_handle_compressed(void *host, uint8_t ch, 
> uint64_t size)
>  
>  static void *do_data_decompress(void *opaque)
>  {
> +decompress_param *param = opaque;
>  while (!quit_thread) {
> -/* To be done */
> +qemu_mutex_lock(¶m->mutex);
> +while (param->state != START) {
> +qemu_cond_wait(¶m->cond, ¶m->mutex);
> +if (quit_thread) {
> +break;
> +}
> +size_t pagesize = TARGET_PAGE_SIZE;
> +/* uncompress() will return failed in some case,
> + * especially when the page is dirted when doing
> + * the compression, ignore the return value because
> + * the dirty page will be retransferred. */
> +uncompress((Bytef *)param->des, &pagesize,
> +(const Bytef *)param->compbuf, param->len);

That's kind of a scary comment!  It looks like 'uncompress' is supposed
to be safe, so shouldn't damage any other data; it's worrying me
might not find real problems though.

However,

Reviewed-by: Dr. David Alan Gilbert 

> +param->state = DONE;
> +}
> +qemu_mutex_unlock(¶m->mutex);
>  }
>  return NULL;
>  }
> @@ -1379,6 +1404,9 @@ void migrate_decompress_threads_join(void)
>  quit_thread = true;
>  thread_count = migrate_decompress_threads();
>  for (i = 0; i < thread_count; i++) {
> +qemu_cond_signal(&decomp_param[i].cond);
> +}
> +for (i = 0; i < thread_count; i++) {
>  qemu_thread_join(decompress_threads + i);
>  qemu_mutex_destroy(&decomp_param[i].mutex);
>  qemu_cond_destroy(&decomp_param[i].cond);
> @@ -1392,7 +1420,23 @@ void migrate_decompress_threads_join(void)
>  static void decompress_data_with_multi_threads(uint8_t *compbuf,
>  void *host, int len)
>  {
> -/* To be done */
> +int idx, thread_count;
> +
> +thread_count = migrate_decompress_threads();
> +while (true) {
> +for (idx = 0; idx < thread_count; idx++) {
> +if (decomp_param[idx].state == DONE) {
> +memcpy(decomp_param[idx].compbuf, compbuf, len);
> +decomp_param[idx].des = host;
> +decomp_param[idx].len = len;
> +start_decompression(&decomp_param[idx]);
> +break;
> +}
> +}
> +if (idx < thread_count) {
> +break;
> +}
> +}
>  }
>  
>  static int ram_load(QEMUFile *f, void *opaque, int version_id)
> -- 
> 1.8.3.1
> 
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK



[Qemu-devel] [v3 10/13] migration: Add the core code of multi-thread decompression

2014-12-11 Thread Liang Li
Signed-off-by: Liang Li 
Signed-off-by: Yang Zhang 
---
 arch_init.c | 48 ++--
 1 file changed, 46 insertions(+), 2 deletions(-)

diff --git a/arch_init.c b/arch_init.c
index 14bc486..7103f4f 100644
--- a/arch_init.c
+++ b/arch_init.c
@@ -24,6 +24,7 @@
 #include 
 #include 
 #include 
+#include 
 #ifndef _WIN32
 #include 
 #include 
@@ -820,6 +821,14 @@ static inline void start_compression(compress_param *param)
 qemu_mutex_unlock(¶m->mutex);
 }
 
+static inline void start_decompression(decompress_param *param)
+{
+qemu_mutex_lock(¶m->mutex);
+param->state = START;
+qemu_cond_signal(¶m->cond);
+qemu_mutex_unlock(¶m->mutex);
+}
+
 
 static uint64_t bytes_transferred;
 
@@ -1351,8 +1360,24 @@ void ram_handle_compressed(void *host, uint8_t ch, 
uint64_t size)
 
 static void *do_data_decompress(void *opaque)
 {
+decompress_param *param = opaque;
 while (!quit_thread) {
-/* To be done */
+qemu_mutex_lock(¶m->mutex);
+while (param->state != START) {
+qemu_cond_wait(¶m->cond, ¶m->mutex);
+if (quit_thread) {
+break;
+}
+size_t pagesize = TARGET_PAGE_SIZE;
+/* uncompress() will return failed in some case,
+ * especially when the page is dirted when doing
+ * the compression, ignore the return value because
+ * the dirty page will be retransferred. */
+uncompress((Bytef *)param->des, &pagesize,
+(const Bytef *)param->compbuf, param->len);
+param->state = DONE;
+}
+qemu_mutex_unlock(¶m->mutex);
 }
 return NULL;
 }
@@ -1379,6 +1404,9 @@ void migrate_decompress_threads_join(void)
 quit_thread = true;
 thread_count = migrate_decompress_threads();
 for (i = 0; i < thread_count; i++) {
+qemu_cond_signal(&decomp_param[i].cond);
+}
+for (i = 0; i < thread_count; i++) {
 qemu_thread_join(decompress_threads + i);
 qemu_mutex_destroy(&decomp_param[i].mutex);
 qemu_cond_destroy(&decomp_param[i].cond);
@@ -1392,7 +1420,23 @@ void migrate_decompress_threads_join(void)
 static void decompress_data_with_multi_threads(uint8_t *compbuf,
 void *host, int len)
 {
-/* To be done */
+int idx, thread_count;
+
+thread_count = migrate_decompress_threads();
+while (true) {
+for (idx = 0; idx < thread_count; idx++) {
+if (decomp_param[idx].state == DONE) {
+memcpy(decomp_param[idx].compbuf, compbuf, len);
+decomp_param[idx].des = host;
+decomp_param[idx].len = len;
+start_decompression(&decomp_param[idx]);
+break;
+}
+}
+if (idx < thread_count) {
+break;
+}
+}
 }
 
 static int ram_load(QEMUFile *f, void *opaque, int version_id)
-- 
1.8.3.1