On Thu, Apr 25, 2024 at 02:21:13AM +0000, Hao Xiang wrote:
> Multifd sender path gets an array of pages queued by the migration
> thread. It performs zero page checking on every page in the array.
> The pages are classfied as either a zero page or a normal page. This
> change uses Intel DSA to offload the zero page checking from CPU to
> the DSA accelerator. The sender thread submits a batch of pages to DSA
> hardware and waits for the DSA completion thread to signal for work
> completion.
> 
> Signed-off-by: Hao Xiang <hao.xi...@linux.dev>
> ---
>  migration/multifd-zero-page.c | 99 +++++++++++++++++++++++++++++++++--
>  migration/multifd.c           | 27 +++++++++-
>  migration/multifd.h           |  1 +
>  3 files changed, 120 insertions(+), 7 deletions(-)
> 
> diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
> index e1b8370f88..4f426289e4 100644
> --- a/migration/multifd-zero-page.c
> +++ b/migration/multifd-zero-page.c
> @@ -37,25 +37,83 @@ static void swap_page_offset(ram_addr_t *pages_offset, 
> int a, int b)
>  }
>  
>  /**
> - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + * zero_page_detect_cpu: Perform zero page detection using CPU.
>   *
>   * Sorts normal pages before zero pages in p->pages->offset and updates
>   * p->pages->normal_num.
>   *
>   * @param p A pointer to the send params.
>   */
> -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +static void zero_page_detect_cpu(MultiFDSendParams *p)
>  {
>      MultiFDPages_t *pages = p->pages;
>      RAMBlock *rb = pages->block;
>      int i = 0;
>      int j = pages->num - 1;
>  
> -    if (!multifd_zero_page_enabled()) {
> -        pages->normal_num = pages->num;
> +    /*
> +     * Sort the page offset array by moving all normal pages to
> +     * the left and all zero pages to the right of the array.
> +     */
> +    while (i <= j) {
> +        uint64_t offset = pages->offset[i];
> +
> +        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> +            i++;
> +            continue;
> +        }
> +
> +        swap_page_offset(pages->offset, i, j);
> +        ram_release_page(rb->idstr, offset);
> +        j--;
> +    }
> +
> +    pages->normal_num = i;
> +}
> +
> +
> +#ifdef CONFIG_DSA_OPT
> +
> +static void swap_result(bool *results, int a, int b)
> +{
> +    bool temp;
> +
> +    if (a == b) {
>          return;
>      }
>  
> +    temp = results[a];
> +    results[a] = results[b];
> +    results[b] = temp;
> +}
> +
> +/**
> + * zero_page_detect_dsa: Perform zero page detection using
> + * Intel Data Streaming Accelerator (DSA).
> + *
> + * Sorts normal pages before zero pages in p->pages->offset and updates
> + * p->pages->normal_num.
> + *
> + * @param p A pointer to the send params.
> + */
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = p->pages;
> +    RAMBlock *rb = pages->block;
> +    bool *results = p->batch_task->results;
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->batch_task->addr[i] = (ram_addr_t)(rb->host + 
> p->pages->offset[i]);
> +    }
> +
> +    buffer_is_zero_dsa_batch_async(p->batch_task,
> +                                   (const void **)p->batch_task->addr,
> +                                   p->pages->num,
> +                                   p->page_size);
> +
> +    int i = 0;
> +    int j = pages->num - 1;
> +
>      /*
>       * Sort the page offset array by moving all normal pages to
>       * the left and all zero pages to the right of the array.
> @@ -63,11 +121,12 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      while (i <= j) {
>          uint64_t offset = pages->offset[i];
>  
> -        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> +        if (!results[i]) {
>              i++;
>              continue;
>          }
>  
> +        swap_result(results, i, j);
>          swap_page_offset(pages->offset, i, j);
>          ram_release_page(rb->idstr, offset);
>          j--;
> @@ -76,6 +135,15 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      pages->normal_num = i;
>  }
>  
> +#else
> +
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    exit(1);
> +}
> +
> +#endif
> +
>  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>  {
>      for (int i = 0; i < p->zero_num; i++) {
> @@ -87,3 +155,24 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>          }
>      }
>  }
> +
> +/**
> + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + *
> + * @param p A pointer to the send params.
> + */
> +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = p->pages;
> +
> +    if (!multifd_zero_page_enabled()) {
> +        pages->normal_num = pages->num;
> +        return;
> +    }
> +
> +    if (dsa_is_running()) {
> +        zero_page_detect_dsa(p);
> +    } else {
> +        zero_page_detect_cpu(p);
> +    }
> +}
> diff --git a/migration/multifd.c b/migration/multifd.c
> index cfd3a92f6c..7316643d0a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -818,6 +818,8 @@ void multifd_send_shutdown(void)
>  
>      multifd_send_terminate_threads();
>  
> +    dsa_cleanup();
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
> @@ -1155,11 +1157,20 @@ bool multifd_send_setup(void)
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
> +    const char *dsa_parameter = migrate_multifd_dsa_accel();
>  
>      if (!migrate_multifd()) {
>          return true;
>      }
>  
> +    if (dsa_init(dsa_parameter)) {
> +        error_setg(&local_err, "multifd: Sender failed to initialize DSA.");
> +        error_report_err(local_err);
> +        return false;
> +    }
> +
> +    dsa_start();

