sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] 
Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293741881
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##########
 @@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws Exception {
                }
        }
 
+       /**
+        * Starting from the second operator, go forward through the operator 
chain to notify
+        * each operator that its input has ended.
+        *
+        * @throws Exception if some exception happens in the endInput function 
of an operator.
+        */
+       public void endOperatorInputs() throws Exception {
+               for (int i = allOperators.length - 2; i >= 0; i--) {
 
 Review comment:
   > I think you could simplify the code if you iterated through all operators 
in OperatorChain (not starting from the second one), check each one of them if 
there are instanceof BoundedOneInput and end them if they are.
   >
   >This would work for:
   >
   
   > - OneInputStreamTask - because all of them are/can be BoundedOneInput
   > - SourceStreamTask - because head will not be BoundedOneInput
   > - TwoInputStreamTask - because head will not be BoundedOneInput
   
   You are right. However, if chaining non-header two input operator is allowed 
in the future, it will have problems. What do you think?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to