RE: [PATCH RFC 03/14] migration/rdma: Create multiFd migration threads

2020-02-14 Thread fengzhimin
Thanks for your review. I will fix these issues in the next version (v3).

Because migration data is transferred with RDMA WRITE operations, the 
destination does not need to actively receive the data.
The destination only needs to poll the completion queue for CQEs, so 
multifd_recv_thread() cannot be used directly.

-----Original Message-----
From: Juan Quintela [mailto:quint...@redhat.com] 
Sent: Thursday, February 13, 2020 6:13 PM
To: fengzhimin 
Cc: dgilb...@redhat.com; arm...@redhat.com; ebl...@redhat.com; 
qemu-devel@nongnu.org; Zhanghailiang ; 
jemmy858...@gmail.com
Subject: Re: [PATCH RFC 03/14] migration/rdma: Create multiFd migration threads

Zhimin Feng  wrote:
> Creation of the multifd send threads for RDMA migration, nothing 
> inside yet.
>
> Signed-off-by: Zhimin Feng 
> ---
>  migration/multifd.c   | 33 +---
>  migration/multifd.h   |  2 +
>  migration/qemu-file.c |  5 +++
>  migration/qemu-file.h |  1 +
>  migration/rdma.c  | 88 ++-
>  migration/rdma.h  |  3 ++
>  6 files changed, 125 insertions(+), 7 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c index 
> b3e8ae9bcc..63678d7fdd 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f)  {
>  int i;
>  
> -if (!migrate_use_multifd()) {
> +if (!migrate_use_multifd() || migrate_use_rdma()) {

Don't you need to sync with the main channel on RDMA?

> +static void rdma_send_channel_create(MultiFDSendParams *p) {
> +Error *local_err = NULL;
> +
> +if (p->quit) {
> +error_setg(_err, "multifd: send id %d already quit", p->id);
> +return ;
> +}
> +p->running = true;
> +
> +qemu_thread_create(>thread, p->name, multifd_rdma_send_thread, p,
> +   QEMU_THREAD_JOINABLE); }
> +
>  static void multifd_new_send_channel_async(QIOTask *task, gpointer 
> opaque)  {
>  MultiFDSendParams *p = opaque;
> @@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp)
>  p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>  p->packet->version = cpu_to_be32(MULTIFD_VERSION);
>  p->name = g_strdup_printf("multifdsend_%d", i);
> -socket_send_channel_create(multifd_new_send_channel_async, p);
> +if (!migrate_use_rdma()) {
> +socket_send_channel_create(multifd_new_send_channel_async, p);
> +} else {
> +rdma_send_channel_create(p);
> +}

This is what we are trying to avoid.  Just create an ops struct with a member like

ops->create_channel(new_channel_async, p)

or similar, and fill it in differently for RDMA and for TCP.


>  }
>  return 0;
>  }
> @@ -720,7 +738,7 @@ void multifd_recv_sync_main(void)  {
>  int i;
>  
> -if (!migrate_use_multifd()) {
> +if (!migrate_use_multifd() || migrate_use_rdma()) {
>  return;
>  }

OK. You can just provide an empty function for RDMA here.

>  for (i = 0; i < migrate_multifd_channels(); i++) { @@ -890,8 
> +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>  p->num_packets = 1;
>  
>  p->running = true;
> -qemu_thread_create(>thread, p->name, multifd_recv_thread, p,
> -   QEMU_THREAD_JOINABLE);
> +if (!migrate_use_rdma()) {
> +qemu_thread_create(>thread, p->name, multifd_recv_thread, p,
> +   QEMU_THREAD_JOINABLE);
> +} else {
> +qemu_thread_create(>thread, p->name, multifd_rdma_recv_thread, p,
> +   QEMU_THREAD_JOINABLE);
> +}

Make this a new_recv_channel() member function of the ops struct.

>  atomic_inc(_recv_state->count);
>  return atomic_read(_recv_state->count) ==
> migrate_multifd_channels(); diff --git 
> a/migration/multifd.h b/migration/multifd.h index 
> d8b0205977..c9c11ad140 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +#include "migration/rdma.h"
> +
>  int multifd_save_setup(Error **errp);  void 
> multifd_save_cleanup(void);  int multifd_load_setup(Error **errp);

You are not exporting anything RDMA-related from here, are you?

> diff --git a/migration/qemu-file.c b/migration/qemu-file.c index 
> 1c3a358a14..f0ed8f1381 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f)
>  f->iovcnt = 0;
>  }
>  
> +void *getQIOChannel(QEMUFile *f)
> +{
> +return f->opaque;
> +}
> +

Do we really want this to return a void pointer, and not a more specific type?
> +static v

Re: [PATCH RFC 03/14] migration/rdma: Create multiFd migration threads

2020-02-13 Thread Juan Quintela
Zhimin Feng  wrote:
> Creation of the multifd send threads for RDMA migration,
> nothing inside yet.
>
> Signed-off-by: Zhimin Feng 
> ---
>  migration/multifd.c   | 33 +---
>  migration/multifd.h   |  2 +
>  migration/qemu-file.c |  5 +++
>  migration/qemu-file.h |  1 +
>  migration/rdma.c  | 88 ++-
>  migration/rdma.h  |  3 ++
>  6 files changed, 125 insertions(+), 7 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index b3e8ae9bcc..63678d7fdd 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f)
>  {
>  int i;
>  
> -if (!migrate_use_multifd()) {
> +if (!migrate_use_multifd() || migrate_use_rdma()) {

Don't you need to sync with the main channel on RDMA?

> +static void rdma_send_channel_create(MultiFDSendParams *p)
> +{
> +Error *local_err = NULL;
> +
> +if (p->quit) {
> +error_setg(_err, "multifd: send id %d already quit", p->id);
> +return ;
> +}
> +p->running = true;
> +
> +qemu_thread_create(>thread, p->name, multifd_rdma_send_thread, p,
> +   QEMU_THREAD_JOINABLE);
> +}
> +
>  static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
>  {
>  MultiFDSendParams *p = opaque;
> @@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp)
>  p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>  p->packet->version = cpu_to_be32(MULTIFD_VERSION);
>  p->name = g_strdup_printf("multifdsend_%d", i);
> -socket_send_channel_create(multifd_new_send_channel_async, p);
> +if (!migrate_use_rdma()) {
> +socket_send_channel_create(multifd_new_send_channel_async, p);
> +} else {
> +rdma_send_channel_create(p);
> +}

This is what we are trying to avoid.  Just create an ops struct with a
member like

ops->create_channel(new_channel_async, p)

or similar, and fill it in differently for RDMA and for TCP.


>  }
>  return 0;
>  }
> @@ -720,7 +738,7 @@ void multifd_recv_sync_main(void)
>  {
>  int i;
>  
> -if (!migrate_use_multifd()) {
> +if (!migrate_use_multifd() || migrate_use_rdma()) {
>  return;
>  }

OK. You can just provide an empty function for RDMA here.

>  for (i = 0; i < migrate_multifd_channels(); i++) {
> @@ -890,8 +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error 
> **errp)
>  p->num_packets = 1;
>  
>  p->running = true;
> -qemu_thread_create(>thread, p->name, multifd_recv_thread, p,
> -   QEMU_THREAD_JOINABLE);
> +if (!migrate_use_rdma()) {
> +qemu_thread_create(>thread, p->name, multifd_recv_thread, p,
> +   QEMU_THREAD_JOINABLE);
> +} else {
> +qemu_thread_create(>thread, p->name, multifd_rdma_recv_thread, p,
> +   QEMU_THREAD_JOINABLE);
> +}

Make this a new_recv_channel() member function of the ops struct.

>  atomic_inc(_recv_state->count);
>  return atomic_read(_recv_state->count) ==
> migrate_multifd_channels();
> diff --git a/migration/multifd.h b/migration/multifd.h
> index d8b0205977..c9c11ad140 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>  
> +#include "migration/rdma.h"
> +
>  int multifd_save_setup(Error **errp);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);

You are not exporting anything RDMA-related from here, are you?

> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 1c3a358a14..f0ed8f1381 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f)
>  f->iovcnt = 0;
>  }
>  
> +void *getQIOChannel(QEMUFile *f)
> +{
> +return f->opaque;
> +}
> +

Do we really want this to return a void pointer, and not a more specific type?
> +static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
> +{
> +MigrationIncomingState *mis = migration_incoming_get_current();
> +Error *local_err = NULL;
> +QIOChannel *ioc = NULL;
> +bool start_migration;
> +
> +if (!mis->from_src_file) {
> +mis->from_src_file = f;
> +qemu_file_set_blocking(f, false);
> +
> +start_migration = migrate_use_multifd();
> +} else {
> +ioc = QIO_CHANNEL(getQIOChannel(f));
> +/* Multiple connections */
> +assert(migrate_use_multifd());

I am not sure that you can make this incompatible change.
You need to support *both* the old method and the new multifd one.

I would have been happy to remove the old precopy TCP method, but we
*guarantee* backwards compatibility.

> @@ -4003,8 +4032,12 @@ static void rdma_accept_incoming_migration(void 
> *opaque)
>  return;
>  }
>  
> -rdma->migration_started_on_destination = 1;
> -migration_fd_process_incoming(f, errp);
> +if (migrate_use_multifd()) {
> +   

[PATCH RFC 03/14] migration/rdma: Create multiFd migration threads

2020-02-13 Thread Zhimin Feng
Creation of the multifd send threads for RDMA migration,
nothing inside yet.

Signed-off-by: Zhimin Feng 
---
 migration/multifd.c   | 33 +---
 migration/multifd.h   |  2 +
 migration/qemu-file.c |  5 +++
 migration/qemu-file.h |  1 +
 migration/rdma.c  | 88 ++-
 migration/rdma.h  |  3 ++
 6 files changed, 125 insertions(+), 7 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index b3e8ae9bcc..63678d7fdd 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -424,7 +424,7 @@ void multifd_send_sync_main(QEMUFile *f)
 {
 int i;
 
-if (!migrate_use_multifd()) {
+if (!migrate_use_multifd() || migrate_use_rdma()) {
 return;
 }
 if (multifd_send_state->pages->used) {
@@ -562,6 +562,20 @@ out:
 return NULL;
 }
 
+static void rdma_send_channel_create(MultiFDSendParams *p)
+{
+Error *local_err = NULL;
+
+if (p->quit) {
+error_setg(_err, "multifd: send id %d already quit", p->id);
+return ;
+}
+p->running = true;
+
+qemu_thread_create(>thread, p->name, multifd_rdma_send_thread, p,
+   QEMU_THREAD_JOINABLE);
+}
+
 static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
 {
 MultiFDSendParams *p = opaque;
@@ -621,7 +635,11 @@ int multifd_save_setup(Error **errp)
 p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
 p->packet->version = cpu_to_be32(MULTIFD_VERSION);
 p->name = g_strdup_printf("multifdsend_%d", i);
-socket_send_channel_create(multifd_new_send_channel_async, p);
+if (!migrate_use_rdma()) {
+socket_send_channel_create(multifd_new_send_channel_async, p);
+} else {
+rdma_send_channel_create(p);
+}
 }
 return 0;
 }
@@ -720,7 +738,7 @@ void multifd_recv_sync_main(void)
 {
 int i;
 
-if (!migrate_use_multifd()) {
+if (!migrate_use_multifd() || migrate_use_rdma()) {
 return;
 }
 for (i = 0; i < migrate_multifd_channels(); i++) {
@@ -890,8 +908,13 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error 
**errp)
 p->num_packets = 1;
 
 p->running = true;
-qemu_thread_create(>thread, p->name, multifd_recv_thread, p,
-   QEMU_THREAD_JOINABLE);
+if (!migrate_use_rdma()) {
+qemu_thread_create(>thread, p->name, multifd_recv_thread, p,
+   QEMU_THREAD_JOINABLE);
+} else {
+qemu_thread_create(>thread, p->name, multifd_rdma_recv_thread, p,
+   QEMU_THREAD_JOINABLE);
+}
 atomic_inc(_recv_state->count);
 return atomic_read(_recv_state->count) ==
migrate_multifd_channels();
diff --git a/migration/multifd.h b/migration/multifd.h
index d8b0205977..c9c11ad140 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -13,6 +13,8 @@
 #ifndef QEMU_MIGRATION_MULTIFD_H
 #define QEMU_MIGRATION_MULTIFD_H
 
+#include "migration/rdma.h"
+
 int multifd_save_setup(Error **errp);
 void multifd_save_cleanup(void);
 int multifd_load_setup(Error **errp);
diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 1c3a358a14..f0ed8f1381 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -248,6 +248,11 @@ void qemu_fflush(QEMUFile *f)
 f->iovcnt = 0;
 }
 
+void *getQIOChannel(QEMUFile *f)
+{
+return f->opaque;
+}
+
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags)
 {
 int ret = 0;
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index a9b6d6ccb7..fc656a3b72 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -161,6 +161,7 @@ int qemu_file_shutdown(QEMUFile *f);
 QEMUFile *qemu_file_get_return_path(QEMUFile *f);
 void qemu_fflush(QEMUFile *f);
 void qemu_file_set_blocking(QEMUFile *f, bool block);
+void *getQIOChannel(QEMUFile *f);
 
 void ram_control_before_iterate(QEMUFile *f, uint64_t flags);
 void ram_control_after_iterate(QEMUFile *f, uint64_t flags);
diff --git a/migration/rdma.c b/migration/rdma.c
index 2379b8345b..f086ab5a82 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -34,6 +34,7 @@
 #include 
 #include 
 #include "trace.h"
+#include "multifd.h"
 
 /*
  * Print and error on both the Monitor and the Log file.
@@ -3975,6 +3976,34 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, 
const char *mode)
 return rioc->file;
 }
 
+static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
+{
+MigrationIncomingState *mis = migration_incoming_get_current();
+Error *local_err = NULL;
+QIOChannel *ioc = NULL;
+bool start_migration;
+
+if (!mis->from_src_file) {
+mis->from_src_file = f;
+qemu_file_set_blocking(f, false);
+
+start_migration = migrate_use_multifd();
+} else {
+ioc = QIO_CHANNEL(getQIOChannel(f));
+/* Multiple connections */
+assert(migrate_use_multifd());
+start_migration = multifd_recv_new_channel(ioc, _err);
+if