On 26.11.2024 20:58, Fabiano Rosas wrote:
"Maciej S. Szmigiero" <[email protected]> writes:

From: "Maciej S. Szmigiero" <[email protected]>

A new function multifd_queue_device_state() is provided for a device to queue
its state for transmission via a multifd channel.

Signed-off-by: Maciej S. Szmigiero <[email protected]>
---
  include/migration/misc.h         |   4 ++
  migration/meson.build            |   1 +
  migration/multifd-device-state.c | 106 +++++++++++++++++++++++++++++++
  migration/multifd-nocomp.c       |  11 +++-
  migration/multifd.c              |  43 +++++++++++--
  migration/multifd.h              |  24 ++++---
  6 files changed, 173 insertions(+), 16 deletions(-)
  create mode 100644 migration/multifd-device-state.c

diff --git a/include/migration/misc.h b/include/migration/misc.h
index c92ca018ab3b..118e205bbcc6 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -109,4 +109,8 @@ bool migration_incoming_postcopy_advised(void);
  /* True if background snapshot is active */
  bool migration_in_bg_snapshot(void);
+/* migration/multifd-device-state.c */
+bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
+                                char *data, size_t len);
+
  #endif
diff --git a/migration/meson.build b/migration/meson.build
index d53cf3417ab8..9788c47bb56e 100644
--- a/migration/meson.build
+++ b/migration/meson.build
@@ -22,6 +22,7 @@ system_ss.add(files(
    'migration-hmp-cmds.c',
    'migration.c',
    'multifd.c',
+  'multifd-device-state.c',
    'multifd-nocomp.c',
    'multifd-zlib.c',
    'multifd-zero-page.c',
diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c
new file mode 100644
index 000000000000..7741a64fbd4d
--- /dev/null
+++ b/migration/multifd-device-state.c
@@ -0,0 +1,106 @@
+/*
+ * Multifd device state migration
+ *
+ * Copyright (C) 2024 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
+ * See the COPYING file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+#include "qemu/lockable.h"
+#include "migration/misc.h"
+#include "multifd.h"
+
+static QemuMutex queue_job_mutex;
+
+static MultiFDSendData *device_state_send;
+
+size_t multifd_device_state_payload_size(void)
+{
+    return sizeof(MultiFDDeviceState_t);
+}
+
+void multifd_device_state_send_setup(void)
+{
+    qemu_mutex_init(&queue_job_mutex);
+
+    device_state_send = multifd_send_data_alloc();
+}
+
+void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
+{
+    g_clear_pointer(&device_state->idstr, g_free);
+    g_clear_pointer(&device_state->buf, g_free);
+}
+
+void multifd_device_state_send_cleanup(void)
+{
+    g_clear_pointer(&device_state_send, multifd_send_data_free);
+
+    qemu_mutex_destroy(&queue_job_mutex);
+}
+
+static void multifd_device_state_fill_packet(MultiFDSendParams *p)
+{
+    MultiFDDeviceState_t *device_state = &p->data->u.device_state;
+    MultiFDPacketDeviceState_t *packet = p->packet_device_state;
+
+    packet->hdr.flags = cpu_to_be32(p->flags);
+    strncpy(packet->idstr, device_state->idstr, sizeof(packet->idstr));
+    packet->instance_id = cpu_to_be32(device_state->instance_id);
+    packet->next_packet_size = cpu_to_be32(p->next_packet_size);
+}
+
+static void multifd_prepare_header_device_state(MultiFDSendParams *p)
+{
+    p->iov[0].iov_len = sizeof(*p->packet_device_state);
+    p->iov[0].iov_base = p->packet_device_state;
+    p->iovs_num++;
+}
+
+void multifd_device_state_send_prepare(MultiFDSendParams *p)
+{
+    MultiFDDeviceState_t *device_state = &p->data->u.device_state;
+
+    assert(multifd_payload_device_state(p->data));
+
+    multifd_prepare_header_device_state(p);
+
+    assert(!(p->flags & MULTIFD_FLAG_SYNC));
+
+    p->next_packet_size = device_state->buf_len;
+    if (p->next_packet_size > 0) {
+        p->iov[p->iovs_num].iov_base = device_state->buf;
+        p->iov[p->iovs_num].iov_len = p->next_packet_size;
+        p->iovs_num++;
+    }
+
+    p->flags |= MULTIFD_FLAG_NOCOMP | MULTIFD_FLAG_DEVICE_STATE;
+
+    multifd_device_state_fill_packet(p);
+}
+
+bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
+                                char *data, size_t len)
+{
+    /* Device state submissions can come from multiple threads */
+    QEMU_LOCK_GUARD(&queue_job_mutex);
+    MultiFDDeviceState_t *device_state;
+
+    assert(multifd_payload_empty(device_state_send));
+
+    multifd_set_payload_type(device_state_send, MULTIFD_PAYLOAD_DEVICE_STATE);
+    device_state = &device_state_send->u.device_state;
+    device_state->idstr = g_strdup(idstr);
+    device_state->instance_id = instance_id;
+    device_state->buf = g_memdup2(data, len);
+    device_state->buf_len = len;
+
+    if (!multifd_send(&device_state_send)) {
+        multifd_send_data_clear(device_state_send);
+        return false;
+    }
+
+    return true;
+}
diff --git a/migration/multifd-nocomp.c b/migration/multifd-nocomp.c
index fa0fd0289eca..23564ce9aea9 100644
--- a/migration/multifd-nocomp.c
+++ b/migration/multifd-nocomp.c
@@ -84,6 +84,13 @@ static void multifd_nocomp_send_cleanup(MultiFDSendParams *p, Error **errp)
      return;
  }
