On Mon, Dec 18, 2023 at 5:34 PM Wang, Lei <lei4.w...@intel.com> wrote:
>
> On 12/19/2023 2:57, Hao Xiang wrote:
> > On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei <lei4.w...@intel.com> wrote:
> >>
> >> On 11/14/2023 13:40, Hao Xiang wrote:
> >>> * Create a dedicated thread for DSA task completion.
> >>> * DSA completion thread runs a loop and poll for completed tasks.
> >>> * Start and stop DSA completion thread during DSA device start stop.
> >>>
> >>> User space application can directly submit task to Intel DSA
> >>> accelerator by writing to DSA's device memory (mapped in user space).
> >>
> >>> +            }
> >>> +            return;
> >>> +        }
> >>> +    } else {
> >>> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> >>> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> >>
> >> Nit: indentation is broken here.
> >>
> >>> +    }
> >>> +
> >>> +    for (int i = 0; i < count; i++) {
> >>> +
> >>> +        completion = &batch_task->completions[i];
> >>> +        status = completion->status;
> >>> +
> >>> +        if (status == DSA_COMP_SUCCESS) {
> >>> +            results[i] = (completion->result == 0);
> >>> +            continue;
> >>> +        }
> >>> +
> >>> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> >>> +            fprintf(stderr,
> >>> +                    "Unexpected completion status = %u.\n", status);
> >>> +            assert(false);
> >>> +        }
> >>> +    }
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Handles an asynchronous DSA batch task completion.
> >>> + *
> >>> + * @param task A pointer to the batch buffer zero task structure.
> >>> + */
> >>> +static void
> >>> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> >>> +{
> >>> +    batch_task->status = DSA_TASK_COMPLETION;
> >>> +    batch_task->completion_callback(batch_task);
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief The function entry point called by a dedicated DSA
> >>> + *        work item completion thread.
> >>> + *
> >>> + * @param opaque A pointer to the thread context.
> >>> + *
> >>> + * @return void* Not used.
> >>> + */
> >>> +static void *
> >>> +dsa_completion_loop(void *opaque)
> >>
> >> Per my understanding, if a multifd sending thread corresponds to a DSA
> >> device, then the batch tasks are executed in parallel, which means a task
> >> may complete more slowly than another even if it was enqueued earlier. If
> >> we poll on the slower task first, it will block the handling of the faster
> >> one; even if the zero-checking task for that thread is finished and it
> >> could go ahead and send the data to the wire, this may lower the network
> >> resource utilization.
> >>
> >
> > Hi Lei, thanks for reviewing. You are correct that we can keep polling
> > a task that was enqueued first while others in the queue have already
> > completed. In fact, only one DSA completion thread (polling thread) is
> > used here even when multiple DSA devices are used. The polling loop is
> > the most CPU-intensive activity in the DSA workflow, and that acts
> > directly against the goal of saving CPU usage. The trade-off I want to
> > make here is slightly higher latency on DSA task completion in exchange
> > for more CPU savings. A single DSA engine can reach 30 GB/s throughput
> > on the memory comparison operation. We use the kernel TCP stack for
> > network transfer; the best I see is around 10 GB/s throughput. RDMA can
> > potentially go higher, but I am not sure it can exceed 30 GB/s
> > throughput anytime soon.
>
> Hi Hao, that makes sense: if the DSA is faster than the network, then a
> little bit of latency in DSA checking is tolerable. In the long term, I
> think the best form of the DSA task-checking thread is to use an fd or
> some similar mechanism that can multiplex the checking of different DSA
> devices; then we can serve DSA tasks in the order they complete rather
> than FCFS.
>
I have experimented with using N completion threads, where each thread
polls for tasks submitted to a particular DSA device. That approach uses
too many CPU cycles. If Intel can come up with a better workflow for DSA
completion, there is definitely room for improvement here.
> >
> >>> +{
> >>> +    struct dsa_completion_thread *thread_context =
> >>> +        (struct dsa_completion_thread *)opaque;
> >>> +    struct buffer_zero_batch_task *batch_task;
> >>> +    struct dsa_device_group *group = thread_context->group;
> >>> +
> >>> +    rcu_register_thread();
> >>> +
> >>> +    thread_context->thread_id = qemu_get_thread_id();
> >>> +    qemu_sem_post(&thread_context->sem_init_done);
> >>> +
> >>> +    while (thread_context->running) {
> >>> +        batch_task = dsa_task_dequeue(group);
> >>> +        assert(batch_task != NULL || !group->running);
> >>> +        if (!group->running) {
> >>> +            assert(!thread_context->running);
> >>> +            break;
> >>> +        }
> >>> +        if (batch_task->task_type == DSA_TASK) {
> >>> +            poll_task_completion(batch_task);
> >>> +        } else {
> >>> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> >>> +            poll_batch_task_completion(batch_task);
> >>> +        }
> >>> +
> >>> +        dsa_batch_task_complete(batch_task);
> >>> +    }
> >>> +
> >>> +    rcu_unregister_thread();
> >>> +    return NULL;
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Initializes a DSA completion thread.
> >>> + *
> >>> + * @param completion_thread A pointer to the completion thread context.
> >>> + * @param group A pointer to the DSA device group.
> >>> + */
> >>> +static void
> >>> +dsa_completion_thread_init(
> >>> +    struct dsa_completion_thread *completion_thread,
> >>> +    struct dsa_device_group *group)
> >>> +{
> >>> +    completion_thread->stopping = false;
> >>> +    completion_thread->running = true;
> >>> +    completion_thread->thread_id = -1;
> >>> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> >>> +    completion_thread->group = group;
> >>> +
> >>> +    qemu_thread_create(&completion_thread->thread,
> >>> +                       DSA_COMPLETION_THREAD,
> >>> +                       dsa_completion_loop,
> >>> +                       completion_thread,
> >>> +                       QEMU_THREAD_JOINABLE);
> >>> +
> >>> +    /* Wait for initialization to complete */
> >>> +    while (completion_thread->thread_id == -1) {
> >>> +        qemu_sem_wait(&completion_thread->sem_init_done);
> >>> +    }
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Stops the completion thread (and implicitly, the device group).
> >>> + *
> >>> + * @param opaque A pointer to the completion thread.
> >>> + */
> >>> +static void dsa_completion_thread_stop(void *opaque)
> >>> +{
> >>> +    struct dsa_completion_thread *thread_context =
> >>> +        (struct dsa_completion_thread *)opaque;
> >>> +
> >>> +    struct dsa_device_group *group = thread_context->group;
> >>> +
> >>> +    qemu_mutex_lock(&group->task_queue_lock);
> >>> +
> >>> +    thread_context->stopping = true;
> >>> +    thread_context->running = false;
> >>> +
> >>> +    dsa_device_group_stop(group);
> >>> +
> >>> +    qemu_cond_signal(&group->task_queue_cond);
> >>> +    qemu_mutex_unlock(&group->task_queue_lock);
> >>> +
> >>> +    qemu_thread_join(&thread_context->thread);
> >>> +
> >>> +    qemu_sem_destroy(&thread_context->sem_init_done);
> >>> +}
> >>> +
> >>>  /**
> >>>   * @brief Check if DSA is running.
> >>>   *
> >>> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task 
> >>> *batch_task)
> >>>   */
> >>>  bool dsa_is_running(void)
> >>>  {
> >>> -    return false;
> >>> +    return completion_thread.running;
> >>>  }
> >>>
> >>>  static void
> >>> @@ -481,6 +720,7 @@ void dsa_start(void)
> >>>          return;
> >>>      }
> >>>      dsa_device_group_start(&dsa_group);
> >>> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
> >>>  }
> >>>
> >>>  /**
> >>> @@ -496,6 +736,7 @@ void dsa_stop(void)
> >>>          return;
> >>>      }
> >>>
> >>> +    dsa_completion_thread_stop(&completion_thread);
> >>>      dsa_empty_task_queue(group);
> >>>  }
> >>>

Reply via email to