On 27/08/2024 20:54, Maciej S. Szmigiero wrote:
External email: Use caution opening links or attachments


From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com>

Add basic support for receiving device state via multifd channels -
channels that are shared with RAM transfers.

To differentiate between a device state packet and a RAM packet, the
packet header is read first.

Depending on whether the MULTIFD_FLAG_DEVICE_STATE flag is present in the
packet header, either device state (MultiFDPacketDeviceState_t) or RAM
data (existing MultiFDPacket_t) is then read.

The received device state data is provided to the
qemu_loadvm_load_state_buffer() function for processing in the
device's load_state_buffer handler.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com>
---
  migration/multifd.c | 127 +++++++++++++++++++++++++++++++++++++-------
  migration/multifd.h |  31 ++++++++++-
  2 files changed, 138 insertions(+), 20 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index b06a9fab500e..d5a8e5a9c9b5 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -21,6 +21,7 @@
  #include "file.h"
  #include "migration.h"
  #include "migration-stats.h"
+#include "savevm.h"
  #include "socket.h"
  #include "tls.h"
  #include "qemu-file.h"
@@ -209,10 +210,10 @@ void multifd_send_fill_packet(MultiFDSendParams *p)

      memset(packet, 0, p->packet_len);

-    packet->magic = cpu_to_be32(MULTIFD_MAGIC);
-    packet->version = cpu_to_be32(MULTIFD_VERSION);
+    packet->hdr.magic = cpu_to_be32(MULTIFD_MAGIC);
+    packet->hdr.version = cpu_to_be32(MULTIFD_VERSION);

-    packet->flags = cpu_to_be32(p->flags);
+    packet->hdr.flags = cpu_to_be32(p->flags);
      packet->next_packet_size = cpu_to_be32(p->next_packet_size);

      packet_num = qatomic_fetch_inc(&multifd_send_state->packet_num);
@@ -228,31 +229,49 @@ void multifd_send_fill_packet(MultiFDSendParams *p)
                              p->flags, p->next_packet_size);
  }

