AHeise commented on a change in pull request #13228:
URL: https://github.com/apache/flink/pull/13228#discussion_r492601080



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -171,10 +183,36 @@ public InputStatus processInput() throws Exception {
                }
 
                 InputStatus inputStatus = inputProcessors[readingInputIndex].processInput();
+               updatePriorityAvailability();
                checkFinished(inputStatus, readingInputIndex);
                 return inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
        }
 
+       private void updatePriorityAvailability() {
+               if (lastPriorityInputIndex != InputSelection.NONE_AVAILABLE) {
+                       final CompletableFuture<?> priorityEventAvailableFuture =
+                               inputProcessors[lastPriorityInputIndex].taskInput.getPriorityEventAvailableFuture();
+                       // no more priority events for the input
+                       if (!priorityEventAvailableFuture.isDone()) {
+                               prioritySelectionHandler.setUnavailableInput(lastPriorityInputIndex);
+                               if (!prioritySelectionHandler.isAnyInputAvailable()) {
+                                       priorityAvailability.resetUnavailable();
+                               }
+                               priorityEventAvailableFuture.thenRun(onPriorityEvent(lastPriorityInputIndex));
+                       }
+               }
+       }
+
+       private Runnable onPriorityEvent(int index) {
+               return () -> {
+                       // set the priority flag in a mail before notifying StreamTask of availability
+                       mainMailboxExecutor.execute(() -> {
+                               prioritySelectionHandler.setAvailableInput(index);
+                               priorityAvailability.getUnavailableToResetAvailable().complete(null);
+                       }, "priority event {}", index);

Review comment:
       I removed this commit; in `CheckpointedInputGate`, I'm adding `gate.toString()`.
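
       For readers without the full PR at hand, the handoff in the hunk above (a future callback that enqueues a mail, which sets the priority flag before completing the availability future) can be sketched in isolation. This is only a minimal illustration using `java.util.concurrent`: the single-threaded executor is an assumed stand-in for the mailbox executor, and the class, method, and variable names are hypothetical, not Flink's.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class PriorityHandoffSketch {

    /**
     * Runs the scenario once and returns what a listener on the availability
     * future observes: whether the priority flag was already set.
     */
    static boolean flagSetBeforeAvailability() {
        // Hypothetical stand-in for the task's mailbox: one thread, tasks run in order.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        try {
            // Completed by another thread when a priority event arrives.
            CompletableFuture<Void> priorityEventArrived = new CompletableFuture<>();
            // Completed inside the mail, only after the flag has been set.
            CompletableFuture<Void> availability = new CompletableFuture<>();
            AtomicBoolean priorityFlag = new AtomicBoolean(false);

            // Do not mutate state on the completing thread; enqueue a mail instead.
            priorityEventArrived.thenRun(() -> mailbox.execute(() -> {
                priorityFlag.set(true);
                availability.complete(null);
            }));

            // Record the flag value seen at the moment availability is signalled.
            CompletableFuture<Boolean> observed =
                    availability.thenApply(ignored -> priorityFlag.get());

            // Simulate the event arriving on a different (pool) thread.
            CompletableFuture.runAsync(() -> priorityEventArrived.complete(null)).join();

            return observed.join();
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("flag set before availability: " + flagSetBeforeAvailability());
    }
}
```

       Because both the flag update and the completion happen in the same mail, anything reacting to the availability future sees the flag already set, which is the ordering the code comment in the hunk asks for.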




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

