From: "Dr. David Alan Gilbert" <dgilb...@redhat.com>

Use the order of incoming RAMBlocks from the source to record
an index number; that then allows us to sort the destination
local RAMBlock list to match the source.

Now that the RAMBlocks are known to be in the same order, this
simplifies the RDMA Registration step which previously tried to
match RAMBlocks based on offset (which isn't guaranteed to match).

Looking at the existing compress code, I think it was erroneously
relying on an assumption of matching ordering, which this fixes.

Signed-off-by: Dr. David Alan Gilbert <dgilb...@redhat.com>
Signed-off-by: Juan Quintela <quint...@redhat.com>
---
 migration/rdma.c | 101 ++++++++++++++++++++++++++++++++++++++++---------------
 trace-events     |   2 ++
 2 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/migration/rdma.c b/migration/rdma.c
index a652e67..73844a3 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -225,6 +225,7 @@ typedef struct RDMALocalBlock {
     uint32_t      *remote_keys;     /* rkeys for chunk-level registration */
     uint32_t       remote_rkey;     /* rkeys for non-chunk-level registration */
     int            index;           /* which block are we */
+    unsigned int   src_index;       /* (Only used on dest) */
     bool           is_ram_block;
     int            nb_chunks;
     unsigned long *transit_bitmap;
@@ -354,6 +355,9 @@ typedef struct RDMAContext {
     RDMALocalBlocks local_ram_blocks;
     RDMADestBlock  *dest_blocks;

+    /* Index of the next RAMBlock received during block registration */
+    unsigned int    next_src_index;
+
     /*
      * Migration on *destination* started.
      * Then use coroutine yield function.
@@ -562,6 +566,7 @@ static int rdma_add_block(RDMAContext *rdma, const char *block_name,
     block->offset = block_offset;
     block->length = length;
     block->index = local->nb_blocks;
+    block->src_index = ~0U; /* Filled in by the receipt of the block list */
     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
     block->transit_bitmap = bitmap_new(block->nb_chunks);
     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
@@ -2917,6 +2922,14 @@ err_rdma_dest_wait:
     return ret;
 }

+static int dest_ram_sort_func(const void *a, const void *b)
+{
+    const RDMALocalBlock *lhs = a, *rhs = b;
+    unsigned int x = lhs->src_index, y = rhs->src_index;
+
+    return (x > y) - (x < y); /* ascending src_index: -1, 0 or 1 */
+}
+
 /*
  * During each iteration of the migration, we listen for instructions
  * by the source VM to perform dynamic page registrations before they
@@ -2994,6 +3007,13 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
             trace_qemu_rdma_registration_handle_ram_blocks();

+            /* Sort our local RAM Block list so it's the same as the source,
+             * we can do this since we've filled in a src_index in the list
+             * as we received the RAMBlock list earlier.
+             */
+            qsort(rdma->local_ram_blocks.block,
+                  rdma->local_ram_blocks.nb_blocks,
+                  sizeof(RDMALocalBlock), dest_ram_sort_func);
             if (rdma->pin_all) {
                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
                 if (ret) {
@@ -3021,6 +3041,12 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
                 rdma->dest_blocks[i].length = local->block[i].length;

                 dest_block_to_network(&rdma->dest_blocks[i]);
+                trace_qemu_rdma_registration_handle_ram_blocks_loop(
+                    local->block[i].block_name,
+                    local->block[i].offset,
+                    local->block[i].length,
+                    local->block[i].local_host_addr,
+                    local->block[i].src_index);
             }

             blocks.len = rdma->local_ram_blocks.nb_blocks
@@ -3144,13 +3170,44 @@ out:
     return ret;
 }

+/*
+ * Destination side: reached via ram_control_load_hook while the initial RAM
+ * load section, which lists the RAMBlocks by name, is being read.  The order
+ * of these calls is the source's RAMBlock order, so the named block is given
+ * the next source index.  Our local RAMBlock list is already built by now,
+ * but has not yet been sent to the source.
+ */
+static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
+{
+    RDMAContext *rdma = rfile->rdma;
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
+    RDMALocalBlock *match = NULL;
+    int i;
+
+    /* Look the block up by name; stop at the first hit */
+    for (i = 0; i < local->nb_blocks && !match; i++) {
+        if (strcmp(local->block[i].block_name, name) == 0) {
+            match = &local->block[i];
+        }
+    }
+
+    if (!match) {
+        error_report("RAMBlock '%s' not found on destination", name);
+        return -ENOENT;
+    }
+
+    match->src_index = rdma->next_src_index;
+    trace_rdma_block_notification_handle(name, rdma->next_src_index);
+    rdma->next_src_index++;
+
+    return 0;
+}
+
 static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
 {
     switch (flags) {
     case RAM_CONTROL_BLOCK_REG:
-        /* TODO A later patch */
-        return 0;
-        break;
+        return rdma_block_notification_handle(opaque, data);

     case RAM_CONTROL_HOOK:
         return qemu_rdma_registration_handle(f, opaque);
@@ -3201,7 +3258,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
     if (flags == RAM_CONTROL_SETUP) {
         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
         RDMALocalBlocks *local = &rdma->local_ram_blocks;
-        int reg_result_idx, i, j, nb_dest_blocks;
+        int reg_result_idx, i, nb_dest_blocks;

         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
         trace_qemu_rdma_registration_stop_ram();
@@ -3237,9 +3294,10 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
          */

         if (local->nb_blocks != nb_dest_blocks) {
-            ERROR(errp, "ram blocks mismatch #1! "
+            ERROR(errp, "ram blocks mismatch (Number of blocks %d vs %d) "
                         "Your QEMU command line parameters are probably "
-                        "not identical on both the source and destination.");
+                        "not identical on both the source and destination.",
+                        local->nb_blocks, nb_dest_blocks);
             return -EINVAL;
         }

@@ -3249,30 +3307,17 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
         for (i = 0; i < nb_dest_blocks; i++) {
             network_to_dest_block(&rdma->dest_blocks[i]);

-            /* search local ram blocks */
-            for (j = 0; j < local->nb_blocks; j++) {
-                if (rdma->dest_blocks[i].offset != local->block[j].offset) {
-                    continue;
-                }
-
-                if (rdma->dest_blocks[i].length != local->block[j].length) {
-                    ERROR(errp, "ram blocks mismatch #2! "
-                        "Your QEMU command line parameters are probably "
-                        "not identical on both the source and destination.");
-                    return -EINVAL;
-                }
-                local->block[j].remote_host_addr =
-                        rdma->dest_blocks[i].remote_host_addr;
-                local->block[j].remote_rkey = rdma->dest_blocks[i].remote_rkey;
-                break;
-            }
-
-            if (j >= local->nb_blocks) {
-                ERROR(errp, "ram blocks mismatch #3! "
-                        "Your QEMU command line parameters are probably "
-                        "not identical on both the source and destination.");
+            /* We require that the blocks are in the same order */
+            if (rdma->dest_blocks[i].length != local->block[i].length) {
+                ERROR(errp, "Block %s/%d has a different length %" PRIu64
+                            " vs %" PRIu64, local->block[i].block_name, i,
+                            local->block[i].length,
+                            rdma->dest_blocks[i].length);
                 return -EINVAL;
             }
+            local->block[i].remote_host_addr =
+                    rdma->dest_blocks[i].remote_host_addr;
+            local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
         }
     }

diff --git a/trace-events b/trace-events
index b1e572a..a38dd2e 100644
--- a/trace-events
+++ b/trace-events
@@ -1433,6 +1433,7 @@ qemu_rdma_register_and_get_keys(uint64_t len, void *start) "Registering %" PRIu6
 qemu_rdma_registration_handle_compress(int64_t length, int index, int64_t offset) "Zapping zero chunk: %" PRId64 " bytes, index %d, offset %" PRId64
 qemu_rdma_registration_handle_finished(void) ""
 qemu_rdma_registration_handle_ram_blocks(void) ""
+qemu_rdma_registration_handle_ram_blocks_loop(const char *name, uint64_t offset, uint64_t length, void *local_host_addr, unsigned int src_index) "%s: @%" PRIx64 "/%" PRIu64 " host:@%p src_index: %u"
 qemu_rdma_registration_handle_register(int requests) "%d requests"
 qemu_rdma_registration_handle_register_loop(int req, int index, uint64_t addr, uint64_t chunks) "Registration request (%d): index %d, current_addr %" PRIu64 " chunks: %" PRIu64
 qemu_rdma_registration_handle_register_rkey(int rkey) "%x"
@@ -1459,6 +1460,7 @@ qemu_rdma_write_one_sendreg(uint64_t chunk, int len, int index, int64_t offset)
 qemu_rdma_write_one_top(uint64_t chunks, uint64_t size) "Writing %" PRIu64 " chunks, (%" PRIu64 " MB)"
 qemu_rdma_write_one_zero(uint64_t chunk, int len, int index, int64_t offset) "Entire chunk is zero, sending compress: %" PRIu64 " for %d bytes, index: %d, offset: %" PRId64
 rdma_add_block(const char *block_name, int block, uint64_t addr, uint64_t offset, uint64_t len, uint64_t end, uint64_t bits, int chunks) "Added Block: '%s':%d, addr: %" PRIu64 ", offset: %" PRIu64 " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d"
+rdma_block_notification_handle(const char *name, int index) "%s at %d"
 rdma_delete_block(void *block, uint64_t addr, uint64_t offset, uint64_t len, uint64_t end, uint64_t bits, int chunks) "Deleted Block: %p, addr: %" PRIu64 ", offset: %" PRIu64 " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d"
 rdma_start_incoming_migration(void) ""
 rdma_start_incoming_migration_after_dest_init(void) ""
-- 
2.4.3


Reply via email to