Zakelly commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1577589826


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +106,80 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
-    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
-    }
+    /** The executor service that schedules and calls the triggers of this task. */
+    ScheduledExecutorService scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
+    /**
+     * The current trigger sequence number, used to distinguish different triggers. Every time a
+     * trigger occurs, {@code currentTriggerSeq} increases by one.
+     */
+    AtomicLong currentTriggerSeq;
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeout,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeout = bufferTimeout;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.currentTriggerSeq = new AtomicLong(0);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeout > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1, new ExecutorThreadFactory("AEC-timeout-scheduler"));
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor).setRemoveOnCancelPolicy(true);
+            // make sure shutdown removes all pending tasks
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            ((ScheduledThreadPoolExecutor) this.scheduledExecutor)
+                    .setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
-                "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
+                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordsNum {}",
                 batchSize,
+                bufferTimeout,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout(long triggerSeq) {
+        if (bufferTimeout > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        if (triggerSeq != currentTriggerSeq.get()) {
+                                            // if any new trigger occurs, skip this schedule
+                                            return;
+                                        }
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(true), "AEC-timeout");

Review Comment:
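   How about checking the sequence number inside the mail instead, so the check runs on the mailbox thread right before the trigger?
   ```
    mailboxExecutor.execute(() -> {if (triggerSeq == currentTriggerSeq.get()) {triggerIfNeeded(true);}}, "AEC-timeout");
   ```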
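
To illustrate why the location of the check matters, here is a minimal, single-threaded sketch. It is not the PR's code: `SeqCheckSketch`, its `mailbox` queue (a stand-in for `MailboxExecutor`), and `trigger()` (a stand-in for `triggerIfNeeded`) are hypothetical names, and the sketch assumes that every trigger increments `currentTriggerSeq`, as the Javadoc in the diff states. It replays one interleaving: the timeout fires, then a regular trigger happens before the queued mail runs.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the controller; only the sequence check is modelled. */
class SeqCheckSketch {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    // Stand-in for MailboxExecutor: mails run later, in order, on one thread.
    final Queue<Runnable> mailbox = new ArrayDeque<>();
    int forcedTriggers = 0;

    /** Stand-in for a trigger; assumed to bump the sequence number. */
    void trigger() {
        currentTriggerSeq.incrementAndGet();
    }

    /** Variant in the diff: check on the timer thread, then enqueue unconditionally. */
    void onTimeoutCheckInTimer(long triggerSeq) {
        if (triggerSeq != currentTriggerSeq.get()) {
            return;
        }
        mailbox.add(() -> {
            forcedTriggers++;
            trigger();
        });
    }

    /** Suggested variant: enqueue first, check when the mail actually runs. */
    void onTimeoutCheckInMail(long triggerSeq) {
        mailbox.add(() -> {
            if (triggerSeq == currentTriggerSeq.get()) {
                forcedTriggers++;
                trigger();
            }
        });
    }

    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    /** Replays: timeout fires, a regular trigger sneaks in, then the mail runs. */
    static int run(boolean checkInMail) {
        SeqCheckSketch aec = new SeqCheckSketch();
        long seq = aec.currentTriggerSeq.get();
        if (checkInMail) {
            aec.onTimeoutCheckInMail(seq);
        } else {
            aec.onTimeoutCheckInTimer(seq);
        }
        aec.trigger(); // e.g. the batch filled up before the mail was processed
        aec.drainMailbox();
        return aec.forcedTriggers;
    }
}
```

In this replay, `SeqCheckSketch.run(false)` performs one stale forced trigger, while `SeqCheckSketch.run(true)` performs none, because the mail sees the newer sequence number and skips.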



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -220,12 +283,17 @@ <IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
      * @param force whether to trigger requests in force.
      */
     void triggerIfNeeded(boolean force) {
-        // TODO: introduce a timeout mechanism for triggering.
         if (!force && stateRequestsBuffer.activeQueueSize() < batchSize) {
+            // if the active queue size is 1, it means that the current request is the oldest one in
+            // the active queue.
+            if (stateRequestsBuffer.activeQueueSize() == 1) {
+                scheduleTimeout(currentTriggerSeq.get());
+            }

Review Comment:
   Is it possible to trigger multiple times for one seq?
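
If duplicate triggers for one sequence number turn out to be possible, one way to rule them out is to let the trigger itself claim the sequence number atomically. The following is only a sketch of that idea, not something the PR does: `OncePerSeqSketch` and `triggerOnce` are hypothetical names, and it assumes a trigger is what advances `currentTriggerSeq`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical guard: at most one trigger is performed per sequence number. */
class OncePerSeqSketch {
    final AtomicLong currentTriggerSeq = new AtomicLong(0);
    int triggers = 0;

    /**
     * Performs the trigger only if triggerSeq is still current.
     * compareAndSet advances the sequence and claims it in one atomic step,
     * so a second caller holding the same triggerSeq fails the comparison.
     */
    boolean triggerOnce(long triggerSeq) {
        if (currentTriggerSeq.compareAndSet(triggerSeq, triggerSeq + 1)) {
            triggers++; // stand-in for the actual trigger work
            return true;
        }
        return false;
    }
}
```

With this guard, two timeout callbacks scheduled with the same sequence number cannot both trigger: the first call advances the counter, and the second observes a mismatch and returns without doing any work.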



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
