AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r493044470



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
##########
@@ -62,9 +65,37 @@
         */
        public CheckpointedInputGate(
                        InputGate inputGate,
-                       CheckpointBarrierHandler barrierHandler) {
+                       CheckpointBarrierHandler barrierHandler,
+                       MailboxExecutor mailboxExecutor) {
                this.inputGate = inputGate;
                this.barrierHandler = barrierHandler;
+               this.mailboxExecutor = mailboxExecutor;
+
+               waitForPriorityEvents(inputGate, mailboxExecutor);
+       }
+
+       /**
+        * Eagerly pulls and processes all priority events. Must be called from 
task thread.
+        *
+        * <p>Basic assumption is that no priority event needs to be handled by 
the {@link StreamTaskNetworkInput}.
+        */
+       private void processPriorityEvents() throws IOException, 
InterruptedException {
+               // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
+               if (inputGate.getPriorityEventAvailableFuture().isDone()) {
+                       // process as many priority events as possible
+                       while 
(pollNext().map(BufferOrEvent::morePriorityEvents).orElse(false)) {
+                       }
+               }
+

Review comment:
       Good idea, I solved it in the following way:
   
   ```
                // check if the priority event is still not processed (could 
have been pulled before mail was being executed)
                boolean hasPriorityEvent = 
inputGate.getPriorityEventAvailableFuture().isDone();
                while (hasPriorityEvent) {
                        // process as many priority events as possible
                        final Optional<BufferOrEvent> bufferOrEventOpt = 
pollNext();
                        bufferOrEventOpt.ifPresent(bufferOrEvent ->
                                checkState(bufferOrEvent.hasPriority(), "Should 
only poll priority events"));
                        hasPriorityEvent = 
bufferOrEventOpt.map(BufferOrEvent::morePriorityEvents).orElse(false);
                }
   ```
   
   `checkState(!inputGate.getPriorityEventAvailableFuture().isDone())` might be 
failing if netty receives a new priority event and triggers this available 
future while the task thread polled the last priority event. This case should 
happen quite often when the first barrier arrives (at that time the only 
priority event, morePriorityEvents = false) and triggers the whole 
checkpointing process. The second barrier would then complete the 
`getPriorityEventAvailableFuture` causing a more or less immediate re-execution 
of this method.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to