[FFmpeg-cvslog] fftools/ffmpeg_sched: make sure to always run task cleanup

2024-03-29 Thread Anton Khirnov
ffmpeg | branch: release/7.0 | Anton Khirnov  | Wed Mar 27 
09:49:27 2024 +0100| [536443919f59822670e74d40c5aa1baa0b32fec0] | committer: 
Anton Khirnov

fftools/ffmpeg_sched: make sure to always run task cleanup

Even in cases where sch_start() failed. This ensures all links are
properly closed and no tasks are left hanging.

Fixes #10916.

(cherry picked from commit 24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b)
Signed-off-by: Anton Khirnov 

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=536443919f59822670e74d40c5aa1baa0b32fec0
---

 fftools/ffmpeg_sched.c | 68 --
 1 file changed, 44 insertions(+), 24 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 67c32fb5a0..ee3af45908 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -260,6 +260,12 @@ typedef struct SchFilterGraph {
 int task_exited;
 } SchFilterGraph;
 
+enum SchedulerState {
+SCH_STATE_UNINIT,
+SCH_STATE_STARTED,
+SCH_STATE_STOPPED,
+};
+
 struct Scheduler {
 const AVClass  *class;
 
@@ -292,7 +298,7 @@ struct Scheduler {
 char   *sdp_filename;
 int sdp_auto;
 
-int transcode_started;
+enum SchedulerState state;
 atomic_int  terminate;
 atomic_int  task_failed;
 
@@ -1144,7 +1150,8 @@ int sch_mux_stream_ready(Scheduler *sch, unsigned 
mux_idx, unsigned stream_idx)
 
 // this may be called during initialization - do not start
 // threads before sch_start() is called
-if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
+if (++mux->nb_streams_ready == mux->nb_streams &&
+sch->state >= SCH_STATE_STARTED)
 ret = mux_init(sch, mux);
 
 pthread_mutex_unlock(&sch->mux_ready_lock);
@@ -1514,7 +1521,8 @@ int sch_start(Scheduler *sch)
 if (ret < 0)
 return ret;
 
-sch->transcode_started = 1;
+av_assert0(sch->state == SCH_STATE_UNINIT);
+sch->state = SCH_STATE_STARTED;
 
 for (unsigned i = 0; i < sch->nb_mux; i++) {
 SchMux *mux = &sch->mux[i];
@@ -1522,7 +1530,7 @@ int sch_start(Scheduler *sch)
 if (mux->nb_streams_ready == mux->nb_streams) {
 ret = mux_init(sch, mux);
 if (ret < 0)
-return ret;
+goto fail;
 }
 }
 
@@ -1531,7 +1539,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&enc->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 for (unsigned i = 0; i < sch->nb_filters; i++) {
@@ -1539,7 +1547,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&fg->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 for (unsigned i = 0; i < sch->nb_dec; i++) {
@@ -1547,7 +1555,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&dec->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 for (unsigned i = 0; i < sch->nb_demux; i++) {
@@ -1558,7 +1566,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&d->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 pthread_mutex_lock(&sch->schedule_lock);
@@ -1566,6 +1574,9 @@ int sch_start(Scheduler *sch)
 pthread_mutex_unlock(&sch->schedule_lock);
 
 return 0;
+fail:
+sch_stop(sch, NULL);
+return ret;
 }
 
 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
@@ -2414,6 +2425,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, 
AVFrame *frame)
 return send_to_filter(sch, fg, fg->nb_inputs, frame);
 }
 
+static int task_cleanup(Scheduler *sch, SchedulerNode node)
+{
+switch (node.type) {
+case SCH_NODE_TYPE_DEMUX:   return demux_done (sch, node.idx);
+case SCH_NODE_TYPE_MUX: return mux_done   (sch, node.idx);
+case SCH_NODE_TYPE_DEC: return dec_done   (sch, node.idx);
+case SCH_NODE_TYPE_ENC: return enc_done   (sch, node.idx);
+case SCH_NODE_TYPE_FILTER_IN:   return filter_done(sch, node.idx);
+default: av_assert0(0);
+}
+}
+
 static void *task_wrapper(void *arg)
 {
 SchTask  *task = arg;
@@ -2426,15 +2449,7 @@ static void *task_wrapper(void *arg)
 av_log(task->func_arg, AV_LOG_ERROR,
"Task finished with error code: %d (%s)\n", ret, 
av_err2str(ret));
 
-switch (task->node.type) {
-case SCH_NODE_TYPE_DEMUX:   err = demux_done (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_MUX: err = mux_done   (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_DEC: err = dec_done   (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_ENC: err = enc_done   (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_FILTER_IN:   err = filter_done(sch, task->node.idx); 
break;
-default: av_assert0(0);
-}
-
+err = task_cleanup(sch, task->node);
 ret = err_me

[FFmpeg-cvslog] fftools/ffmpeg_sched: make sure to always run task cleanup

2024-03-29 Thread Anton Khirnov
ffmpeg | branch: master | Anton Khirnov  | Wed Mar 27 
09:49:27 2024 +0100| [24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b] | committer: 
Anton Khirnov

fftools/ffmpeg_sched: make sure to always run task cleanup

Even in cases where sch_start() failed. This ensures all links are
properly closed and no tasks are left hanging.

Fixes #10916.

> http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b
---

 fftools/ffmpeg_sched.c | 68 --
 1 file changed, 44 insertions(+), 24 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 422c665c96..f8485db30b 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -260,6 +260,12 @@ typedef struct SchFilterGraph {
 int task_exited;
 } SchFilterGraph;
 
+enum SchedulerState {
+SCH_STATE_UNINIT,
+SCH_STATE_STARTED,
+SCH_STATE_STOPPED,
+};
+
 struct Scheduler {
 const AVClass  *class;
 
@@ -292,7 +298,7 @@ struct Scheduler {
 char   *sdp_filename;
 int sdp_auto;
 
-int transcode_started;
+enum SchedulerState state;
 atomic_int  terminate;
 atomic_int  task_failed;
 
@@ -1144,7 +1150,8 @@ int sch_mux_stream_ready(Scheduler *sch, unsigned 
mux_idx, unsigned stream_idx)
 
 // this may be called during initialization - do not start
 // threads before sch_start() is called
-if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
+if (++mux->nb_streams_ready == mux->nb_streams &&
+sch->state >= SCH_STATE_STARTED)
 ret = mux_init(sch, mux);
 
 pthread_mutex_unlock(&sch->mux_ready_lock);
@@ -1514,7 +1521,8 @@ int sch_start(Scheduler *sch)
 if (ret < 0)
 return ret;
 
-sch->transcode_started = 1;
+av_assert0(sch->state == SCH_STATE_UNINIT);
+sch->state = SCH_STATE_STARTED;
 
 for (unsigned i = 0; i < sch->nb_mux; i++) {
 SchMux *mux = &sch->mux[i];
@@ -1522,7 +1530,7 @@ int sch_start(Scheduler *sch)
 if (mux->nb_streams_ready == mux->nb_streams) {
 ret = mux_init(sch, mux);
 if (ret < 0)
-return ret;
+goto fail;
 }
 }
 
@@ -1531,7 +1539,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&enc->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 for (unsigned i = 0; i < sch->nb_filters; i++) {
@@ -1539,7 +1547,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&fg->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 for (unsigned i = 0; i < sch->nb_dec; i++) {
@@ -1547,7 +1555,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&dec->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 for (unsigned i = 0; i < sch->nb_demux; i++) {
@@ -1558,7 +1566,7 @@ int sch_start(Scheduler *sch)
 
 ret = task_start(&d->task);
 if (ret < 0)
-return ret;
+goto fail;
 }
 
 pthread_mutex_lock(&sch->schedule_lock);
@@ -1566,6 +1574,9 @@ int sch_start(Scheduler *sch)
 pthread_mutex_unlock(&sch->schedule_lock);
 
 return 0;
+fail:
+sch_stop(sch, NULL);
+return ret;
 }
 
 int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
@@ -2412,6 +2423,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, 
AVFrame *frame)
 return send_to_filter(sch, fg, fg->nb_inputs, frame);
 }
 
+static int task_cleanup(Scheduler *sch, SchedulerNode node)
+{
+switch (node.type) {
+case SCH_NODE_TYPE_DEMUX:   return demux_done (sch, node.idx);
+case SCH_NODE_TYPE_MUX: return mux_done   (sch, node.idx);
+case SCH_NODE_TYPE_DEC: return dec_done   (sch, node.idx);
+case SCH_NODE_TYPE_ENC: return enc_done   (sch, node.idx);
+case SCH_NODE_TYPE_FILTER_IN:   return filter_done(sch, node.idx);
+default: av_assert0(0);
+}
+}
+
 static void *task_wrapper(void *arg)
 {
 SchTask  *task = arg;
@@ -2424,15 +2447,7 @@ static void *task_wrapper(void *arg)
 av_log(task->func_arg, AV_LOG_ERROR,
"Task finished with error code: %d (%s)\n", ret, 
av_err2str(ret));
 
-switch (task->node.type) {
-case SCH_NODE_TYPE_DEMUX:   err = demux_done (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_MUX: err = mux_done   (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_DEC: err = dec_done   (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_ENC: err = enc_done   (sch, task->node.idx); 
break;
-case SCH_NODE_TYPE_FILTER_IN:   err = filter_done(sch, task->node.idx); 
break;
-default: av_assert0(0);
-}
-
+err = task_cleanup(sch, task->node);
 ret = err_merge(ret, err);
 
 // EOF is considered normal termination
@@ -2448,13 +2463,13 @@ static void *task_w