[FFmpeg-cvslog] fftools/ffmpeg_sched: make sure to always run task cleanup
ffmpeg | branch: release/7.0 | Anton Khirnov | Wed Mar 27 09:49:27 2024 +0100| [536443919f59822670e74d40c5aa1baa0b32fec0] | committer: Anton Khirnov fftools/ffmpeg_sched: make sure to always run task cleanup Even in cases where sch_start() failed. This ensures all links are properly closed and no tasks are left hanging. Fixes #10916. (cherry picked from commit 24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b) Signed-off-by: Anton Khirnov > http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=536443919f59822670e74d40c5aa1baa0b32fec0 --- fftools/ffmpeg_sched.c | 68 -- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 67c32fb5a0..ee3af45908 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -260,6 +260,12 @@ typedef struct SchFilterGraph { int task_exited; } SchFilterGraph; +enum SchedulerState { +SCH_STATE_UNINIT, +SCH_STATE_STARTED, +SCH_STATE_STOPPED, +}; + struct Scheduler { const AVClass *class; @@ -292,7 +298,7 @@ struct Scheduler { char *sdp_filename; int sdp_auto; -int transcode_started; +enum SchedulerState state; atomic_int terminate; atomic_int task_failed; @@ -1144,7 +1150,8 @@ int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) // this may be called during initialization - do not start // threads before sch_start() is called -if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started) +if (++mux->nb_streams_ready == mux->nb_streams && +sch->state >= SCH_STATE_STARTED) ret = mux_init(sch, mux); pthread_mutex_unlock(&sch->mux_ready_lock); @@ -1514,7 +1521,8 @@ int sch_start(Scheduler *sch) if (ret < 0) return ret; -sch->transcode_started = 1; +av_assert0(sch->state == SCH_STATE_UNINIT); +sch->state = SCH_STATE_STARTED; for (unsigned i = 0; i < sch->nb_mux; i++) { SchMux *mux = &sch->mux[i]; @@ -1522,7 +1530,7 @@ int sch_start(Scheduler *sch) if (mux->nb_streams_ready == mux->nb_streams) { ret = mux_init(sch, mux); if (ret < 0) -return ret; +goto fail; } } @@ -1531,7 +1539,7 @@ int sch_start(Scheduler *sch) ret = task_start(&enc->task); if (ret < 0) -return ret; +goto fail; } for (unsigned i = 0; i < sch->nb_filters; i++) { @@ -1539,7 +1547,7 @@ int sch_start(Scheduler *sch) ret = task_start(&fg->task); if (ret < 0) -return ret; +goto fail; } for (unsigned i = 0; i < sch->nb_dec; i++) { @@ -1547,7 +1555,7 @@ int sch_start(Scheduler *sch) ret = task_start(&dec->task); if (ret < 0) -return ret; +goto fail; } for (unsigned i = 0; i < sch->nb_demux; i++) { @@ -1558,7 +1566,7 @@ int sch_start(Scheduler *sch) ret = task_start(&d->task); if (ret < 0) -return ret; +goto fail; } pthread_mutex_lock(&sch->schedule_lock); @@ -1566,6 +1574,9 @@ int sch_start(Scheduler *sch) pthread_mutex_unlock(&sch->schedule_lock); return 0; +fail: +sch_stop(sch, NULL); +return ret; } int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts) @@ -2414,6 +2425,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame) return send_to_filter(sch, fg, fg->nb_inputs, frame); } +static int task_cleanup(Scheduler *sch, SchedulerNode node) +{ +switch (node.type) { +case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx); +case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx); +case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx); +case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx); +case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx); +default: av_assert0(0); +} +} + static void *task_wrapper(void *arg) { SchTask *task = arg; @@ -2426,15 +2449,7 @@ static void *task_wrapper(void *arg) av_log(task->func_arg, AV_LOG_ERROR, "Task finished with error code: %d (%s)\n", ret, av_err2str(ret)); -switch (task->node.type) { -case SCH_NODE_TYPE_DEMUX: err = demux_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_MUX: err = mux_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_DEC: err = dec_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_ENC: err = enc_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_FILTER_IN: err = filter_done(sch, task->node.idx); break; -default: av_assert0(0); -} - +err = task_cleanup(sch, task->node); ret = err_me
[FFmpeg-cvslog] fftools/ffmpeg_sched: make sure to always run task cleanup
ffmpeg | branch: master | Anton Khirnov | Wed Mar 27 09:49:27 2024 +0100| [24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b] | committer: Anton Khirnov fftools/ffmpeg_sched: make sure to always run task cleanup Even in cases where sch_start() failed. This ensures all links are properly closed and no tasks are left hanging. Fixes #10916. > http://git.videolan.org/gitweb.cgi/ffmpeg.git/?a=commit;h=24b9f29ff2e0b84ae1345f51cbf7240e079d7a2b --- fftools/ffmpeg_sched.c | 68 -- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index 422c665c96..f8485db30b 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -260,6 +260,12 @@ typedef struct SchFilterGraph { int task_exited; } SchFilterGraph; +enum SchedulerState { +SCH_STATE_UNINIT, +SCH_STATE_STARTED, +SCH_STATE_STOPPED, +}; + struct Scheduler { const AVClass *class; @@ -292,7 +298,7 @@ struct Scheduler { char *sdp_filename; int sdp_auto; -int transcode_started; +enum SchedulerState state; atomic_int terminate; atomic_int task_failed; @@ -1144,7 +1150,8 @@ int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx) // this may be called during initialization - do not start // threads before sch_start() is called -if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started) +if (++mux->nb_streams_ready == mux->nb_streams && +sch->state >= SCH_STATE_STARTED) ret = mux_init(sch, mux); pthread_mutex_unlock(&sch->mux_ready_lock); @@ -1514,7 +1521,8 @@ int sch_start(Scheduler *sch) if (ret < 0) return ret; -sch->transcode_started = 1; +av_assert0(sch->state == SCH_STATE_UNINIT); +sch->state = SCH_STATE_STARTED; for (unsigned i = 0; i < sch->nb_mux; i++) { SchMux *mux = &sch->mux[i]; @@ -1522,7 +1530,7 @@ int sch_start(Scheduler *sch) if (mux->nb_streams_ready == mux->nb_streams) { ret = mux_init(sch, mux); if (ret < 0) -return ret; +goto fail; } } @@ -1531,7 +1539,7 @@ int sch_start(Scheduler *sch) ret = task_start(&enc->task); if (ret < 0) -return ret; +goto fail; } for (unsigned i = 0; i < sch->nb_filters; i++) { @@ -1539,7 +1547,7 @@ int sch_start(Scheduler *sch) ret = task_start(&fg->task); if (ret < 0) -return ret; +goto fail; } for (unsigned i = 0; i < sch->nb_dec; i++) { @@ -1547,7 +1555,7 @@ int sch_start(Scheduler *sch) ret = task_start(&dec->task); if (ret < 0) -return ret; +goto fail; } for (unsigned i = 0; i < sch->nb_demux; i++) { @@ -1558,7 +1566,7 @@ int sch_start(Scheduler *sch) ret = task_start(&d->task); if (ret < 0) -return ret; +goto fail; } pthread_mutex_lock(&sch->schedule_lock); @@ -1566,6 +1574,9 @@ int sch_start(Scheduler *sch) pthread_mutex_unlock(&sch->schedule_lock); return 0; +fail: +sch_stop(sch, NULL); +return ret; } int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts) @@ -2412,6 +2423,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame) return send_to_filter(sch, fg, fg->nb_inputs, frame); } +static int task_cleanup(Scheduler *sch, SchedulerNode node) +{ +switch (node.type) { +case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx); +case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx); +case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx); +case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx); +case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx); +default: av_assert0(0); +} +} + static void *task_wrapper(void *arg) { SchTask *task = arg; @@ -2424,15 +2447,7 @@ static void *task_wrapper(void *arg) av_log(task->func_arg, AV_LOG_ERROR, "Task finished with error code: %d (%s)\n", ret, av_err2str(ret)); -switch (task->node.type) { -case SCH_NODE_TYPE_DEMUX: err = demux_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_MUX: err = mux_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_DEC: err = dec_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_ENC: err = enc_done (sch, task->node.idx); break; -case SCH_NODE_TYPE_FILTER_IN: err = filter_done(sch, task->node.idx); break; -default: av_assert0(0); -} - +err = task_cleanup(sch, task->node); ret = err_merge(ret, err); // EOF is considered normal termination @@ -2448,13 +2463,13 @@ static void *task_w