StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates
URL: https://github.com/apache/flink/pull/8361#discussion_r282024637
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 ##########
 @@ -223,15 +214,24 @@ public void requestPartitions() throws IOException, InterruptedException {
                                                return Optional.empty();
                                        }
                                }
-                               inputGate = inputGatesWithData.remove();
-                               enqueuedInputGatesWithData.remove(inputGate);
-                               moreInputGatesAvailable = enqueuedInputGatesWithData.size() > 0;
-                       }
+                               final InputGate inputGate = inputGatesWithData.remove();
+
+                               // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
+                               Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
 
-                       // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
-                       Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
-                       if (bufferOrEvent.isPresent()) {
-                               return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
+                               if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) {
+                                       // enqueue the inputGate at the end to avoid starvation
+                                       inputGatesWithData.add(inputGate);
+                               } else {
+                                       enqueuedInputGatesWithData.remove(inputGate);
+                               }
+
+                               if (bufferOrEvent.isPresent()) {
 
 Review comment:
   Similar to the loop head in the single input gate, I think this second isPresent() check can be fused into the one above by splitting the if in line 222.
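
   One possible reading of this suggestion, as a minimal sketch: split the combined condition so that isPresent() is tested once and the moreAvailable() decision is nested inside it. The names FusedPollSketch, Item, Gate, enqueue, pollOnce and isEnqueued are hypothetical stand-ins, not Flink's classes, and the sketch handles a single poll rather than the surrounding loop and locking of the real method.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

class FusedPollSketch {

    /** Hypothetical stand-in for BufferOrEvent: a payload plus a moreAvailable flag. */
    static final class Item {
        final String payload;
        final boolean moreAvailable;

        Item(String payload, boolean moreAvailable) {
            this.payload = payload;
            this.moreAvailable = moreAvailable;
        }
    }

    /** Hypothetical stand-in for an InputGate, backed by an in-memory queue. */
    static final class Gate {
        private final Queue<String> data = new ArrayDeque<>();

        Gate(String... payloads) {
            data.addAll(Arrays.asList(payloads));
        }

        /** Non-blocking poll; the flag reports whether this gate still holds data. */
        Optional<Item> poll() {
            String next = data.poll();
            if (next == null) {
                return Optional.empty();
            }
            return Optional.of(new Item(next, !data.isEmpty()));
        }
    }

    private final Queue<Gate> gatesWithData = new ArrayDeque<>();
    private final Set<Gate> enqueuedGates = new HashSet<>();

    /** Registers a gate as having data, at most once. */
    void enqueue(Gate gate) {
        if (enqueuedGates.add(gate)) {
            gatesWithData.add(gate);
        }
    }

    boolean isEnqueued(Gate gate) {
        return enqueuedGates.contains(gate);
    }

    /** One poll with the presence check done once and the flag check nested in it. */
    Optional<Item> pollOnce() {
        if (gatesWithData.isEmpty()) {
            return Optional.empty();
        }
        final Gate gate = gatesWithData.remove();
        final Optional<Item> item = gate.poll();

        if (item.isPresent()) {
            if (item.get().moreAvailable) {
                // Re-enqueue the gate at the end to avoid starvation.
                gatesWithData.add(gate);
            } else {
                enqueuedGates.remove(gate);
            }
            return item;
        }

        // The bookkeeping was inaccurate: the gate had no data, so just forget it.
        enqueuedGates.remove(gate);
        return Optional.empty();
    }
}
```

   With two gates enqueued, one holding two items and the other holding one, successive pollOnce() calls alternate between the gates until both are drained. The behavior matches the two separate if statements in the diff; only the duplicated isPresent() test goes away.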

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
