On 11/28/18 6:59 PM, Marek Olšák wrote: > From: Marek Olšák <marek.ol...@amd.com> > > for ARB_parallel_shader_compile > --- > src/util/u_queue.c | 49 +++++++++++++++++++++++++++++----------------- > src/util/u_queue.h | 5 ++--- > 2 files changed, 33 insertions(+), 21 deletions(-) > > diff --git a/src/util/u_queue.c b/src/util/u_queue.c > index 48c5c79552d..5aaf60ae78e 100644 > --- a/src/util/u_queue.c > +++ b/src/util/u_queue.c > @@ -26,42 +26,43 @@ > > #include "u_queue.h" > > #include <time.h> > > #include "util/os_time.h" > #include "util/u_string.h" > #include "util/u_thread.h" > #include "u_process.h" > > -static void util_queue_killall_and_wait(struct util_queue *queue); > +static void > +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads); > > /**************************************************************************** > * Wait for all queues to assert idle when exit() is called. > * > * Otherwise, C++ static variable destructors can be called while threads > * are using the static variables. > */ > > static once_flag atexit_once_flag = ONCE_FLAG_INIT; > static struct list_head queue_list; > static mtx_t exit_mutex = _MTX_INITIALIZER_NP; > > static void > atexit_handler(void) > { > struct util_queue *iter; > > mtx_lock(&exit_mutex); > /* Wait for all queues to assert idle. */ > LIST_FOR_EACH_ENTRY(iter, &queue_list, head) { > - util_queue_killall_and_wait(iter); > + util_queue_kill_threads(iter, 0); > } > mtx_unlock(&exit_mutex); > } > > static void > global_init(void) > { > LIST_INITHEAD(&queue_list); > atexit(atexit_handler); > } > @@ -259,55 +260,58 @@ util_queue_thread_func(void *input) > u_thread_setname(name); > } > > while (1) { > struct util_queue_job job; > > mtx_lock(&queue->lock); > assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); > > /* wait if the queue is empty */ > - while (!queue->kill_threads && queue->num_queued == 0) > + while (thread_index < queue->num_threads && queue->num_queued == 0) > cnd_wait(&queue->has_queued_cond, &queue->lock); > > - if (queue->kill_threads) { > + /* only kill threads that are above "num_threads" */ > + if (thread_index >= queue->num_threads) { > mtx_unlock(&queue->lock); > break; > } > > job = queue->jobs[queue->read_idx]; > memset(&queue->jobs[queue->read_idx], 0, sizeof(struct > util_queue_job)); > queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; > > queue->num_queued--; > cnd_signal(&queue->has_space_cond); > mtx_unlock(&queue->lock); > > if (job.job) { > job.execute(job.job, thread_index); > util_queue_fence_signal(job.fence); > if (job.cleanup) > job.cleanup(job.job, thread_index); > } > } > > - /* signal remaining jobs before terminating */ > + /* signal remaining jobs if all threads are being terminated */ > mtx_lock(&queue->lock); > - for (unsigned i = queue->read_idx; i != queue->write_idx; > - i = (i + 1) % queue->max_jobs) { > - if (queue->jobs[i].job) { > - util_queue_fence_signal(queue->jobs[i].fence); > - queue->jobs[i].job = NULL; > + if (queue->num_threads == 0) { > + for (unsigned i = queue->read_idx; i != queue->write_idx; > + i = (i + 1) % queue->max_jobs) { > + if (queue->jobs[i].job) { > + util_queue_fence_signal(queue->jobs[i].fence); > + queue->jobs[i].job = NULL; > + } > } > + queue->read_idx = queue->write_idx; > + queue->num_queued = 0; > } > - queue->read_idx = queue->write_idx; > - queue->num_queued = 0; > mtx_unlock(&queue->lock); > return 0; > } > > static bool > util_queue_create_thread(struct util_queue *queue, unsigned index) > { > struct thread_input *input = > (struct thread_input *) malloc(sizeof(struct thread_input)); > input->queue = queue; > @@ -418,60 +422,69 @@ fail: > cnd_destroy(&queue->has_queued_cond); > mtx_destroy(&queue->lock); > free(queue->jobs); > } > /* also util_queue_is_initialized can be used to check for success */ > memset(queue, 0, sizeof(*queue)); > return false; > } > > static void > -util_queue_killall_and_wait(struct util_queue *queue) > +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads) > { > unsigned i; > > /* Signal all threads to terminate. */ > + mtx_lock(&queue->finish_lock); > + > + if (keep_num_threads >= queue->num_threads) { > + mtx_unlock(&queue->finish_lock); > + return; > + } > + > mtx_lock(&queue->lock); > - queue->kill_threads = 1; > + unsigned old_num_threads = queue->num_threads; > + queue->num_threads = keep_num_threads;
Shouldn't this still be set below, after the threads are joined? > cnd_broadcast(&queue->has_queued_cond); > mtx_unlock(&queue->lock); > > - for (i = 0; i < queue->num_threads; i++) > + for (i = keep_num_threads; i < old_num_threads; i++) > thrd_join(queue->threads[i], NULL); > - queue->num_threads = 0; > + > + mtx_unlock(&queue->finish_lock); > } > > void > util_queue_destroy(struct util_queue *queue) > { > - util_queue_killall_and_wait(queue); > + util_queue_kill_threads(queue, 0); > remove_from_atexit_list(queue); > > cnd_destroy(&queue->has_space_cond); > cnd_destroy(&queue->has_queued_cond); > mtx_destroy(&queue->finish_lock); > mtx_destroy(&queue->lock); > free(queue->jobs); > free(queue->threads); > } > > void > util_queue_add_job(struct util_queue *queue, > void *job, > struct util_queue_fence *fence, > util_queue_execute_func execute, > util_queue_execute_func cleanup) > { > struct util_queue_job *ptr; > > mtx_lock(&queue->lock); > - if (queue->kill_threads) { > + if (queue->num_threads == 0) { > mtx_unlock(&queue->lock); > /* well no good option here, but any leaks will be > * short-lived as things are shutting down.. > */ > return; > } > > util_queue_fence_reset(fence); > > assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); > diff --git a/src/util/u_queue.h b/src/util/u_queue.h > index 4e63a76aab2..756fa53e1bf 100644 > --- a/src/util/u_queue.h > +++ b/src/util/u_queue.h > @@ -194,29 +194,28 @@ typedef void (*util_queue_execute_func)(void *job, int > thread_index); > struct util_queue_job { > void *job; > struct util_queue_fence *fence; > util_queue_execute_func execute; > util_queue_execute_func cleanup; > }; > > /* Put this into your context. */ > struct util_queue { > char name[14]; /* 13 characters = the thread name without the index */ > - mtx_t finish_lock; /* only for util_queue_finish */ > + mtx_t finish_lock; /* for util_queue_finish and protects > threads/num_threads */ > mtx_t lock; > cnd_t has_queued_cond; > cnd_t has_space_cond; > thrd_t *threads; > unsigned flags; > int num_queued; > - unsigned num_threads; > - int kill_threads; > + unsigned num_threads; /* decreasing this number will terminate threads */ > int max_jobs; > int write_idx, read_idx; /* ring buffer pointers */ > struct util_queue_job *jobs; > > /* for cleanup at exit(), protected by exit_mutex */ > struct list_head head; > }; > > bool util_queue_init(struct util_queue *queue, > const char *name, > _______________________________________________ mesa-dev mailing list mesa-dev@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/mesa-dev