Hao Xiang <hao.xi...@bytedance.com> writes:

> * Create a dedicated thread for DSA task completion.
> * DSA completion thread runs a loop and poll for completed tasks.
> * Start and stop DSA completion thread during DSA device start stop.
>
> User space application can directly submit task to Intel DSA
> accelerator by writing to DSA's device memory (mapped in user space).
> Once a task is submitted, the device starts processing it and write
> the completion status back to the task. A user space application can
> poll the task's completion status to check for completion. This change
> uses a dedicated thread to perform DSA task completion checking.
>
> Signed-off-by: Hao Xiang <hao.xi...@bytedance.com>
> ---
>  util/dsa.c | 243 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 242 insertions(+), 1 deletion(-)
>
> diff --git a/util/dsa.c b/util/dsa.c
> index f82282ce99..0e68013ffb 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -44,6 +44,7 @@
>  
>  #define DSA_WQ_SIZE 4096
>  #define MAX_DSA_DEVICES 16
> +#define DSA_COMPLETION_THREAD "dsa_completion"
>  
>  typedef QSIMPLEQ_HEAD(dsa_task_queue, buffer_zero_batch_task) dsa_task_queue;
>  
> @@ -61,8 +62,18 @@ struct dsa_device_group {
>      dsa_task_queue task_queue;
>  };
>  
> +struct dsa_completion_thread {
> +    bool stopping;
> +    bool running;
> +    QemuThread thread;
> +    int thread_id;
> +    QemuSemaphore sem_init_done;
> +    struct dsa_device_group *group;
> +};
> +
>  uint64_t max_retry_count;
>  static struct dsa_device_group dsa_group;
> +static struct dsa_completion_thread completion_thread;
>  
>  
>  /**
> @@ -439,6 +450,234 @@ submit_batch_wi_async(struct buffer_zero_batch_task 
> *batch_task)
>      return dsa_task_enqueue(device_group, batch_task);
>  }
>  
> +/**
> + * @brief Poll for the DSA work item completion.
> + *
> + * @param completion A pointer to the DSA work item completion record.
> + * @param opcode The DSA opcode.
> + *
> + * @return Zero if successful, non-zero otherwise.
> + */
> +static int
> +poll_completion(struct dsa_completion_record *completion,
> +                enum dsa_opcode opcode)
> +{
> +    uint8_t status;
> +    uint64_t retry = 0;
> +
> +    while (true) {
> +        // The DSA operation completes successfully or fails.
> +        status = completion->status;
> +        if (status == DSA_COMP_SUCCESS ||

Should we read directly from completion->status or is the compiler smart
enough to not optimize 'status' out?

> +            status == DSA_COMP_PAGE_FAULT_NOBOF ||
> +            status == DSA_COMP_BATCH_PAGE_FAULT ||
> +            status == DSA_COMP_BATCH_FAIL) {
> +            break;
> +        } else if (status != DSA_COMP_NONE) {
> +            /* TODO: Error handling here on unexpected failure. */

Let's make sure this is dealt with before merging.

> +            fprintf(stderr, "DSA opcode %d failed with status = %d.\n",
> +                    opcode, status);
> +            exit(1);

return instead of exiting.

> +        }
> +        retry++;
> +        if (retry > max_retry_count) {
> +            fprintf(stderr, "Wait for completion retry %lu times.\n", retry);
> +            exit(1);

same here

> +        }
> +        _mm_pause();
> +    }
> +
> +    return 0;
> +}
> +
> +/**
> + * @brief Complete a single DSA task in the batch task.
> + *
> + * @param task A pointer to the batch task structure.
> + */
> +static void
> +poll_task_completion(struct buffer_zero_batch_task *task)
> +{
> +    assert(task->task_type == DSA_TASK);
> +
> +    struct dsa_completion_record *completion = &task->completions[0];
> +    uint8_t status;
> +
> +    poll_completion(completion, task->descriptors[0].opcode);
> +
> +    status = completion->status;
> +    if (status == DSA_COMP_SUCCESS) {
> +        task->results[0] = (completion->result == 0);
> +        return;
> +    }
> +
> +    assert(status == DSA_COMP_PAGE_FAULT_NOBOF);
> +}
> +
> +/**
> + * @brief Poll a batch task status until it completes. If DSA task doesn't
> + *        complete properly, use CPU to complete the task.
> + *
> + * @param batch_task A pointer to the DSA batch task.
> + */
> +static void
> +poll_batch_task_completion(struct buffer_zero_batch_task *batch_task)
> +{
> +    struct dsa_completion_record *batch_completion = 
> &batch_task->batch_completion;
> +    struct dsa_completion_record *completion;
> +    uint8_t batch_status;
> +    uint8_t status;
> +    bool *results = batch_task->results;
> +    uint32_t count = batch_task->batch_descriptor.desc_count;
> +
> +    poll_completion(batch_completion,
> +                    batch_task->batch_descriptor.opcode);
> +
> +    batch_status = batch_completion->status;
> +
> +    if (batch_status == DSA_COMP_SUCCESS) {
> +        if (batch_completion->bytes_completed == count) {
> +            // Let's skip checking for each descriptors' completion status
> +            // if the batch descriptor says all succedded.
> +            for (int i = 0; i < count; i++) {
> +                assert(batch_task->completions[i].status == 
> DSA_COMP_SUCCESS);
> +                results[i] = (batch_task->completions[i].result == 0);
> +            }
> +            return;
> +        }
> +    } else {
> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> +    }
> +
> +    for (int i = 0; i < count; i++) {
> +

extra whitespace

> +        completion = &batch_task->completions[i];
> +        status = completion->status;
> +
> +        if (status == DSA_COMP_SUCCESS) {
> +            results[i] = (completion->result == 0);
> +            continue;
> +        }
> +
> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> +            fprintf(stderr,
> +                    "Unexpected completion status = %u.\n", status);
> +            assert(false);

return here

> +        }
> +    }
> +}
> +
> +/**
> + * @brief Handles an asynchronous DSA batch task completion.
> + *
> + * @param task A pointer to the batch buffer zero task structure.
> + */
> +static void
> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> +{
> +    batch_task->status = DSA_TASK_COMPLETION;
> +    batch_task->completion_callback(batch_task);
> +}
> +
> +/**
> + * @brief The function entry point called by a dedicated DSA
> + *        work item completion thread.
> + *
> + * @param opaque A pointer to the thread context.
> + *
> + * @return void* Not used.
> + */
> +static void *
> +dsa_completion_loop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +    struct buffer_zero_batch_task *batch_task;
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    rcu_register_thread();
> +
> +    thread_context->thread_id = qemu_get_thread_id();
> +    qemu_sem_post(&thread_context->sem_init_done);
> +
> +    while (thread_context->running) {
> +        batch_task = dsa_task_dequeue(group);
> +        assert(batch_task != NULL || !group->running);
> +        if (!group->running) {
> +            assert(!thread_context->running);

This is racy if the compiler reorders "thread_context->running = false"
and "group->running = false". I'd put this under the task_queue_lock or
add a compiler barrier at dsa_completion_thread_stop().

> +            break;
> +        }
> +        if (batch_task->task_type == DSA_TASK) {
> +            poll_task_completion(batch_task);
> +        } else {
> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> +            poll_batch_task_completion(batch_task);
> +        }
> +
> +        dsa_batch_task_complete(batch_task);
> +    }
> +
> +    rcu_unregister_thread();
> +    return NULL;
> +}
> +
> +/**
> + * @brief Initializes a DSA completion thread.
> + *
> + * @param completion_thread A pointer to the completion thread context.
> + * @param group A pointer to the DSA device group.
> + */
> +static void
> +dsa_completion_thread_init(
> +    struct dsa_completion_thread *completion_thread,
> +    struct dsa_device_group *group)
> +{
> +    completion_thread->stopping = false;
> +    completion_thread->running = true;
> +    completion_thread->thread_id = -1;
> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> +    completion_thread->group = group;
> +
> +    qemu_thread_create(&completion_thread->thread,
> +                       DSA_COMPLETION_THREAD,
> +                       dsa_completion_loop,
> +                       completion_thread,
> +                       QEMU_THREAD_JOINABLE);
> +
> +    /* Wait for initialization to complete */
> +    while (completion_thread->thread_id == -1) {
> +        qemu_sem_wait(&completion_thread->sem_init_done);
> +    }

This is racy, the thread can set 'thread_id' before this enters the loop
and the semaphore will be left unmatched. Not a huge deal but it might
cause confusion when debugging the initialization.

> +}
> +
> +/**
> + * @brief Stops the completion thread (and implicitly, the device group).
> + *
> + * @param opaque A pointer to the completion thread.
> + */
> +static void dsa_completion_thread_stop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    qemu_mutex_lock(&group->task_queue_lock);
> +
> +    thread_context->stopping = true;
> +    thread_context->running = false;
> +
> +    dsa_device_group_stop(group);
> +
> +    qemu_cond_signal(&group->task_queue_cond);
> +    qemu_mutex_unlock(&group->task_queue_lock);
> +
> +    qemu_thread_join(&thread_context->thread);
> +
> +    qemu_sem_destroy(&thread_context->sem_init_done);
> +}
> +
>  /**
>   * @brief Check if DSA is running.
>   *
> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task 
> *batch_task)
>   */
>  bool dsa_is_running(void)
>  {
> -    return false;
> +    return completion_thread.running;
>  }
>  
>  static void
> @@ -481,6 +720,7 @@ void dsa_start(void)
>          return;
>      }
>      dsa_device_group_start(&dsa_group);
> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
>  }
>  
>  /**
> @@ -496,6 +736,7 @@ void dsa_stop(void)
>          return;
>      }
>  
> +    dsa_completion_thread_stop(&completion_thread);
>      dsa_empty_task_queue(group);
>  }

Reply via email to