This is an automated email from the git hooks/post-receive script.

Git pushed a commit to branch master
in repository ffmpeg.

commit 03dfac563018e6e8b81e331ebae0732d8edfe754
Author:     Niklas Haas <[email protected]>
AuthorDate: Fri Dec 19 15:17:53 2025 +0100
Commit:     Niklas Haas <[email protected]>
CommitDate: Sat May 23 08:41:12 2026 +0000

    fftools/ffmpeg_sched: allow throttling decoder outputs
    
    This is a departure from the conventional idea of decoders always outputting
    data as fast as possible. Instead, this allows decoders to be throttled in the
    same way filter graphs can be.
    
    This comes into play when e.g. a demuxer is feeding into two decoders, but
    only one of the two decoders is actually currently needed (e.g. due to
    A/V misalignment). In that case, what typically happens is that the unneeded
    decoder also decodes all frames, and then piles them up on the "buffersrc"
    filter's downstream link (growing indefinitely).
    
    Another issue this solves manifests when e.g. a single demuxer is feeding many
    decoders that all try to feed frames to the same filter graph. In this case,
    all decoders run as fast as possible, leading to lock contention on the
    filter graph input queue; resulting in (again) many frames piling up on the
    buffersrc (or downstream filters) for the unneeded inputs that are not actually
    the bottleneck, while the input that's actually undersatisfied can end up
    starved for CPU time, possibly for long enough to exhaust memory limits. The
    normal rate limiting fails to apply in this scenario because all decoders share
    a single demuxer, and are hence rate-limited only by the demuxer speed; whereas
    the demuxer is not choked because from the PoV of the scheduler, the filter
    graph is simply not getting enough frames.
    
    In a more general sense, there's a philosophical argument to be made here.
    Since a decoder is typically also a decompressor, it produces more data than
    it consumes. So, in a sense, it's acting like a type of producer also - in
    the same way that a filter graph can produce more output than input.
    
    Solve all of these issues by allowing decoders to be output-choked, which
    gives the scheduler control over when decoders are allowed to output frames.
    This does mean we have to add some sort of internal packet queue, because the
    decoder thread may need to continue *accepting* upstream packets from the
    demuxer (or else we risk stalling the demuxer), but defer the actual decoding
    by placing them inside an internal "overflow" queue.
    
    This effectively simulates a sort of "filter graph"-type semantics but
    for the decoder queue.
    
    This overflow logic is fairly self-contained inside `sch_dec_receive`, though
    it is quite nontrivial. I have added as much documentation as is hopefully
    needed to understand the logic.
    
    Importantly, we cannot simply make the decoder input thread queue unbounded,
    because the demuxer relies on backpressure from the decoder to rate limit
    itself. (Note that demuxers may only be active if there is at least one
    downstream decoder that is also active, so we always have at least one
    decoder providing backpressure.)
    
    Sponsored-by: nxtedition AB
    Signed-off-by: Niklas Haas <[email protected]>
---
 fftools/ffmpeg_sched.c | 70 +++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 63 insertions(+), 7 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index dddd1d2c67..8ec3bdaf80 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -32,6 +32,7 @@
 #include "libavcodec/packet.h"
 
 #include "libavutil/avassert.h"
+#include "libavutil/container_fifo.h"
 #include "libavutil/error.h"
 #include "libavutil/fifo.h"
 #include "libavutil/frame.h"
@@ -86,6 +87,7 @@ typedef struct SchDec {
     unsigned         nb_outputs;
 
     SchTask             task;
+    SchWaiter           waiter;
     // Queue for receiving input packets, one stream.
     ThreadQueue        *queue;
 
@@ -95,6 +97,9 @@ typedef struct SchDec {
 
     // temporary storage used by sch_dec_send()
     AVFrame            *send_frame;
+
+    // internal queue of undecoded packets used by sch_dec_receive()
+    AVContainerFifo    *overflow;
 } SchDec;
 
 typedef struct SchSyncQueue {
@@ -548,6 +553,7 @@ void sch_free(Scheduler **psch)
         tq_free(&dec->queue);
 
         av_thread_message_queue_free(&dec->queue_end_ts);
+        av_container_fifo_free(&dec->overflow);
 
         for (unsigned j = 0; j < dec->nb_outputs; j++) {
             SchDecOutput *o = &dec->outputs[j];
@@ -559,6 +565,8 @@ void sch_free(Scheduler **psch)
         av_freep(&dec->outputs);
 
         av_frame_free(&dec->send_frame);
+
+        waiter_uninit(&dec->waiter);
     }
     av_freep(&sch->dec);
 
@@ -809,6 +817,14 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void 
*ctx, int send_end_ts)
             return ret;
     }
 
+    dec->overflow = av_container_fifo_alloc_avpacket(0);
+    if (!dec->overflow)
+        return AVERROR(ENOMEM);
+
+    ret = waiter_init(&dec->waiter);
+    if (ret < 0)
+        return ret;
+
     return idx;
 }
 