-static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+static int multifd_recv_unfill_packet_header(MultiFDRecvParams *p,
+                                             MultiFDPacketHdr_t *hdr,
+                                             Error **errp)
  {
-    MultiFDPacket_t *packet = p->packet;
-    int ret = 0;
-
-    packet->magic = be32_to_cpu(packet->magic);
-    if (packet->magic != MULTIFD_MAGIC) {
+    hdr->magic = be32_to_cpu(hdr->magic);
+    if (hdr->magic != MULTIFD_MAGIC) {
          error_setg(errp, "multifd: received packet "
                     "magic %x and expected magic %x",
-                   packet->magic, MULTIFD_MAGIC);
+                   hdr->magic, MULTIFD_MAGIC);
          return -1;
      }

-    packet->version = be32_to_cpu(packet->version);
-    if (packet->version != MULTIFD_VERSION) {
+    hdr->version = be32_to_cpu(hdr->version);
+    if (hdr->version != MULTIFD_VERSION) {
          error_setg(errp, "multifd: received packet "
                     "version %u and expected version %u",
-                   packet->version, MULTIFD_VERSION);
+                   hdr->version, MULTIFD_VERSION);
          return -1;
      }

-    p->flags = be32_to_cpu(packet->flags);
+    p->flags = be32_to_cpu(hdr->flags);
+
+    return 0;
+}
+
+static int multifd_recv_unfill_packet_device_state(MultiFDRecvParams *p,
+                                                   Error **errp)
+{
+    MultiFDPacketDeviceState_t *packet = p->packet_dev_state;
+
+    packet->instance_id = be32_to_cpu(packet->instance_id);
+    p->next_packet_size = be32_to_cpu(packet->next_packet_size);
+
+    return 0;
+}
+
+static int multifd_recv_unfill_packet_ram(MultiFDRecvParams *p, Error **errp)
+{
+    MultiFDPacket_t *packet = p->packet;
+    int ret = 0;
+
      p->next_packet_size = be32_to_cpu(packet->next_packet_size);
      p->packet_num = be64_to_cpu(packet->packet_num);
-    p->packets_recved++;

      if (!(p->flags & MULTIFD_FLAG_SYNC)) {
          ret = multifd_ram_unfill_packet(p, errp);
@@ -264,6 +283,19 @@ static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
      return ret;
  }

+static int multifd_recv_unfill_packet(MultiFDRecvParams *p, Error **errp)
+{
+    p->packets_recved++;
+
+    if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
+        return multifd_recv_unfill_packet_device_state(p, errp);
+    } else {
+        return multifd_recv_unfill_packet_ram(p, errp);
+    }
+
+    g_assert_not_reached();

We can drop the assert and the "else":
if (p->flags & MULTIFD_FLAG_DEVICE_STATE) {
    return multifd_recv_unfill_packet_device_state(p, errp);
}

return multifd_recv_unfill_packet_ram(p, errp);

+}
+
  static bool multifd_send_should_exit(void)
  {
      return qatomic_read(&multifd_send_state->exiting);
@@ -1014,6 +1046,7 @@ static void multifd_recv_cleanup_channel(MultiFDRecvParams *p)
      p->packet_len = 0;
      g_free(p->packet);
      p->packet = NULL;
+    g_clear_pointer(&p->packet_dev_state, g_free);
      g_free(p->normal);
      p->normal = NULL;
      g_free(p->zero);
@@ -1126,8 +1159,13 @@ static void *multifd_recv_thread(void *opaque)
      rcu_register_thread();

      while (true) {
+        MultiFDPacketHdr_t hdr;
          uint32_t flags = 0;
+        bool is_device_state = false;
          bool has_data = false;
+        uint8_t *pkt_buf;
+        size_t pkt_len;
+
          p->normal_num = 0;

          if (use_packets) {
@@ -1135,8 +1173,28 @@ static void *multifd_recv_thread(void *opaque)
                  break;
              }

-            ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
-                                           p->packet_len, &local_err);
+            ret = qio_channel_read_all_eof(p->c, (void *)&hdr,
+                                           sizeof(hdr), &local_err);
+            if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
+                break;
+            }
+
+            ret = multifd_recv_unfill_packet_header(p, &hdr, &local_err);
+            if (ret) {
+                break;
+            }
+
+            is_device_state = p->flags & MULTIFD_FLAG_DEVICE_STATE;
+            if (is_device_state) {
+                pkt_buf = (uint8_t *)p->packet_dev_state + sizeof(hdr);
+                pkt_len = sizeof(*p->packet_dev_state) - sizeof(hdr);
+            } else {
+                pkt_buf = (uint8_t *)p->packet + sizeof(hdr);
+                pkt_len = p->packet_len - sizeof(hdr);
+            }
+
+            ret = qio_channel_read_all_eof(p->c, (char *)pkt_buf, pkt_len,
+                                           &local_err);
              if (ret == 0 || ret == -1) {   /* 0: EOF  -1: Error */
                  break;
              }
@@ -1181,8 +1239,33 @@ static void *multifd_recv_thread(void *opaque)
              has_data = !!p->data->size;
          }

