Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1580464018


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,26 +97,38 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
-
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
-        this.stateRequestsBuffer = new StateRequestBuffer<>();
+
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.stateRequestsBuffer =
+                new StateRequestBuffer<>(
+                        bufferTimeout,
+                        (scheduledTriggerSeq) ->
+                                mailboxExecutor.execute(
+                                        () -> {
+                                            if (stateRequestsBuffer.currentTriggerSeq.get()

Review Comment:
   How about providing a function `boolean stateRequestsBuffer#checkCurrentSeq(long seq)`?
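
   A minimal sketch of what such a helper could look like. It assumes the truncated condition above is an equality check between `currentTriggerSeq` and the sequence passed to the timeout handler, which the quoted diff does not show in full. The class name `SeqBufferSketch` is hypothetical and stands in for `StateRequestBuffer`; only the sequence bookkeeping is sketched.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for StateRequestBuffer; only the sequence logic is sketched. */
class SeqBufferSketch {
    /** Increases by 1 every time a trigger occurs, as described in the PR's Javadoc. */
    private final AtomicLong currentTriggerSeq = new AtomicLong(0);

    /** Marks the current batch as triggered and moves on to the next one. */
    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    /**
     * Returns true if {@code seq} still identifies the current batch.
     * Assumption: equality is the comparison the caller needs.
     */
    boolean checkCurrentSeq(long seq) {
        return currentTriggerSeq.get() == seq;
    }
}
```

   With a helper like this, the controller would not need to read `currentTriggerSeq` directly, so the field could stay private to the buffer.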



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##########
@@ -53,17 +73,74 @@ public class StateRequestBuffer<K> {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when timeout. */
+    final Consumer<Long> timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by 1.
+     */
+    AtomicLong currentTriggerSeq;
+
+    public StateRequestBuffer(long bufferTimeout, Consumer<Long> timeoutHandler) {
         this.activeQueue = new LinkedList<>();
         this.blockingQueue = new HashMap<>();
         this.blockingQueueSize = 0;
+        this.bufferTimeout = bufferTimeout;
+        this.timeoutHandler = timeoutHandler;
+        this.scheduledTriggerSeq = new AtomicLong(-1);
+        this.currentTriggerSeq = new AtomicLong(0);
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor = DELAYER;
+        }
+    }
+
+    void advanceTriggerSeq() {
+        currentTriggerSeq.incrementAndGet();
     }
 
     void enqueueToActive(StateRequest<K, ?, ?> request) {
         if (request.getRequestType() == StateRequestType.SYNC_POINT) {
             request.getFuture().complete(null);
         } else {
             activeQueue.add(request);
+            if (bufferTimeout > 0 && currentTriggerSeq.get() > scheduledTriggerSeq.get()) {
+                if (currentScheduledFuture != null
+                        && !currentScheduledFuture.isDone()
+                        && !currentScheduledFuture.isCancelled()) {
+                    currentScheduledFuture.cancel(false);
+                }
+                scheduledTriggerSeq.set(currentTriggerSeq.get());

Review Comment:
   I was thinking of adding `final long thisScheduledSeq = scheduledTriggerSeq.get();` here, and using the condition `thisScheduledSeq == currentTriggerSeq.get()` at line 136.
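
   A rough sketch of that idea, under stated assumptions: the class `TimeoutSeqSketch`, its constructor, and the method `scheduleTimeoutIfNeeded` are hypothetical simplifications, and the check runs directly in the scheduled task rather than going through the mailbox as the PR does. The point is only that the timer captures its own sequence in a local `final` variable and fires the handler only if no trigger has advanced the sequence in the meantime.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

/** Hypothetical, simplified stand-in for the timeout scheduling in StateRequestBuffer. */
class TimeoutSeqSketch {
    private final AtomicLong currentTriggerSeq = new AtomicLong(0);
    private final AtomicLong scheduledTriggerSeq = new AtomicLong(-1);
    private final ScheduledExecutorService scheduler;
    private final long bufferTimeoutMs;
    private final LongConsumer timeoutHandler;
    private ScheduledFuture<?> currentScheduledFuture;

    TimeoutSeqSketch(
            ScheduledExecutorService scheduler, long bufferTimeoutMs, LongConsumer timeoutHandler) {
        this.scheduler = scheduler;
        this.bufferTimeoutMs = bufferTimeoutMs;
        this.timeoutHandler = timeoutHandler;
    }

    /** Called whenever a batch is triggered, whether by size or by timeout. */
    void advanceTriggerSeq() {
        currentTriggerSeq.incrementAndGet();
    }

    /** Schedules a timeout for the current batch; returns null if nothing was scheduled. */
    ScheduledFuture<?> scheduleTimeoutIfNeeded() {
        if (bufferTimeoutMs <= 0 || currentTriggerSeq.get() <= scheduledTriggerSeq.get()) {
            return null;
        }
        if (currentScheduledFuture != null && !currentScheduledFuture.isDone()) {
            currentScheduledFuture.cancel(false);
        }
        scheduledTriggerSeq.set(currentTriggerSeq.get());
        // Capture the sequence this timer belongs to, as suggested in the review.
        final long thisScheduledSeq = scheduledTriggerSeq.get();
        currentScheduledFuture =
                scheduler.schedule(
                        () -> {
                            // Skip the handler if a trigger has happened since scheduling.
                            if (thisScheduledSeq == currentTriggerSeq.get()) {
                                timeoutHandler.accept(thisScheduledSeq);
                            }
                        },
                        bufferTimeoutMs,
                        TimeUnit.MILLISECONDS);
        return currentScheduledFuture;
    }
}
```

   Because the captured value never changes, a timer scheduled for an older batch cannot act on a newer one, even if it was not cancelled in time.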



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java:
##########
@@ -53,17 +73,74 @@ public class StateRequestBuffer<K> {
     /** The number of state requests in blocking queue. */
     int blockingQueueSize;
 
-    public StateRequestBuffer() {
+    /** The timeout of {@link #activeQueue} triggering in milliseconds. */
+    final long bufferTimeout;
+
+    /** The handler to trigger when timeout. */
+    final Consumer<Long> timeoutHandler;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    /**
+     * The current scheduled future, when the next scheduling occurs, the previous one that has not
+     * yet been executed will be canceled.
+     */
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code
+     * scheduledTriggerSeq} is less than {@code currentTriggerSeq}.
+     */
+    AtomicLong scheduledTriggerSeq;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by 1.
+     */
+    AtomicLong currentTriggerSeq;

Review Comment:
   How about naming this `currentSeq`, since each batch is distinguished by a seq?


