> -----Original Message-----
> From: Fabiano Rosas <faro...@suse.de>
> Sent: Tuesday, May 14, 2024 10:08 PM
> To: Liu, Yuan1 <yuan1....@intel.com>; pet...@redhat.com
> Cc: qemu-devel@nongnu.org; Zou, Nanhai <nanhai....@intel.com>
> Subject: RE: [PATCH v6 6/7] migration/multifd: implement qpl compression
> and decompression
>
> "Liu, Yuan1" <yuan1....@intel.com> writes:
>
> >> -----Original Message-----
> >> From: Fabiano Rosas <faro...@suse.de>
> >> Sent: Monday, May 13, 2024 11:14 PM
> >> To: Liu, Yuan1 <yuan1....@intel.com>; pet...@redhat.com
> >> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1....@intel.com>; Zou, Nanhai
> >> <nanhai....@intel.com>
> >> Subject: Re: [PATCH v6 6/7] migration/multifd: implement qpl compression
> >> and decompression
> >>
> >> Yuan Liu <yuan1....@intel.com> writes:
> >>
> >> > each qpl job is used to (de)compress a normal page and it can
> >> > be processed independently by the IAA hardware. All qpl jobs
> >> > are submitted to the hardware at once, and wait for all jobs
> >> > completion. If hardware path(IAA) is not available, use software
> >> > for compression and decompression.
> >> >
> >> > Signed-off-by: Yuan Liu <yuan1....@intel.com>
> >> > Reviewed-by: Nanhai Zou <nanhai....@intel.com>
> >> > ---
> >> >  migration/multifd-qpl.c | 284 +++++++++++++++++++++++++++++++++++++++-
> >> >  1 file changed, 280 insertions(+), 4 deletions(-)
> >> >
> >> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> >> > index 89fa51091a..9a1fddbdd0 100644
> >> > --- a/migration/multifd-qpl.c
> >> > +++ b/migration/multifd-qpl.c
> >> > @@ -13,6 +13,7 @@
> >> >  #include "qemu/osdep.h"
> >> >  #include "qemu/module.h"
> >> >  #include "qapi/error.h"
> >> > +#include "exec/ramblock.h"
> >> >  #include "migration.h"
> >> >  #include "multifd.h"
> >> >  #include "qpl/qpl.h"
> >> > @@ -204,6 +205,139 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> >      p->iov = NULL;
> >> >  }
> >> >
> >> > +/**
> >> > + * multifd_qpl_prepare_job: prepare a compression or decompression job
> >> > + *
> >> > + * Prepare a compression or decompression job and configure job attributes
> >> > + * including job compression level and flags.
> >> > + *
> >> > + * @job: pointer to the QplData structure
> >>
> >> qpl_job structure
> >
> > Thanks for the comment, I will fix this next version.
> >
> >> > + * @is_compression: compression or decompression indication
> >> > + * @input: pointer to the input data buffer
> >> > + * @input_len: the length of the input data
> >> > + * @output: pointer to the output data buffer
> >> > + * @output_len: the size of the output data buffer
> >> > + */
> >> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> >> > +                                    uint8_t *input, uint32_t input_len,
> >> > +                                    uint8_t *output, uint32_t output_len)
> >> > +{
> >> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> >> > +    job->next_in_ptr = input;
> >> > +    job->next_out_ptr = output;
> >> > +    job->available_in = input_len;
> >> > +    job->available_out = output_len;
> >> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> >> > +    /* only supports one compression level */
> >> > +    job->level = 1;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_build_packet: build a qpl compressed data packet
> >> > + *
> >> > + * The qpl compressed data packet consists of two parts, one part stores
> >> > + * the compressed length of each page, and the other part is the compressed
> >> > + * data of each page. The zbuf_hdr stores the compressed length of all pages,
> >> > + * and use a separate IOV to store the compressed data of each page.
> >> > + *
> >> > + * @qpl: pointer to the QplData structure
> >> > + * @p: Params for the channel that we are using
> >> > + * @idx: The index of the compressed length array
> >> > + * @addr: pointer to the compressed data
> >> > + * @len: The length of the compressed data
> >> > + */
> >> > +static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams *p,
> >> > +                                     uint32_t idx, uint8_t *addr, uint32_t len)
> >> > +{
> >> > +    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
> >> > +    p->iov[p->iovs_num].iov_base = addr;
> >> > +    p->iov[p->iovs_num].iov_len = len;
> >> > +    p->iovs_num++;
> >> > +    p->next_packet_size += len;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_compress_pages: compress normal pages
> >> > + *
> >> > + * Each normal page will be compressed independently, and the compression jobs
> >> > + * will be submitted to the IAA hardware in non-blocking mode, waiting for all
> >> > + * jobs to be completed and filling the compressed length and data into the
> >> > + * sending IOVs. If IAA device is not available, the software path is used.
> >> > + *
> >> > + * Returns 0 for success or -1 for error
> >> > + *
> >> > + * @p: Params for the channel that we are using
> >> > + * @errp: pointer to an error
> >> > + */
> >> > +static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error **errp)
> >> > +{
> >> > +    qpl_status status;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    MultiFDPages_t *pages = p->pages;
> >> > +    uint8_t *zbuf = qpl->zbuf;
> >> > +    uint8_t *host = pages->block->host;
> >> > +    uint32_t job_num = pages->normal_num;
> >>
> >> A bit misleading because job_num is used in the previous patch as a
> >> synonym for page_count. We could change the previous patch to:
> >> multifd_qpl_init(uint32_t page_count, ...
> >
> > Yes, I will replace job_num with page_count for multifd_qpl_init next version.
> >
> >> > +    qpl_job *job = NULL;
> >> > +
> >> > +    assert(job_num <= qpl->total_job_num);
> >> > +    /* submit all compression jobs */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        job = qpl->job_array[i];
> >> > +        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
> >> > +                                p->page_size, zbuf, p->page_size - 1);
> >>
> >> Isn't the output buffer size == page size, why the -1?
> >
> > I think the compressed data should be smaller than a normal page.
> > If the compressed data size is equal to a normal page, send the original
> > page directly to avoid decompression operation.
> >
> > If the output buffer size is set to p->page_size, the compressed data
> > size may be greater than or equal to a normal page, so there are two
> > conditions for sending a normal page: one is job status == QPL_STS_OK
> > and job->total_out == p->page_size, the other is job status ==
> > QPL_STS_MORE_OUTPUT_NEEDED.
> >
> > If the output buffer size is p->page_size - 1, checking
> > QPL_STS_MORE_OUTPUT_NEEDED alone is enough.
> > I will add comments for this in the next version.
> >
> > Thanks.
>
> >> > +        /* if hardware path(IAA) is unavailable, call the software path */
> >>
> >> If we're doing the fallback automatically, isn't that what qpl_path_auto
> >> does already? What's the difference between the two approaches?
> >
> > The qpl_path_auto feature currently has some limitations.
> >
> > Limitation 1: it does not detect the IAA device status before job
> > submission, which means I need to use qpl_init_job(qpl_path_hardware, ...)
> > first to make sure the hardware path is available; then qpl_path_auto can
> > work for software fallback when submitting a job to the IAA fails.
> >
> > Limitation 2: the job submission API behaves differently between paths.
> > For qpl_path_hardware, qpl_submit_job is a non-blocking function that only
> > submits a job to the IAA work queues; use qpl_wait_job to get the job
> > result.
> >
> > For qpl_path_software/auto, qpl_submit_job is a blocking function that
> > returns the job result directly; qpl_wait_job can't get the job result.
> >
> > In short, the qpl_submit_job/qpl_wait_job APIs do not support the
> > qpl_path_auto feature well.
>
> Please put a summary of this in the commit message so in the future we
> can evaluate whether to continue checking for ourselves.
Sure, I will add a summary in the commit message.

I confirmed with the QPL development team that software fallback for the
qpl_submit_job/qpl_wait_job APIs will be ready in the next QPL release.
So I will keep the current design in the next version, with the software
path used only for the QPL unit test. After the next QPL version is
released, I will update the software fallback code so that it can be used
both in the QPL unit test and in migration with IAA.

> >> > +        if (!qpl->iaa_avail) {
> >>
> >> This function got a bit convoluted, it's probably worth a check at the
> >> start and a branch to a different multifd_qpl_compress_pages_slow()
> >> routine altogether.
> >
> > I agree that using multifd_qpl_compress_pages_slow is a better choice.
> >
> > In my previous thinking, the QPL software path (or slow path) was only
> > used for the unit test, and I did not consider falling back to the
> > software path when migration starts because the QPL software path has
> > no advantage over zstd. So when the work queue is full, I always try to
> > resubmit the job instead of falling back to the software path.
> >
> > I now realize that when the IAA hardware throughput is the bottleneck
> > (the work queue is full), switching to software may also increase
> > performance.
> >
> > I will implement automatic fallback to the software path when the IAA
> > work queue is full in the next version and update the performance data.
> > With an increase in multifd migration threads, the performance will be
> > better than now. Currently, 4 threads will reach the IAA device
> > throughput bottleneck; beyond that, increasing the number of threads
> > does not increase performance.
> >
> >> > +            status = qpl_execute_job(job);
> >> > +            if (status == QPL_STS_OK) {
> >> > +                multifd_qpl_build_packet(qpl, p, i, zbuf, job->total_out);
> >> > +            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> >> > +                /* compressed length exceeds page size, send page directly */
> >> > +                multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
> >> > +                                         p->page_size);
> >> > +            } else {
> >> > +                error_setg(errp, "multifd %u: qpl_execute_job error %d",
> >> > +                           p->id, status);
> >> > +                return -1;
> >> > +            }
> >> > +            zbuf += p->page_size;
> >> > +            continue;
> >> > +        }
> >> > +retry:
> >> > +        status = qpl_submit_job(job);
> >> > +        if (status == QPL_STS_OK) {
> >> > +            zbuf += p->page_size;
> >> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> >> > +            goto retry;
> >>
> >> A retry count here would be nice.
> >
> > As in the previous comment, I will switch to multifd_qpl_compress_pages_slow
> > when the work queue is full, and I will give a performance comparison
> > between hardware only and software fallback next version.
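To make that plan concrete, the split could look roughly like the
following untested sketch. multifd_qpl_compress_pages_slow() is the
hypothetical routine Fabiano suggested; everything else reuses the
helpers already in this patch:

/*
 * Sketch only: software-only compression path, so that
 * multifd_qpl_compress_pages() can keep the IAA submit/wait logic and
 * branch once at the start instead of checking iaa_avail in every
 * loop iteration.
 */
static int multifd_qpl_compress_pages_slow(MultiFDSendParams *p, Error **errp)
{
    QplData *qpl = p->compress_data;
    MultiFDPages_t *pages = p->pages;
    uint8_t *zbuf = qpl->zbuf;
    uint8_t *host = pages->block->host;

    for (int i = 0; i < pages->normal_num; i++) {
        qpl_job *job = qpl->job_array[i];
        qpl_status status;

        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
                                p->page_size, zbuf, p->page_size - 1);
        /* the software path executes the job synchronously */
        status = qpl_execute_job(job);
        if (status == QPL_STS_OK) {
            multifd_qpl_build_packet(qpl, p, i, zbuf, job->total_out);
        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
            /* data is incompressible, send the original page */
            multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
                                     p->page_size);
        } else {
            error_setg(errp, "multifd %u: qpl_execute_job error %d",
                       p->id, status);
            return -1;
        }
        zbuf += p->page_size;
    }
    return 0;
}

multifd_qpl_compress_pages() would then just branch once at the top:

    if (!qpl->iaa_avail) {
        return multifd_qpl_compress_pages_slow(p, errp);
    }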
> >
> >> > +        } else {
> >> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +    if (!qpl->iaa_avail) {
> >> > +        goto done;
> >> > +    }
> >> > +    /* wait all jobs to complete for hardware(IAA) path */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        job = qpl->job_array[i];
> >> > +        status = qpl_wait_job(job);
> >> > +        if (status == QPL_STS_OK) {
> >> > +            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p->page_size * i),
> >> > +                                     job->total_out);
> >> > +        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> >> > +            /* compressed data length exceeds page size, send page directly */
> >> > +            multifd_qpl_build_packet(qpl, p, i, host + pages->offset[i],
> >> > +                                     p->page_size);
> >> > +        } else {
> >> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +done:
> >> > +    return 0;
> >> > +}
> >> > +
> >> >  /**
> >> >   * multifd_qpl_send_prepare: prepare data to be able to send
> >> >   *
> >> > @@ -217,8 +351,28 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> >   */
> >> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> >> >  {
> >> > -    /* Implement in next patch */
> >> > -    return -1;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t hdr_size;
> >> > +
> >> > +    if (!multifd_send_prepare_common(p)) {
> >> > +        goto out;
> >> > +    }
> >> > +
> >> > +    assert(p->pages->normal_num <= qpl->total_job_num);
> >> > +    hdr_size = p->pages->normal_num * sizeof(uint32_t);
> >> > +    /* prepare the header that stores the lengths of all compressed data */
> >> > +    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
> >> > +    p->iov[1].iov_len = hdr_size;
> >>
> >> Better use p->iovs_num here in case we ever decide to add more stuff to
> >> the front of the array.
> >
> > Get it, I will fix this next version.
> >
> >> > +    p->iovs_num++;
> >> > +    p->next_packet_size += hdr_size;
> >>
> >> Here's the first time we're setting this value, right? So just a regular
> >> assignment (=).
> >
> > Yes, I will fix this next version.
> >
> >> > +    if (multifd_qpl_compress_pages(p, errp) != 0) {
> >> > +        return -1;
> >> > +    }
> >> > +
> >> > +out:
> >> > +    p->flags |= MULTIFD_FLAG_QPL;
> >> > +    multifd_send_fill_packet(p);
> >> > +    return 0;
> >> >  }
> >> >
> >> >  /**
> >> > @@ -256,6 +410,88 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >> >      p->compress_data = NULL;
> >> >  }
> >> >
> >> > +/**
> >> > + * multifd_qpl_decompress_pages: decompress normal pages
> >> > + *
> >> > + * Each compressed page will be decompressed independently, and the
> >> > + * decompression jobs will be submitted to the IAA hardware in non-blocking
> >> > + * mode, waiting for all jobs to be completed and loading the decompressed
> >> > + * data into guest memory. If IAA device is not available, the software path
> >> > + * is used.
> >> > + *
> >> > + * Returns 0 for success or -1 for error
> >> > + *
> >> > + * @p: Params for the channel that we are using
> >> > + * @errp: pointer to an error
> >> > + */
> >> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
> >> > +{
> >> > +    qpl_status status;
> >> > +    qpl_job *job;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t job_num = p->normal_num;
> >> > +    uint32_t off = 0;
> >> > +
> >> > +    assert(job_num <= qpl->total_job_num);
> >> > +    /* submit all decompression jobs */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        /* if the data size is the same as the page size, load it directly */
> >> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
> >> > +            memcpy(p->host + p->normal[i], qpl->zbuf + off, p->page_size);
> >> > +            off += p->page_size;
> >> > +            continue;
> >> > +        }
> >> > +        job = qpl->job_array[i];
> >> > +        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl->zbuf_hdr[i],
> >> > +                                p->host + p->normal[i], p->page_size);
> >> > +        /* if hardware path(IAA) is unavailable, call the software path */
> >> > +        if (!qpl->iaa_avail) {
> >> > +            status = qpl_execute_job(job);
> >> > +            if (status == QPL_STS_OK) {
> >> > +                off += qpl->zbuf_hdr[i];
> >> > +                continue;
> >> > +            }
> >> > +            error_setg(errp, "multifd %u: qpl_execute_job failed with error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +retry:
> >> > +        status = qpl_submit_job(job);
> >> > +        if (status == QPL_STS_OK) {
> >> > +            off += qpl->zbuf_hdr[i];
> >> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> >> > +            goto retry;
> >> > +        } else {
> >> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +    if (!qpl->iaa_avail) {
> >> > +        goto done;
> >> > +    }
> >> > +    /* wait all jobs to complete for hardware(IAA) path */
> >> > +    for (int i = 0; i < job_num; i++) {
> >> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
> >> > +            continue;
> >> > +        }
> >> > +        job = qpl->job_array[i];
> >> > +        status = qpl_wait_job(job);
> >> > +        if (status != QPL_STS_OK) {
> >> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
> >> > +                       p->id, status);
> >> > +            return -1;
> >> > +        }
> >> > +        if (job->total_out != p->page_size) {
> >> > +            error_setg(errp, "multifd %u: decompressed len %u, expected len %u",
> >> > +                       p->id, job->total_out, p->page_size);
> >> > +            return -1;
> >> > +        }
> >> > +    }
> >> > +done:
> >> > +    return 0;
> >> > +}
> >> > +
> >> >  /**
> >> >   * multifd_qpl_recv: read the data from the channel into actual pages
> >> >   *
> >> > @@ -269,8 +505,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >> >   */
> >> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> >> >  {
> >> > -    /* Implement in next patch */
> >> > -    return -1;
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t in_size = p->next_packet_size;
> >> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> >> > +    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
> >> > +    uint32_t data_len = 0;
> >> > +    int ret;
> >> > +
> >> > +    if (flags != MULTIFD_FLAG_QPL) {
> >> > +        error_setg(errp, "multifd %u: flags received %x flags expected %x",
> >> > +                   p->id, flags, MULTIFD_FLAG_QPL);
> >> > +        return -1;
> >> > +    }
> >> > +    multifd_recv_zero_page_process(p);
> >> > +    if (!p->normal_num) {
> >> > +        assert(in_size == 0);
> >> > +        return 0;
> >> > +    }
> >> > +
> >> > +    /* read compressed data lengths */
> >> > +    assert(hdr_len < in_size);
> >> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len, errp);
> >> > +    if (ret != 0) {
> >> > +        return ret;
> >> > +    }
> >> > +    assert(p->normal_num <= qpl->total_job_num);
> >>
> >> I'm still in doubt whether we should use p->page_count directly all
> >> over. It's nice to move the concept into the QPL domain space, but it
> >> makes less sense in these functions that take MultiFD*Params as
> >> argument.
> >
> > I don't understand what you mean here. Do you plan to remove page_count
> > from MultiFD*Params and provide a new API to get the migrated page count?
>
> No, I mean that qpl->total_job_num == p->page_count, so we could use
> p->page_count in the functions that have MultiFDParams available. Maybe
> even drop total_job_num altogether. But I'm debating if it is worth it,
> because that makes the code more coupled to multifd and we may not want
> that. Let's leave it for now.

Got it, thanks for the comment.

> >> > +    for (int i = 0; i < p->normal_num; i++) {
> >> > +        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
> >> > +        data_len += qpl->zbuf_hdr[i];
> >> > +        assert(qpl->zbuf_hdr[i] <= p->page_size);
> >> > +    }
> >> > +
> >> > +    /* read compressed data */
> >> > +    assert(in_size == hdr_len + data_len);
> >> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len, errp);
> >> > +    if (ret != 0) {
> >> > +        return ret;
> >> > +    }
> >> > +
> >> > +    if (multifd_qpl_decompress_pages(p, errp) != 0) {
> >> > +        return -1;
> >> > +    }
> >> > +    return 0;
> >> >  }
> >> >
> >> >  static MultiFDMethods multifd_qpl_ops = {
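For the commit message summary mentioned above, the submit/wait asymmetry
from Limitation 2 boils down to the following illustrative fragments,
based on my understanding of the current QPL releases (error handling
omitted; job_num and qpl->job_array as in this patch):

/* hardware path: jobs for all pages can be in flight at once */
for (int i = 0; i < job_num; i++) {
    qpl_submit_job(qpl->job_array[i]);  /* non-blocking, just queues the job */
}
for (int i = 0; i < job_num; i++) {
    qpl_wait_job(qpl->job_array[i]);    /* blocks until this job completes */
}

/*
 * software (and auto) path: qpl_submit_job() executes the job itself
 * and returns the result; qpl_wait_job() cannot be used to reap it, so
 * jobs are effectively processed one at a time.
 */
for (int i = 0; i < job_num; i++) {
    qpl_execute_job(qpl->job_array[i]); /* blocking, runs to completion */
}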