Hao Xiang <hao.xi...@bytedance.com> writes: > * Create a dedicated thread for DSA task completion. > * DSA completion thread runs a loop and poll for completed tasks. > * Start and stop DSA completion thread during DSA device start stop. > > User space application can directly submit task to Intel DSA > accelerator by writing to DSA's device memory (mapped in user space). > Once a task is submitted, the device starts processing it and write > the completion status back to the task. A user space application can > poll the task's completion status to check for completion. This change > uses a dedicated thread to perform DSA task completion checking. > > Signed-off-by: Hao Xiang <hao.xi...@bytedance.com> > --- > util/dsa.c | 243 ++++++++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 242 insertions(+), 1 deletion(-) > > diff --git a/util/dsa.c b/util/dsa.c > index f82282ce99..0e68013ffb 100644 > --- a/util/dsa.c > +++ b/util/dsa.c > @@ -44,6 +44,7 @@ > > #define DSA_WQ_SIZE 4096 > #define MAX_DSA_DEVICES 16 > +#define DSA_COMPLETION_THREAD "dsa_completion" > > typedef QSIMPLEQ_HEAD(dsa_task_queue, buffer_zero_batch_task) dsa_task_queue; > > @@ -61,8 +62,18 @@ struct dsa_device_group { > dsa_task_queue task_queue; > }; > > +struct dsa_completion_thread { > + bool stopping; > + bool running; > + QemuThread thread; > + int thread_id; > + QemuSemaphore sem_init_done; > + struct dsa_device_group *group; > +}; > + > uint64_t max_retry_count; > static struct dsa_device_group dsa_group; > +static struct dsa_completion_thread completion_thread; > > > /** > @@ -439,6 +450,234 @@ submit_batch_wi_async(struct buffer_zero_batch_task > *batch_task) > return dsa_task_enqueue(device_group, batch_task); > } > > +/** > + * @brief Poll for the DSA work item completion. > + * > + * @param completion A pointer to the DSA work item completion record. > + * @param opcode The DSA opcode. > + * > + * @return Zero if successful, non-zero otherwise. 
> + */ > +static int > +poll_completion(struct dsa_completion_record *completion, > + enum dsa_opcode opcode) > +{ > + uint8_t status; > + uint64_t retry = 0; > + > + while (true) { > + // The DSA operation completes successfully or fails. > + status = completion->status; > + if (status == DSA_COMP_SUCCESS ||
Should we read directly from completion->status, or is the compiler smart enough not to optimize 'status' out? > + status == DSA_COMP_PAGE_FAULT_NOBOF || > + status == DSA_COMP_BATCH_PAGE_FAULT || > + status == DSA_COMP_BATCH_FAIL) { > + break; > + } else if (status != DSA_COMP_NONE) { > + /* TODO: Error handling here on unexpected failure. */ Let's make sure this is dealt with before merging. > + fprintf(stderr, "DSA opcode %d failed with status = %d.\n", > + opcode, status); > + exit(1); Return an error instead of exiting. > + } > + retry++; > + if (retry > max_retry_count) { > + fprintf(stderr, "Wait for completion retry %lu times.\n", retry); > + exit(1); Same here: return an error instead of exiting. > + } > + _mm_pause(); > + } > + > + return 0; > +} > + > +/** > + * @brief Complete a single DSA task in the batch task. > + * > + * @param task A pointer to the batch task structure. > + */ > +static void > +poll_task_completion(struct buffer_zero_batch_task *task) > +{ > + assert(task->task_type == DSA_TASK); > + > + struct dsa_completion_record *completion = &task->completions[0]; > + uint8_t status; > + > + poll_completion(completion, task->descriptors[0].opcode); > + > + status = completion->status; > + if (status == DSA_COMP_SUCCESS) { > + task->results[0] = (completion->result == 0); > + return; > + } > + > + assert(status == DSA_COMP_PAGE_FAULT_NOBOF); > +} > + > +/** > + * @brief Poll a batch task status until it completes. If DSA task doesn't > + * complete properly, use CPU to complete the task. > + * > + * @param batch_task A pointer to the DSA batch task. 
> + */ > +static void > +poll_batch_task_completion(struct buffer_zero_batch_task *batch_task) > +{ > + struct dsa_completion_record *batch_completion = > &batch_task->batch_completion; > + struct dsa_completion_record *completion; > + uint8_t batch_status; > + uint8_t status; > + bool *results = batch_task->results; > + uint32_t count = batch_task->batch_descriptor.desc_count; > + > + poll_completion(batch_completion, > + batch_task->batch_descriptor.opcode); > + > + batch_status = batch_completion->status; > + > + if (batch_status == DSA_COMP_SUCCESS) { > + if (batch_completion->bytes_completed == count) { > + // Let's skip checking for each descriptors' completion status > + // if the batch descriptor says all succedded. > + for (int i = 0; i < count; i++) { > + assert(batch_task->completions[i].status == > DSA_COMP_SUCCESS); > + results[i] = (batch_task->completions[i].result == 0); > + } > + return; > + } > + } else { > + assert(batch_status == DSA_COMP_BATCH_FAIL || > + batch_status == DSA_COMP_BATCH_PAGE_FAULT); > + } > + > + for (int i = 0; i < count; i++) { > + Extra whitespace here. > + completion = &batch_task->completions[i]; > + status = completion->status; > + > + if (status == DSA_COMP_SUCCESS) { > + results[i] = (completion->result == 0); > + continue; > + } > + > + if (status != DSA_COMP_PAGE_FAULT_NOBOF) { > + fprintf(stderr, > + "Unexpected completion status = %u.\n", status); > + assert(false); Return here instead of asserting. > + } > + } > +} > + > +/** > + * @brief Handles an asynchronous DSA batch task completion. > + * > + * @param task A pointer to the batch buffer zero task structure. > + */ > +static void > +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task) > +{ > + batch_task->status = DSA_TASK_COMPLETION; > + batch_task->completion_callback(batch_task); > +} > + > +/** > + * @brief The function entry point called by a dedicated DSA > + * work item completion thread. > + * > + * @param opaque A pointer to the thread context. 
> + * > + * @return void* Not used. > + */ > +static void * > +dsa_completion_loop(void *opaque) > +{ > + struct dsa_completion_thread *thread_context = > + (struct dsa_completion_thread *)opaque; > + struct buffer_zero_batch_task *batch_task; > + struct dsa_device_group *group = thread_context->group; > + > + rcu_register_thread(); > + > + thread_context->thread_id = qemu_get_thread_id(); > + qemu_sem_post(&thread_context->sem_init_done); > + > + while (thread_context->running) { > + batch_task = dsa_task_dequeue(group); > + assert(batch_task != NULL || !group->running); > + if (!group->running) { > + assert(!thread_context->running); This is racy if the compiler reorders "thread_context->running = false" and "group->running = false". I'd put this under the task_queue_lock or add a compiler barrier at dsa_completion_thread_stop(). > + break; > + } > + if (batch_task->task_type == DSA_TASK) { > + poll_task_completion(batch_task); > + } else { > + assert(batch_task->task_type == DSA_BATCH_TASK); > + poll_batch_task_completion(batch_task); > + } > + > + dsa_batch_task_complete(batch_task); > + } > + > + rcu_unregister_thread(); > + return NULL; > +} > + > +/** > + * @brief Initializes a DSA completion thread. > + * > + * @param completion_thread A pointer to the completion thread context. > + * @param group A pointer to the DSA device group. 
> + */ > +static void > +dsa_completion_thread_init( > + struct dsa_completion_thread *completion_thread, > + struct dsa_device_group *group) > +{ > + completion_thread->stopping = false; > + completion_thread->running = true; > + completion_thread->thread_id = -1; > + qemu_sem_init(&completion_thread->sem_init_done, 0); > + completion_thread->group = group; > + > + qemu_thread_create(&completion_thread->thread, > + DSA_COMPLETION_THREAD, > + dsa_completion_loop, > + completion_thread, > + QEMU_THREAD_JOINABLE); > + > + /* Wait for initialization to complete */ > + while (completion_thread->thread_id == -1) { > + qemu_sem_wait(&completion_thread->sem_init_done); > + } This is racy: the thread can set 'thread_id' before this enters the loop, and the semaphore will be left unmatched. Not a huge deal, but it might cause confusion when debugging the initialization. > +} > + > +/** > + * @brief Stops the completion thread (and implicitly, the device group). > + * > + * @param opaque A pointer to the completion thread. > + */ > +static void dsa_completion_thread_stop(void *opaque) > +{ > + struct dsa_completion_thread *thread_context = > + (struct dsa_completion_thread *)opaque; > + > + struct dsa_device_group *group = thread_context->group; > + > + qemu_mutex_lock(&group->task_queue_lock); > + > + thread_context->stopping = true; > + thread_context->running = false; > + > + dsa_device_group_stop(group); > + > + qemu_cond_signal(&group->task_queue_cond); > + qemu_mutex_unlock(&group->task_queue_lock); > + > + qemu_thread_join(&thread_context->thread); > + > + qemu_sem_destroy(&thread_context->sem_init_done); > +} > + > /** > * @brief Check if DSA is running. 
> * > @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task > *batch_task) > */ > bool dsa_is_running(void) > { > - return false; > + return completion_thread.running; > } > > static void > @@ -481,6 +720,7 @@ void dsa_start(void) > return; > } > dsa_device_group_start(&dsa_group); > + dsa_completion_thread_init(&completion_thread, &dsa_group); > } > > /** > @@ -496,6 +736,7 @@ void dsa_stop(void) > return; > } > > + dsa_completion_thread_stop(&completion_thread); > dsa_empty_task_queue(group); > }