-        if (has_data) {
-            ret = multifd_recv_state->ops->recv(p, &local_err);
+        if (!is_device_state) {
+            if (has_data) {
+                ret = multifd_recv_state->ops->recv(p, &local_err);
+                if (ret != 0) {
+                    break;
+                }
+            }
+        } else {
+            g_autofree char *idstr = NULL;
+            g_autofree char *dev_state_buf = NULL;
+
+            assert(use_packets);
+
+            if (p->next_packet_size > 0) {
+                dev_state_buf = g_malloc(p->next_packet_size);
+
+                ret = qio_channel_read_all(p->c, dev_state_buf, p->next_packet_size, &local_err);
+                if (ret != 0) {
+                    break;
+                }
+            }
+
+            idstr = g_strndup(p->packet_dev_state->idstr, sizeof(p->packet_dev_state->idstr));
+            ret = qemu_loadvm_load_state_buffer(idstr,
+                                                p->packet_dev_state->instance_id,
+                                                dev_state_buf, p->next_packet_size,
+                                                &local_err);
              if (ret != 0) {
                  break;
              }
@@ -1190,6 +1273,11 @@ static void *multifd_recv_thread(void *opaque)

          if (use_packets) {
              if (flags & MULTIFD_FLAG_SYNC) {
+                if (is_device_state) {
+                    error_setg(&local_err, "multifd: received SYNC device state packet");
+                    break;
+                }
+
                  qemu_sem_post(&multifd_recv_state->sem_sync);
                  qemu_sem_wait(&p->sem_sync);
              }
@@ -1258,6 +1346,7 @@ int multifd_recv_setup(Error **errp)
              p->packet_len = sizeof(MultiFDPacket_t)
                  + sizeof(uint64_t) * page_count;
              p->packet = g_malloc0(p->packet_len);
+            p->packet_dev_state = g_malloc0(sizeof(*p->packet_dev_state));
          }
          p->name = g_strdup_printf("mig/dst/recv_%d", i);
          p->normal = g_new0(ram_addr_t, page_count);
diff --git a/migration/multifd.h b/migration/multifd.h
index a3e35196d179..a8f3e4838c01 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -45,6 +45,12 @@ MultiFDRecvData *multifd_get_recv_data(void);
  #define MULTIFD_FLAG_QPL (4 << 1)
  #define MULTIFD_FLAG_UADK (8 << 1)

+/*
+ * If set it means that this packet contains device state
+ * (MultiFDPacketDeviceState_t), not RAM data (MultiFDPacket_t).
+ */
+#define MULTIFD_FLAG_DEVICE_STATE (1 << 4)
+
  /* This value needs to be a multiple of qemu_target_page_size() */
  #define MULTIFD_PACKET_SIZE (512 * 1024)

@@ -52,6 +58,11 @@ typedef struct {
      uint32_t magic;
      uint32_t version;
      uint32_t flags;
+} __attribute__((packed)) MultiFDPacketHdr_t;

Maybe split this patch into two: one that adds the packet header concept and another that adds the new device packet?

+
+typedef struct {
+    MultiFDPacketHdr_t hdr;
+
      /* maximum number of allocated pages */
      uint32_t pages_alloc;
      /* non zero pages */
@@ -72,6 +83,16 @@ typedef struct {
      uint64_t offset[];
  } __attribute__((packed)) MultiFDPacket_t;

+typedef struct {
+    MultiFDPacketHdr_t hdr;
+
+    char idstr[256] QEMU_NONSTRING;

idstr should be NUL-terminated, or am I missing something?

Thanks.

+    uint32_t instance_id;
+
+    /* size of the next packet that contains the actual data */
+    uint32_t next_packet_size;
+} __attribute__((packed)) MultiFDPacketDeviceState_t;
+
  typedef struct {
      /* number of used pages */
      uint32_t num;
@@ -89,6 +110,13 @@ struct MultiFDRecvData {
      off_t file_offset;
  };

+typedef struct {
+    char *idstr;
+    uint32_t instance_id;
+    char *buf;
+    size_t buf_len;
+} MultiFDDeviceState_t;
+
  typedef enum {
      MULTIFD_PAYLOAD_NONE,
      MULTIFD_PAYLOAD_RAM,
@@ -204,8 +232,9 @@ typedef struct {

      /* thread local variables. No locking required */

-    /* pointer to the packet */
+    /* pointers to the possible packet types */
      MultiFDPacket_t *packet;
+    MultiFDPacketDeviceState_t *packet_dev_state;
      /* size of the next packet that contains pages */
      uint32_t next_packet_size;
      /* packets received through this channel */

Reply via email to