This is an automated email from the git hooks/post-receive script. Git pushed a commit to branch master in repository ffmpeg.
commit 03dfac563018e6e8b81e331ebae0732d8edfe754 Author: Niklas Haas <[email protected]> AuthorDate: Fri Dec 19 15:17:53 2025 +0100 Commit: Niklas Haas <[email protected]> CommitDate: Sat May 23 08:41:12 2026 +0000 fftools/ffmpeg_sched: allow throttling decoder outputs This is a departure from the conventional idea of decoders always outputting data as fast as possible. Instead, this allows decoders to be throttled in the same way filter graphs can be. This comes into play when e.g. a demuxer is feeding into two decoders, but only one of the two decoders is actually currently needed (e.g. due to A/V misalignment). In that case, what typically happens is that the unneeded decoder also decodes all frames, and then piles them up on the "buffersrc" filter's downstream link (growing indefinitely). Another issue this solves manifests when e.g. a single demuxer is feeding many decoders that all try to feed frames to the same filter graph. In this case, all decoders run as fast as possible, leading to lock contention on the filter graph input queue; resulting in (again) many frames piling up on the buffersrc (or downstream filters) for the unneeded inputs that are not actually the bottleneck, while the input that's actually undersatisfied can end up starved for CPU time, possibly for long enough to exhaust memory limits. The normal rate limiting fails to apply in this scenario because all decoders share a single demuxer, and are hence rate-limited only by the demuxer speed; whereas the demuxer is not choked because from the PoV of the scheduler, the filter graph is simply not getting enough frames. In a more general sense, there's a philosophical argument to be made here. Since a decoder is typically also a decompressor, it produces more data than it consumes. So, in a sense, it's acting like a type of producer also - in the same way that a filter graph can produce more output than input. 
Solve all of these issues by allowing decoders to be output-choked, which gives the scheduler control over when decoders are allowed to output frames. This does mean we have to add some sort of internal packet queue, because the decoder thread may need to continue *accepting* upstream packets from the demuxer (or else we risk stalling the demuxer), but defer the actual decoding by placing them inside an internal "overflow" queue. This effectively simulates a sort of "filter graph"-type semantics but for the decoder queue. This overflow logic is fairly self-contained inside `sch_dec_receive`, though it is quite nontrivial. I have added as much documentation as is hopefully needed to understand the logic. Importantly, we cannot simply unlimit the decoder input thread queue because the demuxer relies on backpressure from the decoder to rate limit itself. (Note that demuxers may only be active if there is at least one downstream decoder that is also active, so we always have at least one decoder providing backpressure) Sponsored-by: nxtedition AB Signed-off-by: Niklas Haas <[email protected]> --- fftools/ffmpeg_sched.c | 70 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 63 insertions(+), 7 deletions(-) diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c index dddd1d2c67..8ec3bdaf80 100644 --- a/fftools/ffmpeg_sched.c +++ b/fftools/ffmpeg_sched.c @@ -32,6 +32,7 @@ #include "libavcodec/packet.h" #include "libavutil/avassert.h" +#include "libavutil/container_fifo.h" #include "libavutil/error.h" #include "libavutil/fifo.h" #include "libavutil/frame.h" @@ -86,6 +87,7 @@ typedef struct SchDec { unsigned nb_outputs; SchTask task; + SchWaiter waiter; // Queue for receiving input packets, one stream. 
ThreadQueue *queue; @@ -95,6 +97,9 @@ typedef struct SchDec { // temporary storage used by sch_dec_send() AVFrame *send_frame; + + // internal queue of undecoded packets used by sch_dec_receive() + AVContainerFifo *overflow; } SchDec; typedef struct SchSyncQueue { @@ -548,6 +553,7 @@ void sch_free(Scheduler **psch) tq_free(&dec->queue); av_thread_message_queue_free(&dec->queue_end_ts); + av_container_fifo_free(&dec->overflow); for (unsigned j = 0; j < dec->nb_outputs; j++) { SchDecOutput *o = &dec->outputs[j]; @@ -559,6 +565,8 @@ void sch_free(Scheduler **psch) av_freep(&dec->outputs); av_frame_free(&dec->send_frame); + + waiter_uninit(&dec->waiter); } av_freep(&sch->dec); @@ -809,6 +817,14 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx, int send_end_ts) return ret; } + dec->overflow = av_container_fifo_alloc_avpacket(0); + if (!dec->overflow) + return AVERROR(ENOMEM); + + ret = waiter_init(&dec->waiter); + if (ret < 0) + return ret; + return idx; } @@ -1305,8 +1321,9 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned mux_idx, unsigned stream_ enum { UNCHOKE_DEMUX = (1 << 0), UNCHOKE_FILTER = (1 << 1), + UNCHOKE_DECODE = (1 << 2), - UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER, + UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER | UNCHOKE_DECODE, }; static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags); @@ -1321,8 +1338,10 @@ static void unchoke_downstream(Scheduler *sch, SchedulerNode *dst) switch (dst->type) { case SCH_NODE_TYPE_DEC: dec = &sch->dec[dst->idx]; - for (int i = 0; i < dec->nb_outputs; i++) - unchoke_downstream(sch, dec->outputs[i].dst); + if (!dec->waiter.choked_next) { + for (int i = 0; i < dec->nb_outputs; i++) + unchoke_downstream(sch, dec->outputs[i].dst); + } break; case SCH_NODE_TYPE_ENC: enc = &sch->enc[dst->idx]; @@ -1353,6 +1372,7 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags) while (1) { SchFilterGraph *fg; SchDemux *demux; + SchDec *dec; switch (src.type) { case 
SCH_NODE_TYPE_DEMUX: // fed directly by a demuxer (i.e. not through a filtergraph) @@ -1366,7 +1386,11 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags) } return; case SCH_NODE_TYPE_DEC: - src = sch->dec[src.idx].src; + dec = &sch->dec[src.idx]; + if (!(flags & UNCHOKE_DECODE)) + return; + dec->waiter.choked_next = 0; + src = dec->src; continue; case SCH_NODE_TYPE_ENC: src = sch->enc[src.idx].src; @@ -1445,6 +1469,7 @@ static void schedule_update_locked(Scheduler *sch) RESET_WAITER(demux); RESET_WAITER(filters); + RESET_WAITER(dec); // figure out the sources that are allowed to proceed for (unsigned i = 0; i < sch->nb_mux; i++) { @@ -1455,8 +1480,11 @@ static void schedule_update_locked(Scheduler *sch) // unblock sources for output streams that are not finished // and not too far ahead of the trailing stream - if (ms->source_finished) + if (ms->source_finished) { + // still allow decoders to drain + unchoke_for_stream(sch, ms->src, UNCHOKE_DECODE); continue; + } if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE) continue; if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= SCHEDULE_TOLERANCE) @@ -1510,6 +1538,7 @@ static void schedule_update_locked(Scheduler *sch) UPDATE_WAITER(demux); UPDATE_WAITER(filters); + UPDATE_WAITER(dec); } enum { @@ -2312,6 +2341,12 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) av_assert0(dec_idx < sch->nb_dec); dec = &sch->dec[dec_idx]; +retry: + // Pull a packet from the overflow FIFO while unchoked or expecting EOF ts + if (av_container_fifo_can_read(dec->overflow) && + (!atomic_load(&dec->waiter.choked) || dec->expect_end_ts)) + return av_container_fifo_read(dec->overflow, pkt, 0); + // the decoder should have given us post-flush end timestamp in pkt if (dec->expect_end_ts) { Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base }; @@ -2325,11 +2360,29 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, AVPacket *pkt) ret = tq_receive(dec->queue, &dummy, 
pkt, 0); av_assert0(dummy <= 0); + // drain packets from overflow queue before returning EOF + if (ret == AVERROR_EOF && av_container_fifo_can_read(dec->overflow)) { + int terminate = waiter_wait(sch, &dec->waiter); + if (terminate) + return ret; + return av_container_fifo_read(dec->overflow, pkt, 0); + } else if (ret < 0) + return ret; + // got a flush packet, on the next call to this function the decoder - // will give us post-flush end timestamp - if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts) + // should give us post-flush end timestamp (after draining overflow fifo) + if (!pkt->data && !pkt->side_data_elems && dec->queue_end_ts) dec->expect_end_ts = 1; + // we got a packet, but we're currently choked or have existing overflow + // packets; so push it to the FIFO first + if (atomic_load(&dec->waiter.choked) || av_container_fifo_can_read(dec->overflow)) { + ret = av_container_fifo_write(dec->overflow, pkt, 0); + if (ret < 0) + return ret; + goto retry; + } + return ret; } @@ -2785,6 +2838,9 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts) choke_demux(sch, i, 0); // unfreeze to allow draining } + for (unsigned i = 0; i < sch->nb_dec; i++) + waiter_set(&sch->dec[i].waiter, 0); // unfreeze to allow draining + pthread_mutex_unlock(&sch->schedule_lock); for (unsigned i = 0; i < sch->nb_demux; i++) { _______________________________________________ ffmpeg-cvslog mailing list -- [email protected] To unsubscribe send an email to [email protected]
