sunhaibotb commented on a change in pull request #8731: [FLINK-11878][runtime] Implement the runtime handling of BoundedOneInput and BoundedMultiInput
URL: https://github.com/apache/flink/pull/8731#discussion_r293741881
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -225,6 +226,21 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
         }
     }
 
+    /**
+     * Starting from the second operator, go forward through the operator chain to notify
+     * each operator that its input has ended.
+     *
+     * @throws Exception if some exception happens in the endInput function of an operator.
+     */
+    public void endOperatorInputs() throws Exception {
+        for (int i = allOperators.length - 2; i >= 0; i--) {

Review comment:

> I think you could simplify the code if you iterated through all operators in OperatorChain (not starting from the second one), checked whether each one is an instanceof BoundedOneInput, and ended its input if it is.
>
> This would work for:
>
> - OneInputStreamTask - because all of them are/can be BoundedOneInput
> - SourceStreamTask - because head will not be BoundedOneInput
> - TwoInputStreamTask - because head will not be BoundedOneInput

You are right. However, if chaining a non-head two-input operator is allowed in the future, this approach will have problems. What do you think?
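
For reference, the simplification quoted in this comment could look roughly like the sketch below. It is not the code from the PR. `StreamOperator` and `BoundedOneInput` here are minimal local stand-ins for the Flink interfaces so that the example compiles on its own, `Source` and `RecordingOperator` are hypothetical operators, and the array is assumed to be stored tail-first with the head operator at the last index, as the loop in the diff suggests.

```java
import java.util.ArrayList;
import java.util.List;

class EndInputSketch {

    // Local stand-ins for the Flink interfaces (assumption: only what the sketch needs).
    interface StreamOperator { }

    interface BoundedOneInput {
        void endInput() throws Exception;
    }

    // Hypothetical head operator that does not implement BoundedOneInput.
    static final class Source implements StreamOperator { }

    // Hypothetical chained operator that records when its input ends.
    static final class RecordingOperator implements StreamOperator, BoundedOneInput {
        private final String name;
        private final List<String> log;

        RecordingOperator(String name, List<String> log) {
            this.name = name;
            this.log = log;
        }

        @Override
        public void endInput() {
            log.add(name);
        }
    }

    // Visit every operator, head included, and end the input only of those
    // that implement BoundedOneInput. Assumes allOperators is stored tail-first,
    // so iterating from the last index walks the chain from head to tail.
    static void endOperatorInputs(StreamOperator[] allOperators) throws Exception {
        for (int i = allOperators.length - 1; i >= 0; i--) {
            StreamOperator operator = allOperators[i];
            if (operator instanceof BoundedOneInput) {
                ((BoundedOneInput) operator).endInput();
            }
        }
    }

    // Builds a three-operator chain (source -> map -> sink) and returns the
    // order in which endInput() was called.
    static List<String> demo() throws Exception {
        List<String> log = new ArrayList<>();
        StreamOperator[] chain = {
            new RecordingOperator("sink", log), // tail, index 0
            new RecordingOperator("map", log),
            new Source()                        // head, last index
        };
        endOperatorInputs(chain);
        return log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

In this sketch the head is skipped only because it does not implement `BoundedOneInput`, not because of its position in the array.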