Zakelly commented on code in PR #24667: URL: https://github.com/apache/flink/pull/24667#discussion_r1579142292
########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java: ########## @@ -94,26 +97,39 @@ public class AsyncExecutionController<K> { */ final AtomicInteger inFlightRecordNum; - public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { - this(mailboxExecutor, stateExecutor, DEFAULT_BATCH_SIZE, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); - } - public AsyncExecutionController( MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int batchSize, + long bufferTimeout, int maxInFlightRecords) { this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords); this.mailboxExecutor = mailboxExecutor; this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); this.stateExecutor = stateExecutor; this.batchSize = batchSize; + this.bufferTimeout = bufferTimeout; this.maxInFlightRecordNum = maxInFlightRecords; - this.stateRequestsBuffer = new StateRequestBuffer<>(); + this.inFlightRecordNum = new AtomicInteger(0); + this.stateRequestsBuffer = + new StateRequestBuffer<>( + bufferTimeout, + () -> + mailboxExecutor.execute( + () -> { + if (stateRequestsBuffer.currentTriggerSeq.get() + == stateRequestsBuffer.scheduledTriggerSeq Review Comment: This condition is always true. I suggest you pass the `triggered seq` into the handler lambda as parameter. ########## flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequestBuffer.java: ########## @@ -53,17 +72,75 @@ public class StateRequestBuffer<K> { /** The number of state requests in blocking queue. */ int blockingQueueSize; - public StateRequestBuffer() { + /** The timeout of {@link #activeQueue} triggering in milliseconds. */ + final long bufferTimeout; + + /** The handler to trigger when {@link #activeQueue} size is 1. */ + final Runnable timeoutHandler; + + /** The executor service that schedules and calls the triggers of this task. */ + ScheduledExecutorService scheduledExecutor; + + /** + * The current scheduled future, when the next scheduling occurs, the previous one that has not + * yet been executed will be canceled. + */ + ScheduledFuture<Void> currentScheduledFuture; + + /** + * The current scheduled trigger sequence number, a timeout trigger is scheduled only if {@code + * scheduledTriggerSeq} is less than {@code currentTriggerSeq}. + */ + AtomicLong scheduledTriggerSeq; + + /** + * The current trigger sequence number, used to distinguish different triggers. Every time a + * trigger occurs, {@code currentTriggerSeq} increases by one. + */ + AtomicLong currentTriggerSeq; + + public StateRequestBuffer(long bufferTimeout, Runnable timeoutHandler) { this.activeQueue = new LinkedList<>(); this.blockingQueue = new HashMap<>(); this.blockingQueueSize = 0; + this.bufferTimeout = bufferTimeout; + this.timeoutHandler = timeoutHandler; + this.scheduledTriggerSeq = new AtomicLong(-1); + this.currentTriggerSeq = new AtomicLong(0); + if (bufferTimeout > 0) { + this.scheduledExecutor = DELAYER; + } + } + + void advanceTriggerSeq() { + currentTriggerSeq.incrementAndGet(); } void enqueueToActive(StateRequest<K, ?, ?> request) { if (request.getRequestType() == StateRequestType.SYNC_POINT) { request.getFuture().complete(null); } else { activeQueue.add(request); + // if the active queue size is 1, it means that the current request is the oldest one in Review Comment: nit. remove this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org