@@ -1305,8 +1321,9 @@ int sch_mux_sub_heartbeat_add(Scheduler *sch, unsigned 
mux_idx, unsigned stream_
 enum {
     UNCHOKE_DEMUX  = (1 << 0),
     UNCHOKE_FILTER = (1 << 1),
+    UNCHOKE_DECODE = (1 << 2),
 
-    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER,
+    UNCHOKE_ALL = UNCHOKE_DEMUX | UNCHOKE_FILTER | UNCHOKE_DECODE,
 };
 
 static void unchoke_for_stream(Scheduler *sch, SchedulerNode src, int flags);
@@ -1321,8 +1338,10 @@ static void unchoke_downstream(Scheduler *sch, 
SchedulerNode *dst)
     switch (dst->type) {
     case SCH_NODE_TYPE_DEC:
         dec = &sch->dec[dst->idx];
-        for (int i = 0; i < dec->nb_outputs; i++)
-            unchoke_downstream(sch, dec->outputs[i].dst);
+        if (!dec->waiter.choked_next) {
+            for (int i = 0; i < dec->nb_outputs; i++)
+                unchoke_downstream(sch, dec->outputs[i].dst);
+        }
         break;
     case SCH_NODE_TYPE_ENC:
         enc = &sch->enc[dst->idx];
@@ -1353,6 +1372,7 @@ static void unchoke_for_stream(Scheduler *sch, 
SchedulerNode src, int flags)
     while (1) {
         SchFilterGraph *fg;
         SchDemux *demux;
+        SchDec *dec;
         switch (src.type) {
         case SCH_NODE_TYPE_DEMUX:
             // fed directly by a demuxer (i.e. not through a filtergraph)
@@ -1366,7 +1386,11 @@ static void unchoke_for_stream(Scheduler *sch, 
SchedulerNode src, int flags)
             }
             return;
         case SCH_NODE_TYPE_DEC:
-            src = sch->dec[src.idx].src;
+            dec = &sch->dec[src.idx];
+            if (!(flags & UNCHOKE_DECODE))
+                return;
+            dec->waiter.choked_next = 0;
+            src = dec->src;
             continue;
         case SCH_NODE_TYPE_ENC:
             src = sch->enc[src.idx].src;
@@ -1445,6 +1469,7 @@ static void schedule_update_locked(Scheduler *sch)
 
     RESET_WAITER(demux);
     RESET_WAITER(filters);
+    RESET_WAITER(dec);
 
     // figure out the sources that are allowed to proceed
     for (unsigned i = 0; i < sch->nb_mux; i++) {
@@ -1455,8 +1480,11 @@ static void schedule_update_locked(Scheduler *sch)
 
             // unblock sources for output streams that are not finished
             // and not too far ahead of the trailing stream
-            if (ms->source_finished)
+            if (ms->source_finished) {
+                // still allow decoders to drain
+                unchoke_for_stream(sch, ms->src, UNCHOKE_DECODE);
                 continue;
+            }
             if (dts == AV_NOPTS_VALUE && ms->last_dts != AV_NOPTS_VALUE)
                 continue;
             if (dts != AV_NOPTS_VALUE && ms->last_dts - dts >= 
SCHEDULE_TOLERANCE)
@@ -1510,6 +1538,7 @@ static void schedule_update_locked(Scheduler *sch)
 
     UPDATE_WAITER(demux);
     UPDATE_WAITER(filters);
+    UPDATE_WAITER(dec);
 }
 
 enum {
@@ -2312,6 +2341,12 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, 
AVPacket *pkt)
     av_assert0(dec_idx < sch->nb_dec);
     dec = &sch->dec[dec_idx];
 
+retry:
+    // Pull a packet from the overflow FIFO while unchoked or expecting EOF ts
+    if (av_container_fifo_can_read(dec->overflow) &&
+        (!atomic_load(&dec->waiter.choked) || dec->expect_end_ts))
+        return av_container_fifo_read(dec->overflow, pkt, 0);
+
     // the decoder should have given us post-flush end timestamp in pkt
     if (dec->expect_end_ts) {
         Timestamp ts = (Timestamp){ .ts = pkt->pts, .tb = pkt->time_base };
@@ -2325,11 +2360,29 @@ int sch_dec_receive(Scheduler *sch, unsigned dec_idx, 
AVPacket *pkt)
     ret = tq_receive(dec->queue, &dummy, pkt, 0);
     av_assert0(dummy <= 0);
 
+    // drain packets from overflow queue before returning EOF
+    if (ret == AVERROR_EOF && av_container_fifo_can_read(dec->overflow)) {
+        int terminate = waiter_wait(sch, &dec->waiter);
+        if (terminate)
+            return ret;
+        return av_container_fifo_read(dec->overflow, pkt, 0);
+    } else if (ret < 0)
+        return ret;
+
     // got a flush packet, on the next call to this function the decoder
-    // will give us post-flush end timestamp
-    if (ret >= 0 && !pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
+    // should give us post-flush end timestamp (after draining overflow fifo)
+    if (!pkt->data && !pkt->side_data_elems && dec->queue_end_ts)
         dec->expect_end_ts = 1;
 
+    // we got a packet, but we're currently choked or have existing overflow
+    // packets; so push it to the FIFO first
+    if (atomic_load(&dec->waiter.choked) || 
av_container_fifo_can_read(dec->overflow)) {
+        ret = av_container_fifo_write(dec->overflow, pkt, 0);
+        if (ret < 0)
+            return ret;
+        goto retry;
+    }
+
     return ret;
 }
 
@@ -2785,6 +2838,9 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
                 choke_demux(sch, i, 0); // unfreeze to allow draining
         }
 
+    for (unsigned i = 0; i < sch->nb_dec; i++)
+        waiter_set(&sch->dec[i].waiter, 0); // unfreeze to allow draining
+
     pthread_mutex_unlock(&sch->schedule_lock);
 
     for (unsigned i = 0; i < sch->nb_demux; i++) {

_______________________________________________
ffmpeg-cvslog mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to