Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1579142292


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,26 +97,39 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
-
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
-        this.stateRequestsBuffer = new StateRequestBuffer<>();
+
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.stateRequestsBuffer =
+                new StateRequestBuffer<>(
+                        bufferTimeout,
+                        () ->
+                                mailboxExecutor.execute(
+                                        () -> {
+                                            if (stateRequestsBuffer.currentTriggerSeq.get()
+                                                    == stateRequestsBuffer.scheduledTriggerSeq

Review Comment:
   This condition is always true. I suggest passing the triggered sequence number (`triggered seq`) into the handler lambda as a parameter.
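
A minimal sketch of what this suggestion could look like, not the actual PR code: the sequence number is captured when the timeout is scheduled and handed to the handler, which compares it with the current value when it eventually runs. `TriggerSeqSketch`, `currentSeq()` and `scheduleTimeout()` are hypothetical names for illustration, and the returned `Runnable` stands in for the task that would be given to the scheduled executor. Using `LongConsumer` in place of `Runnable` for the handler type is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for the buffer; not Flink's StateRequestBuffer. */
final class TriggerSeqSketch {

    /** Sequence number of the current batch; bumped every time a trigger occurs. */
    private final AtomicLong currentTriggerSeq = new AtomicLong(0);

    /** Handler that receives the sequence number captured at scheduling time. */
    private final LongConsumer timeoutHandler;

    TriggerSeqSketch(LongConsumer timeoutHandler) {
        this.timeoutHandler = timeoutHandler;
    }

    long currentSeq() {
        return currentTriggerSeq.get();
    }

    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    /**
     * Captures the sequence number now and returns the task a timer would run later.
     * The handler sees the captured value, so it can tell a stale timeout from a live one.
     */
    Runnable scheduleTimeout() {
        final long scheduledSeq = currentTriggerSeq.get();
        return () -> timeoutHandler.accept(scheduledSeq);
    }
}
```

With this shape, a handler such as `seq -> { if (seq == buffer.currentSeq()) { /* trigger */ } }` compares a value fixed at scheduling time against the live counter, so the check is skipped for a timeout that was scheduled before the buffer was triggered by other means.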



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##########
@@ -53,17 +72,75 @@ public class StateRequestBuffer<K> {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when {@link #activeQueue} size is 1. */
+    final Runnable timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
+
+    public StateRequestBuffer(long bufferTimeout, Runnable timeoutHandler) {
         this.activeQueue = new LinkedList<>();
         this.blockingQueue = new HashMap<>();
         this.blockingQueueSize = 0;
+        this.bufferTimeout = bufferTimeout;
+        this.timeoutHandler = timeoutHandler;
+        this.scheduledTriggerSeq = new AtomicLong(-1);
+        this.currentTriggerSeq = new AtomicLong(0);
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor = DELAYER;
+        }
+    }
+
+    void advanceTriggerSeq() {
+        currentTriggerSeq.incrementAndGet();
     }
 
     void enqueueToActive(StateRequest<K, ?, ?> request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
             activeQueue.add(request);
+            // if the active queue size is 1, it means that the current request is the oldest one in

Review Comment:
   nit: remove this?


