AHeise commented on a change in pull request #13228:

URL: https://github.com/apache/flink/pull/13228#discussion_r493044470
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -62,9 +65,37 @@
      */
     public CheckpointedInputGate(
             InputGate inputGate,
-            CheckpointBarrierHandler barrierHandler) {
+            CheckpointBarrierHandler barrierHandler,
+            MailboxExecutor mailboxExecutor) {
         this.inputGate = inputGate;
         this.barrierHandler = barrierHandler;
+        this.mailboxExecutor = mailboxExecutor;
+
+        waitForPriorityEvents(inputGate, mailboxExecutor);
+    }
+
+    /**
+     * Eagerly pulls and processes all priority events. Must be called from task thread.
+     *
+     * <p>Basic assumption is that no priority event needs to be handled by the {@link StreamTaskNetworkInput}.
+     */
+    private void processPriorityEvents() throws IOException, InterruptedException {
+        // check if the priority event is still not processed (could have been pulled before mail was being executed)
+        if (inputGate.getPriorityEventAvailableFuture().isDone()) {
+            // process as many priority events as possible
+            while (pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+            }
+        }
+

Review comment:
Good idea, I solved it in the following way:

```
// check if the priority event is still not processed (could have been pulled before mail was being executed)
boolean hasPriorityEvent = inputGate.getPriorityEventAvailableFuture().isDone();
while (hasPriorityEvent) {
    // process as many priority events as possible
    final Optional<BufferOrEvent> bufferOrEventOpt = pollNext();
    bufferOrEventOpt.ifPresent(bufferOrEvent ->
            checkState(bufferOrEvent.hasPriority(), "Should only poll priority events"));
    hasPriorityEvent = bufferOrEventOpt.map(BufferOrEvent::morePriorityEvents).orElse(false);
}
```

`checkState(!inputGate.getPriorityEventAvailableFuture().isDone())`, on the other hand, might fail if Netty receives a new priority event and completes this availability future right after the task thread has polled the last priority event.
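The contract of the reworked loop can be checked in isolation with a small single-threaded sketch. `FakeGate`, `PolledEvent`, and `DrainLoopDemo` are hypothetical stand-ins rather than Flink classes, and the local `checkState` only mimics Flink's `Preconditions.checkState`. The sketch assumes that the gate hands out priority events ahead of regular data and re-arms its availability future once the last queued priority event has been polled; under that assumption the loop drains exactly the priority events and leaves regular data for the normal input path.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for BufferOrEvent, reduced to the fields the loop reads. */
final class PolledEvent {
    final String name;
    final boolean hasPriority;
    final boolean morePriorityEvents;

    PolledEvent(String name, boolean hasPriority, boolean morePriorityEvents) {
        this.name = name;
        this.hasPriority = hasPriority;
        this.morePriorityEvents = morePriorityEvents;
    }
}

/** Hypothetical single-threaded gate that hands out priority events before regular data. */
final class FakeGate {
    private final Queue<String> priorityEvents = new ArrayDeque<>();
    private final Queue<String> regularData = new ArrayDeque<>();
    private CompletableFuture<Void> priorityAvailable = new CompletableFuture<>();

    void addPriorityEvent(String name) {
        priorityEvents.add(name);
        priorityAvailable.complete(null); // no-op if already completed
    }

    void addRegular(String name) {
        regularData.add(name);
    }

    CompletableFuture<Void> getPriorityEventAvailableFuture() {
        return priorityAvailable;
    }

    Optional<PolledEvent> pollNext() {
        String event = priorityEvents.poll();
        if (event != null) {
            boolean more = !priorityEvents.isEmpty();
            if (!more) {
                // all priority events handed out: the next one must complete a fresh future
                priorityAvailable = new CompletableFuture<>();
            }
            return Optional.of(new PolledEvent(event, true, more));
        }
        String data = regularData.poll();
        return Optional.ofNullable(data).map(d -> new PolledEvent(d, false, false));
    }
}

final class DrainLoopDemo {
    /** Local stand-in for Flink's Preconditions.checkState. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    /** The loop from the review comment, recording which events it consumed. */
    static List<String> drainPriorityEvents(FakeGate gate) {
        List<String> drained = new ArrayList<>();
        boolean hasPriorityEvent = gate.getPriorityEventAvailableFuture().isDone();
        while (hasPriorityEvent) {
            Optional<PolledEvent> polled = gate.pollNext();
            polled.ifPresent(e -> checkState(e.hasPriority, "Should only poll priority events"));
            polled.ifPresent(e -> drained.add(e.name));
            hasPriorityEvent = polled.map(e -> e.morePriorityEvents).orElse(false);
        }
        return drained;
    }

    public static void main(String[] args) {
        FakeGate gate = new FakeGate();
        gate.addRegular("record-1");
        gate.addPriorityEvent("barrier-1");
        gate.addPriorityEvent("barrier-2");
        System.out.println(drainPriorityEvents(gate));  // [barrier-1, barrier-2]
        System.out.println(gate.pollNext().get().name); // record-1, left for the regular input
    }
}
```

Because the loop stops on the flag carried by the last polled event instead of re-reading the future, a priority event that arrives afterwards is left for the next mail.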
Such a race between Netty and the task thread should happen quite often: the first barrier to arrive is, at that time, the only priority event (`morePriorityEvents = false`), and it triggers the whole checkpointing process. The second barrier would then complete the `getPriorityEventAvailableFuture`, causing a more or less immediate re-execution of `processPriorityEvents`.
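The race and the resulting re-execution can be replayed deterministically on one thread. In this sketch a plain `Queue<Runnable>` stands in for the `MailboxExecutor`, and `RacyGate`, `PriorityEventProcessor`, and `RaceDemo` are hypothetical illustrations, not Flink code. The body of `waitForPriorityEvents` is not part of the diff, so two behaviors are assumptions: the gate re-arms a fresh availability future after the last priority event is polled, and processing re-registers on that future once the drain loop ends. A hook fired right after the last poll plays the role of Netty delivering the second barrier.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical gate: "Netty" calls addPriorityEvent, the task thread calls pollPriority. */
final class RacyGate {
    static final class Polled {
        final String event;
        final boolean morePriorityEvents;

        Polled(String event, boolean morePriorityEvents) {
            this.event = event;
            this.morePriorityEvents = morePriorityEvents;
        }
    }

    private final Queue<String> priorityEvents = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    void addPriorityEvent(String event) {
        priorityEvents.add(event);
        available.complete(null);
    }

    CompletableFuture<Void> getPriorityEventAvailableFuture() {
        return available;
    }

    Polled pollPriority() {
        String event = priorityEvents.poll();
        if (event == null) {
            return null;
        }
        boolean more = !priorityEvents.isEmpty();
        if (!more) {
            available = new CompletableFuture<>(); // drained: re-arm for the next event
        }
        return new Polled(event, more);
    }
}

/** Drains priority events inside mails, then re-registers on the availability future. */
final class PriorityEventProcessor {
    private final RacyGate gate;
    private final Queue<Runnable> mailbox; // stand-in for MailboxExecutor
    private final Runnable afterLastPoll;  // hook that injects the Netty race
    final List<String> processed = new ArrayList<>();
    int executions;
    boolean futureDoneAfterDrain;          // true if checkState(!isDone()) would have failed

    PriorityEventProcessor(RacyGate gate, Queue<Runnable> mailbox, Runnable afterLastPoll) {
        this.gate = gate;
        this.mailbox = mailbox;
        this.afterLastPoll = afterLastPoll;
        waitForPriorityEvents();
    }

    private void waitForPriorityEvents() {
        // enqueue a mail once the future completes (right away if it already has)
        gate.getPriorityEventAvailableFuture().thenRun(() -> mailbox.add(this::processPriorityEvents));
    }

    private void processPriorityEvents() {
        executions++;
        boolean hasPriorityEvent = gate.getPriorityEventAvailableFuture().isDone();
        while (hasPriorityEvent) {
            RacyGate.Polled polled = gate.pollPriority();
            processed.add(polled.event);
            if (!polled.morePriorityEvents) {
                afterLastPoll.run(); // Netty may deliver the next barrier right here
            }
            hasPriorityEvent = polled.morePriorityEvents;
        }
        futureDoneAfterDrain |= gate.getPriorityEventAvailableFuture().isDone();
        waitForPriorityEvents();
    }
}

/** First barrier arrives alone; the second arrives right after the first was polled. */
final class RaceDemo {
    static PriorityEventProcessor simulate() {
        RacyGate gate = new RacyGate();
        Queue<Runnable> mailbox = new ArrayDeque<>();
        boolean[] secondBarrierSent = {false};
        Runnable nettyDeliversSecondBarrier = () -> {
            if (!secondBarrierSent[0]) {
                secondBarrierSent[0] = true;
                gate.addPriorityEvent("barrier-2");
            }
        };
        PriorityEventProcessor processor =
                new PriorityEventProcessor(gate, mailbox, nettyDeliversSecondBarrier);
        gate.addPriorityEvent("barrier-1"); // the only priority event so far
        while (!mailbox.isEmpty()) {        // task thread: run mails in order
            mailbox.poll().run();
        }
        return processor;
    }
}
```

`RaceDemo.simulate()` processes `barrier-1` and `barrier-2` in two mail executions, and `futureDoneAfterDrain` ends up `true`: an assertion that the future is not done after the loop would have failed even though nothing went wrong, while re-registering on the already completed future simply schedules the next mail right away.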