Il 28/06/2013 21:59, mrhi...@linux.vnet.ibm.com ha scritto: > +/* > + * Perform a non-optimized memory unregistration after every transfer > + * for demonstration purposes, only if pin-all is not requested. > + * > + * Potential optimizations: > + * 1. Start a new thread to run this function continuously > + - for bit clearing > + - and for receipt of unregister messages > + * 2. Use an LRU. > + * 3. Use workload hints. > + */ > +#ifdef RDMA_UNREGISTRATION_EXAMPLE > +static int qemu_rdma_unregister_waiting(RDMAContext *rdma) > +{ > + while (rdma->unregistrations[rdma->unregister_current]) { > + int ret; > + uint64_t wr_id = rdma->unregistrations[rdma->unregister_current]; > + uint64_t chunk = > + (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; > + uint64_t index = > + (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; > + RDMALocalBlock *block = > + &(rdma->local_ram_blocks.block[index]); > + RDMARegister reg = { .current_index = index }; > + RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED, > + }; > + RDMAControlHeader head = { .len = sizeof(RDMARegister), > + .type = RDMA_CONTROL_UNREGISTER_REQUEST, > + .repeat = 1, > + }; > + > + DDPRINTF("Processing unregister for chunk: %" PRIu64 " at position > %d\n", > + chunk, rdma->unregister_current); > + > + rdma->unregistrations[rdma->unregister_current] = 0; > + rdma->unregister_current++; > + > + if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) { > + rdma->unregister_current = 0; > + } > + > + DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk); > + > + clear_bit(chunk, block->unregister_bitmap);
The chunk is still registered at this point; shouldn't this clear_bit come after the ibv_dereg_mr, or something like that? > + if (test_bit(chunk, block->transit_bitmap)) { > + DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", > chunk); > + continue; > + } This was not clear from your answer: who exactly will unregister this chunk? Why not also run the 15 lines below at this point: + if (wr_id == RDMA_WRID_RDMA_WRITE) { + uint64_t chunk = + (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT; + uint64_t index = + (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT; + RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); + + DDDPRINTF("completions %s (%" PRId64 ") left %d, " + "block %" PRIu64 ", chunk: %" PRIu64 "\n", + print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk); + + clear_bit(chunk, block->transit_bitmap); + + if (rdma->nb_sent > 0) { + rdma->nb_sent--; + } ? > + > + ret = ibv_dereg_mr(block->pmr[chunk]); > + block->pmr[chunk] = NULL; > + block->remote_keys[chunk] = 0; > + > + if (ret != 0) { > + perror("unregistration chunk failed"); > + return -ret; > + } > + rdma->total_registrations--; > + > + reg.key.chunk = chunk; > + register_to_network(&reg); > + ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg, > + &resp, NULL, NULL); > + if (ret < 0) { > + return ret; > + } > + > + DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk); > + } > + > + return 0; > +} > + > +/* > + * Set bit for unregistration in the next iteration. > + * We cannot transmit right here, but will unpin later. 
> + */ > +static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index, > + uint64_t chunk, uint64_t wr_id) > +{ > + if (rdma->unregistrations[rdma->unregister_next] != 0) { > + fprintf(stderr, "rdma migration: queue is full!\n"); > + } else { > + RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]); > + > + if (!test_and_set_bit(chunk, block->unregister_bitmap)) { > + DDPRINTF("Appending unregister chunk %" PRIu64 > + " at position %d\n", chunk, rdma->unregister_next); > + > + rdma->unregistrations[rdma->unregister_next++] = wr_id; > + > + if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) { > + rdma->unregister_next = 0; > + } > + } else { > + DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n", > + chunk); > + } > + } > +} > +#endif > static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader > *head, > uint8_t *data, RDMAControlHeader *resp, > int *resp_idx, > @@ -1006,6 +1132,17 @@ static uint64_t qemu_rdma_poll(RDMAContext *rdma, > uint64_t *wr_id_out) > if (rdma->nb_sent > 0) { > rdma->nb_sent--; > } > + if (!rdma->pin_all) { > + /* > + * FYI: If one wanted to signal a specific chunk to be > unregistered > + * using LRU or workload-specific information, this is the > function > + * you would call to do so. That chunk would then get > asynchronously > + * unregistered later. > + */ > +#ifdef RDMA_UNREGISTRATION_EXAMPLE > + qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id); > +#endif > + } > } else { > DDPRINTF("other completion %s (%" PRId64 ") received left %d\n", > print_wrid(wr_id), wr_id, rdma->nb_sent); > @@ -1423,6 +1560,12 @@ retry: > chunk_start = ram_chunk_start(block, chunk); > chunk_end = ram_chunk_end(block, chunk); > > + if (!rdma->pin_all) { > +#ifdef RDMA_UNREGISTRATION_EXAMPLE > + qemu_rdma_unregister_waiting(rdma); > +#endif > + } > + > while (test_bit(chunk, block->transit_bitmap)) { > (void)count; > DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64 >