Commented on this in another reply, let's condense them into a single dsa
call.  That call should also be nested into a flag to know at least dsa is
enabled:

  if (multifd_dsa_enabled()) {
     multifd_dsa_init();
  }

Multifd used to have some legacy code where it randomly call multifd_*
functions directly in generic migration code.  Not a good example to
follow there..

Also, I'd think it better if we don't introduce global vars for dsa, it
should be put under multifd_send_state (or recv_state).  So anything like
dsa_start() should always take that pointer first.

> +
>      thread_count = migrate_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> @@ -1393,6 +1404,7 @@ void multifd_recv_cleanup(void)
>              qemu_thread_join(&p->thread);
>          }
>      }
> +    dsa_cleanup();

Same here.

>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
>      }
> @@ -1568,6 +1580,9 @@ int multifd_recv_setup(Error **errp)
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
> +    const char *dsa_parameter = migrate_multifd_dsa_accel();
> +    int ret;
> +    Error *local_err = NULL;
>  
>      /*
>       * Return successfully if multiFD recv state is already initialised
> @@ -1577,6 +1592,15 @@ int multifd_recv_setup(Error **errp)
>          return 0;
>      }
>  
> +    ret = dsa_init(dsa_parameter);
> +    if (ret != 0) {
> +        error_setg(&local_err, "multifd: Receiver failed to initialize 
> DSA.");
> +        error_propagate(errp, local_err);
> +        return ret;
> +    }
> +
> +    dsa_start();

Same here.

in the dsa case, it'll be even better if you can find a way to unify
send/recv, as IIUC they do work similarly, setup() some dsa stuff, do some
zero page detection, cleanup() some dsa stuff.  They look all the same
irrelevant of src/dst.  I think it's nice if we can merge them.

> +
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> @@ -1616,13 +1640,12 @@ int multifd_recv_setup(Error **errp)
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -        int ret;
> -
>          ret = multifd_recv_state->ops->recv_setup(p, errp);
>          if (ret) {
>              return ret;
>          }
>      }
> +
>      return 0;
>  }
>  
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 16e27db5e9..b3717fae24 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -14,6 +14,7 @@
>  #define QEMU_MIGRATION_MULTIFD_H
>  
>  #include "ram.h"
> +#include "qemu/dsa.h"
>  
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  
> -- 
> 2.30.2
> 
> 

-- 
Peter Xu


Reply via email to