Avoid pthread_cond_broadcast that wakes up all workers. Make each of them
uses distict mutex/cond. Also let main thread help running jobs.
Similar to 'avfilter/pthread: rewrite implementation'
Benchmark on x86_64 with 4 cpus (2 cores x 2 hyperthread)
./ffmpeg -threads $threads -thread_type slice -i 10slices.mp4 -f rawvideo -y
/dev/null
threads=2:
old: 1m21.492s
new: 1m11.075s
threads=3:
old: 1m12.554s
new: 1m11.771s
threads=4:
old: 1m8.915s
new: 1m4.194s
threads=5:
old: 1m2.417s
new: 57.524s
threads=6:
old: 1m6.710s
new: 1m0.731s
threads=7:
old: 1m5.217s
new: 1m2.166s
threads=8:
old: 1m6.974s
new: 1m2.431s
threads=9:
old: 59.830s
new: 58.162s
threads=10:
old: 58.711s
new: 55.859s
Signed-off-by: Muhammad Faiz
---
libavcodec/pthread_slice.c | 217 -
1 file changed, 138 insertions(+), 79 deletions(-)
diff --git a/libavcodec/pthread_slice.c b/libavcodec/pthread_slice.c
index 60f5b78..a290dab 100644
--- a/libavcodec/pthread_slice.c
+++ b/libavcodec/pthread_slice.c
@@ -22,6 +22,7 @@
* @see doc/multithreading.txt
*/
+#include
#include "config.h"
#include "avcodec.h"
@@ -38,21 +39,25 @@
typedef int (action_func)(AVCodecContext *c, void *arg);
typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int
threadnr);
+typedef struct WorkerContext WorkerContext;
+
typedef struct SliceThreadContext {
-pthread_t *workers;
+AVCodecContext *avctx;
+WorkerContext *workers;
action_func *func;
action_func2 *func2;
void *args;
int *rets;
-int job_count;
-int job_size;
-
-pthread_cond_t last_job_cond;
-pthread_cond_t current_job_cond;
-pthread_mutex_t current_job_lock;
-unsigned current_execute;
-int current_job;
+unsigned job_count;
+unsigned job_size;
+
+pthread_mutex_t mutex_user;
+pthread_mutex_t mutex_done;
+pthread_cond_t cond_done;
+atomic_uint current_job;
+atomic_uint nb_finished_jobs;
int done;
+int worker_done;
int *entries;
int entries_count;
@@ -61,68 +66,93 @@ typedef struct SliceThreadContext {
pthread_mutex_t *progress_mutex;
} SliceThreadContext;
-static void* attribute_align_arg worker(void *v)
+struct WorkerContext {
+SliceThreadContext *ctx;
+pthread_t thread;
+pthread_mutex_t mutex;
+pthread_cond_t cond;
+int done;
+};
+
+static unsigned run_jobs(SliceThreadContext *c)
{
-AVCodecContext *avctx = v;
-SliceThreadContext *c = avctx->internal->thread_ctx;
-unsigned last_execute = 0;
-int our_job = c->job_count;
-int thread_count = avctx->thread_count;
-int self_id;
-
-pthread_mutex_lock(&c->current_job_lock);
-self_id = c->current_job++;
-for (;;){
-int ret;
-while (our_job >= c->job_count) {
-if (c->current_job == thread_count + c->job_count)
-pthread_cond_signal(&c->last_job_cond);
-
-while (last_execute == c->current_execute && !c->done)
-pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
-last_execute = c->current_execute;
-our_job = self_id;
-
-if (c->done) {
-pthread_mutex_unlock(&c->current_job_lock);
-return NULL;
-}
-}
-pthread_mutex_unlock(&c->current_job_lock);
+unsigned current_job, nb_finished_jobs = 0;
+int thread_count = c->avctx->thread_count;
-ret = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
-c->func2(avctx, c->args, our_job, self_id);
+while (nb_finished_jobs != c->job_count &&
+ (current_job = atomic_fetch_add_explicit(&c->current_job, 1,
memory_order_acq_rel)) < c->job_count) {
+int ret = c->func ? c->func(c->avctx, (char *)c->args + current_job *
(size_t) c->job_size)
+ : c->func2(c->avctx, c->args, current_job,
current_job % thread_count);
if (c->rets)
-c->rets[our_job%c->job_count] = ret;
+c->rets[current_job] = ret;
+nb_finished_jobs = atomic_fetch_add_explicit(&c->nb_finished_jobs, 1,
memory_order_acq_rel) + 1;
+}
+
+return nb_finished_jobs;
+}
+
+static void* attribute_align_arg worker(void *v)
+{
+WorkerContext *w = v;
+SliceThreadContext *c = w->ctx;
+
+pthread_mutex_lock(&w->mutex);
+pthread_cond_signal(&w->cond);
+
+while (1) {
+w->done = 1;
+while (w->done)
+pthread_cond_wait(&w->cond, &w->mutex);
+
+if (c->done) {
+pthread_mutex_unlock(&w->mutex);
+return NULL;
+}
-pthread_mutex_lock(&c->current_job_lock);
-our_job = c->current_job++;
+if (run_jobs(c) == c->job_count) {
+pthread_mutex_lock(&c->mutex_done);
+c->worker_done = 1;
+pthread_cond_signal(&c->cond_done);
+pthread_mut