yunfengzhou-hub commented on code in PR #24667:
URL: https://github.com/apache/flink/pull/24667#discussion_r1575603596


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;
         this.maxInFlightRecordNum = maxInFlightRecords;
         this.stateRequestsBuffer = new StateRequestBuffer<>();
         this.inFlightRecordNum = new AtomicInteger(0);
+        this.timeoutFlag = new AtomicBoolean(false);
+
+        // ----------------- initialize buffer timeout -------------------
+        this.currentScheduledFuture = null;
+        if (bufferTimeOut > 0) {
+            this.scheduledExecutor =
+                    new ScheduledThreadPoolExecutor(
+                            1,
+                            new ThreadFactory() {
+                                @Override
+                                public Thread newThread(Runnable r) {
+                                    return new Thread(r, "AEC-scheduler");
+                                }
+                            });
+            this.scheduledExecutor.setRemoveOnCancelPolicy(true);
+
+            // make sure shutdown removes all pending tasks
+            this.scheduledExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+            this.scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+        } else {
+            this.scheduledExecutor = null;
+        }
+
         LOG.info(
                 "Create AsyncExecutionController: batchSize {}, maxInFlightRecordsNum {}",
                 batchSize,
                 maxInFlightRecords);
     }
 
+    void scheduleTimeout() {
+        if (bufferTimeOut > 0) {
+            if (currentScheduledFuture != null
+                    && !currentScheduledFuture.isDone()
+                    && !currentScheduledFuture.isCancelled()) {
+                currentScheduledFuture.cancel(false);
+            }
+            currentScheduledFuture =
+                    (ScheduledFuture<Void>)
+                            scheduledExecutor.schedule(
+                                    () -> {
+                                        timeoutFlag.set(true);
+                                        mailboxExecutor.execute(
+                                                () -> triggerIfNeeded(false), "AEC-timeout");

Review Comment:
   It might be simpler to remove `timeoutFlag` and invoke `triggerIfNeeded(true)` directly.
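
   A minimal sketch of that idea, assuming the boolean argument of `triggerIfNeeded` means "force a trigger even if the batch is not full" (the diff does not show its definition). `TimeoutTriggerSketch`, `submit`, `onTimeout`, and `firedBatches` are hypothetical stand-ins, not the real `AsyncExecutionController` API, and a plain `Executor` plays the role of the `MailboxExecutor`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical, simplified stand-in for the controller; not the real Flink class. */
class TimeoutTriggerSketch {
    private final int batchSize;
    private final Executor mailbox; // assumption: stands in for MailboxExecutor
    private final List<String> activeQueue = new ArrayList<>();
    final List<List<String>> firedBatches = new ArrayList<>();

    TimeoutTriggerSketch(int batchSize, Executor mailbox) {
        this.batchSize = batchSize;
        this.mailbox = mailbox;
    }

    /** Buffers one request and fires only if the batch is full. */
    void submit(String request) {
        activeQueue.add(request);
        triggerIfNeeded(false);
    }

    /** Fires a non-empty buffer when it is full, or regardless of size when force is true. */
    void triggerIfNeeded(boolean force) {
        if (activeQueue.isEmpty()) {
            return;
        }
        if (!force && activeQueue.size() < batchSize) {
            return;
        }
        firedBatches.add(new ArrayList<>(activeQueue));
        activeQueue.clear();
    }

    /** What the scheduled timeout task would run: no shared flag, just a forced trigger. */
    void onTimeout() {
        mailbox.execute(() -> triggerIfNeeded(true));
    }

    public static void main(String[] args) {
        // Runnable::run executes the mail inline, which keeps the sketch single-threaded.
        TimeoutTriggerSketch aec = new TimeoutTriggerSketch(5, Runnable::run);
        aec.submit("a");
        aec.submit("b");
        System.out.println(aec.firedBatches.size()); // 0
        aec.onTimeout();
        System.out.println(aec.firedBatches); // [[a, b]]
    }
}
```

   In this sketch the forced trigger runs on the mailbox side, so the scheduler thread shares no state with it and the `AtomicBoolean` is not needed.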



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -51,15 +56,24 @@ public class AsyncExecutionController<K> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
 
-    public static final int DEFAULT_BATCH_SIZE = 1000;
-    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;
+    private static final int DEFAULT_BATCH_SIZE = 1000;
+
+    private static final int DEFAULT_BUFFER_TIMEOUT = 1000;

Review Comment:
   How about removing the constants and using `ASYNC_STATE_BUFFER_SIZE.defaultValue()` from the `ExecutionConfig`? This would avoid maintaining the same value in two places.
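
   One way to read this suggestion, as a sketch only: `Option` is a hypothetical minimal stand-in for a config option type that exposes `defaultValue()`, and the key string and the `Controller` class are invented for illustration. Only the value 1000 is taken from the diff above.

```java
import java.util.Objects;

class DefaultValueSketch {
    /** Hypothetical, minimal stand-in for a config option that carries its own default. */
    static final class Option<T> {
        private final String key;
        private final T defaultValue;

        Option(String key, T defaultValue) {
            this.key = Objects.requireNonNull(key);
            this.defaultValue = Objects.requireNonNull(defaultValue);
        }

        String key() {
            return key;
        }

        T defaultValue() {
            return defaultValue;
        }
    }

    // Assumed single definition of the default; the key name is made up.
    static final Option<Integer> ASYNC_STATE_BUFFER_SIZE =
            new Option<>("hypothetical.async-state.buffer-size", 1000);

    /** Hypothetical controller that keeps no default constant of its own. */
    static final class Controller {
        final int batchSize;

        Controller() {
            // The default is read from the option instead of a duplicated constant.
            this(ASYNC_STATE_BUFFER_SIZE.defaultValue());
        }

        Controller(int batchSize) {
            this.batchSize = batchSize;
        }
    }

    public static void main(String[] args) {
        System.out.println(new Controller().batchSize); // 1000
    }
}
```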



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;
+
+    ScheduledFuture<Void> currentScheduledFuture;
+
     public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
-        this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
+        this(
+                mailboxExecutor,
+                stateExecutor,
+                DEFAULT_BATCH_SIZE,
+                DEFAULT_BUFFER_TIMEOUT,
+                DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
     }
 
     public AsyncExecutionController(
             MailboxExecutor mailboxExecutor,
             StateExecutor stateExecutor,
             int batchSize,
+            long bufferTimeOut,
             int maxInFlightRecords) {
         this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
         this.mailboxExecutor = mailboxExecutor;
         this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
         this.stateExecutor = stateExecutor;
         this.batchSize = batchSize;
+        this.bufferTimeOut = bufferTimeOut;

Review Comment:
   The "O" in `bufferTimeOut` is upper-case while the "o" in `timeoutFlag` is lower-case. It might be better to unify them under the same convention.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java:
##########
@@ -346,6 +337,63 @@ public void testSyncPoint() {
         recordContext2.release();
     }
 
+    @Test
+    void testBufferTimeout() throws InterruptedException {
+        batchSize = 5;
+        timeout = 1000;
+        setup();
+        Runnable userCode = () -> valueState.asyncValue();
+
+        // ------------ basic timeout -------------------
+        for (int i = 0; i < batchSize - 1; i++) {
+            String record = String.format("key%d-r%d", i, i);
+            String key = String.format("key%d", batchSize + i);
+            RecordContext<String> recordContext = aec.buildContext(record, key);
+            aec.setCurrentContext(recordContext);
+            userCode.run();
+        }
+        assertThat(aec.timeoutFlag.get()).isFalse();
+        assertThat(aec.currentScheduledFuture.isDone()).isFalse();
+        assertThat(aec.inFlightRecordNum.get()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.activeQueueSize()).isEqualTo(batchSize - 1);
+        assertThat(aec.stateRequestsBuffer.blockingQueueSize()).isEqualTo(0);
+
+        Thread.sleep(timeout + 100);

Review Comment:
   It might be better to avoid `Thread.sleep` in test cases, as it increases CI duration and can make tests flaky. How about introducing a `TestScheduledThreadPoolExecutor` so we can control when each step is triggered? An example of this is `org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor`.
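
   To illustrate the pattern without depending on Flink, here is a small sketch. `Scheduler`, `ManualScheduler`, `pendingCount`, and `triggerAll` are hypothetical names for this example and do not reproduce the API of `ManuallyTriggeredScheduledExecutor`. The point is only that the test decides when the "timeout" fires, so no sleep is needed.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

class ManualSchedulerSketch {
    /** Hypothetical abstraction the controller would depend on instead of a concrete thread pool. */
    interface Scheduler {
        void schedule(Runnable task, long delayMillis);
    }

    /** Test double: records tasks and runs them only when told to, ignoring the delay. */
    static final class ManualScheduler implements Scheduler {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        @Override
        public void schedule(Runnable task, long delayMillis) {
            pending.add(task);
        }

        int pendingCount() {
            return pending.size();
        }

        /** Runs every task scheduled so far, in submission order. */
        void triggerAll() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }

    public static void main(String[] args) {
        ManualScheduler scheduler = new ManualScheduler();
        AtomicBoolean timedOut = new AtomicBoolean(false);

        // Production code would schedule its timeout callback like this.
        scheduler.schedule(() -> timedOut.set(true), 1000L);
        System.out.println(timedOut.get()); // false

        // The test "advances time" explicitly instead of sleeping.
        scheduler.triggerAll();
        System.out.println(timedOut.get()); // true
    }
}
```

   With such a double, the assertions before and after the timeout stay the same, but the `Thread.sleep(timeout + 100)` line is replaced by an explicit trigger call.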



##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java:
##########
@@ -94,29 +108,91 @@ public class AsyncExecutionController<K> {
      */
     final AtomicInteger inFlightRecordNum;
 
+    /**
+     * The flag to indicate whether the {@link #bufferTimeOut} is reached, if yes, a trigger will
+     * perform actively when the next state request arrives even if the activeQueue has not reached
+     * the {@link #batchSize}.
+     */
+    final AtomicBoolean timeoutFlag;
+
+    /** The executor service that schedules and calls the triggers of this task. */
+    final ScheduledThreadPoolExecutor scheduledExecutor;

Review Comment:
   Would it be better to reuse existing utility methods like `FutureUtils.delay()`? This way AEC won't need to maintain such resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
