AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492601594



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception {
                }
 
                InputStatus inputStatus = 
inputProcessors[readingInputIndex].processInput();
+               updatePriorityAvailability();
                checkFinished(inputStatus, readingInputIndex);
                return inputSelectionHandler.updateStatus(inputStatus, 
readingInputIndex);
        }
 
+       private void updatePriorityAvailability() {
+               if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) {
+                       final CompletableFuture<?> priorityEventAvailableFuture 
=
+                               
inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture();
+                       // no more priority events for the input
+                       if (!priorityEventAvailableFuture.isDone()) {
+                               
prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex);
+                               if 
(!prioritySelectionHandler.isAnyInputAvailable()) {
+                                       priorityAvailability.resetUnavailable();
+                               }
+                               
priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex));
+                       }
+               }
+       }
+
+       private Runnable onPriorityEvent(int index) {
+               return () -> {
+                       // set the priority flag in a mail before notifying 
StreamTask of availability
+                       mainMailboxExecutor.execute(() -> {
+                               
prioritySelectionHandler.setAvailableInput(index);
+                               
priorityAvailability.getUnavailableToResetAvailable().complete(null);

Review comment:
       Moved it even further up towards `CheckpointedInputGate`. At this point, 
we need to make sure that a priority event is really at the top (hence the 
optimistic lock protocol for notification).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to