+static void multifd_ram_prepare_header(MultiFDSendParams *p)
+{
+    p->iov[0].iov_len = p->packet_len;
+    p->iov[0].iov_base = p->packet;
+    p->iovs_num++;
+}
+
  static void multifd_send_prepare_iovs(MultiFDSendParams *p)
  {
      MultiFDPages_t *pages = &p->data->u.ram;
@@ -117,7 +124,7 @@ static int multifd_nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
           * Only !zerocopy needs the header in IOV; zerocopy will
           * send it separately.
           */
-        multifd_send_prepare_header(p);
+        multifd_ram_prepare_header(p);
      }
multifd_send_prepare_iovs(p);
@@ -368,7 +375,7 @@ bool multifd_send_prepare_common(MultiFDSendParams *p)
          return false;
      }
- multifd_send_prepare_header(p);
+    multifd_ram_prepare_header(p);
return true;
  }
diff --git a/migration/multifd.c b/migration/multifd.c
index 730acf55cfad..56419af417cc 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -12,6 +12,7 @@
#include "qemu/osdep.h"
  #include "qemu/cutils.h"
+#include "qemu/iov.h"
  #include "qemu/rcu.h"
  #include "exec/target_page.h"
  #include "sysemu/sysemu.h"
@@ -19,6 +20,7 @@
  #include "qemu/error-report.h"
  #include "qapi/error.h"
  #include "file.h"
+#include "migration/misc.h"
  #include "migration.h"
  #include "migration-stats.h"
  #include "savevm.h"
@@ -111,7 +113,9 @@ MultiFDSendData *multifd_send_data_alloc(void)
       * added to the union in the future are larger than
       * (MultiFDPages_t + flex array).
       */
-    max_payload_size = MAX(multifd_ram_payload_size(), sizeof(MultiFDPayload));
+    max_payload_size = MAX(multifd_ram_payload_size(),
+                           multifd_device_state_payload_size());
+    max_payload_size = MAX(max_payload_size, sizeof(MultiFDPayload));
/*
       * Account for any holes the compiler might insert. We can't pack
@@ -130,6 +134,9 @@ void multifd_send_data_clear(MultiFDSendData *data)
      }
switch (data->type) {
+    case MULTIFD_PAYLOAD_DEVICE_STATE:
+        multifd_device_state_clear(&data->u.device_state);
+        break;
      default:
          /* Nothing to do */
          break;
@@ -232,6 +239,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
      return msg.id;
  }
+/* Fills a RAM multifd packet */
  void multifd_send_fill_packet(MultiFDSendParams *p)
  {
      MultiFDPacket_t *packet = p->packet;
@@ -524,6 +532,7 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
      p->name = NULL;
      g_clear_pointer(&p->data, multifd_send_data_free);
      p->packet_len = 0;
+    g_clear_pointer(&p->packet_device_state, g_free);
      g_free(p->packet);
      p->packet = NULL;
      multifd_send_state->ops->send_cleanup(p, errp);
@@ -536,6 +545,7 @@ static void multifd_send_cleanup_state(void)
  {
      file_cleanup_outgoing_migration();
      socket_cleanup_outgoing_migration();
+    multifd_device_state_send_cleanup();
      qemu_sem_destroy(&multifd_send_state->channels_created);
      qemu_sem_destroy(&multifd_send_state->channels_ready);
      qemu_mutex_destroy(&multifd_send_state->multifd_send_mutex);
@@ -662,16 +672,33 @@ static void *multifd_send_thread(void *opaque)
           * qatomic_store_release() in multifd_send().
           */
          if (qatomic_load_acquire(&p->pending_job)) {
+            bool is_device_state = multifd_payload_device_state(p->data);
+            size_t total_size;
+
              p->flags = 0;
              p->iovs_num = 0;
              assert(!multifd_payload_empty(p->data));
- ret = multifd_send_state->ops->send_prepare(p, &local_err);
-            if (ret != 0) {
-                break;
+            if (is_device_state) {
+                multifd_device_state_send_prepare(p);
+
+                total_size = iov_size(p->iov, p->iovs_num);

This is such a good idea, because it allows us to kill
next_packet_size. Let's make it work.

What if you add packet_len to mig_stats under use_zero_copy at
multifd_nocomp_send_prepare? It's only fair since that's when the data
is actually sent. Then this total_size gets consolidated between the
paths.


Adding the header to multifd_bytes where it is actually sent
(in multifd_nocomp_send_prepare() in this case) makes sense to me -
I will change it accordingly.

Thanks,
Maciej


Reply via email to