From 89f06098e388b4cfccb4c38badd57670cc66342f Mon Sep 17 00:00:00 2001
From: Nitin Kamboj <nkamboj@amd.com>
Date: Fri, 2 May 2025 12:10:16 +0530
Subject: [PATCH] Make multiple queue changes

Signed-off-by: Nitin Kamboj <nkamboj@amd.com>
---
 src/fftools/ffmpeg_sched.c | 56 ++++++++++++++++++++++----------------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/src/fftools/ffmpeg_sched.c b/src/fftools/ffmpeg_sched.c
index 4217420e4c..cd75cae76d 100644
--- a/src/fftools/ffmpeg_sched.c
+++ b/src/fftools/ffmpeg_sched.c
@@ -258,7 +258,7 @@
      SchTask             task;
      // input queue, nb_inputs+1 streams
      // last stream is control
-     ThreadQueue        *queue;
+     ThreadQueue        **queues;
      SchWaiter           waiter;
  
      // protected by schedule_lock
@@ -555,8 +555,10 @@
  
      for (unsigned i = 0; i < sch->nb_filters; i++) {
          SchFilterGraph *fg = &sch->filters[i];
- 
-         tq_free(&fg->queue);
+
+         for(unsigned i = 0; i < fg->nb_inputs + 1; i++)
+            tq_free(&fg->queues[i]);
+        av_freep(&fg->queues);
  
          av_freep(&fg->inputs);
          av_freep(&fg->outputs);
@@ -860,10 +862,17 @@
      ret = waiter_init(&fg->waiter);
      if (ret < 0)
          return ret;
- 
-     ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
-     if (ret < 0)
-         return ret;
+
+     fg->queues = av_calloc(fg->nb_inputs + 1, sizeof(*fg->queues));
+    for(unsigned i = 0; i < fg->nb_inputs + 1; i++) {
+        ret = queue_alloc(&fg->queues[i], 1, 0, QUEUE_FRAMES);
+        if (ret < 0){
+            for(unsigned j = 0; j < i; j++)
+                tq_free(&fg->queues[j]);
+            av_freep(&fg->queues);
+            return ret;
+        }
+    }
  
      return idx;
  }
@@ -2167,15 +2176,15 @@
                            unsigned in_idx, AVFrame *frame)
  {
      if (frame)
-         return tq_send(fg->queue, in_idx, frame);
+         return tq_send(fg->queues[in_idx], 0, frame);
  
      if (!fg->inputs[in_idx].send_finished) {
          fg->inputs[in_idx].send_finished = 1;
-         tq_send_finish(fg->queue, in_idx);
+         tq_send_finish(fg->queues[in_idx], 0);
  
          // close the control stream when all actual inputs are done
          if (atomic_fetch_add(&fg->nb_inputs_finished_send, 1) == fg->nb_inputs - 1)
-             tq_send_finish(fg->queue, fg->nb_inputs);
+             tq_send_finish(fg->queues[fg->nb_inputs], 0);
      }
      return 0;
  }
@@ -2401,16 +2410,14 @@
      }
  
      while (1) {
-         int ret, idx;
- 
-         ret = tq_receive(fg->queue, &idx, frame);
-         if (idx < 0)
-             return AVERROR_EOF;
-         else if (ret >= 0) {
-             *in_idx = idx;
-             return 0;
-         }
- 
+        int ret, tmp;
+        ret = tq_receive(fg->queues[*in_idx], &tmp, frame);
+        if(tmp < 0)
+              return AVERROR_EOF;
+        else if (ret >= 0) {
+            return 0;
+        }
+
          // disregard EOFs for specific streams - they should always be
          // preceded by an EOF frame
      }
@@ -2429,11 +2436,11 @@
  
      if (!fi->receive_finished) {
          fi->receive_finished = 1;
-         tq_receive_finish(fg->queue, in_idx);
+         tq_receive_finish(fg->queues[in_idx], 0);
  
          // close the control stream when all actual inputs are done
          if (++fg->nb_inputs_finished_receive == fg->nb_inputs)
-             tq_receive_finish(fg->queue, fg->nb_inputs);
+             tq_receive_finish(fg->queues[fg->nb_inputs], 0);
      }
  }
  
@@ -2459,7 +2466,8 @@
      int ret = 0;
  
      for (unsigned i = 0; i <= fg->nb_inputs; i++)
-         tq_receive_finish(fg->queue, i);
+         tq_receive_finish(fg->queues[i], 0);
+
  
      for (unsigned i = 0; i < fg->nb_outputs; i++) {
          SchedulerNode dst = fg->outputs[i].dst;
@@ -2604,5 +2612,5 @@
      sch->state = SCH_STATE_STOPPED;
  
      return ret;
+
  }
- 
\ No newline at end of file
-- 
2.34.1

