zhijiangW commented on a change in pull request #9854: [FLINK-14230][task] Change the endInput call of the downstream operator to after the upstream operator closes URL: https://github.com/apache/flink/pull/9854#discussion_r338499468
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java ########## @@ -244,46 +239,32 @@ public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { } /** - * Ends an input (specified by {@code inputId}) of the {@link StreamTask}. The {@code inputId} + * Ends the input of an operator, which specified by {@code inputId}). The {@code inputId} * is numbered starting from 1, and `1` indicates the first input. * + * @param operatorIndex The index of the operator in the operator list of the chain. * @param inputId The ID of the input. - * @throws Exception if some exception happens in the endInput function of an operator. + * @throws Exception if some exception happens in the endInput method of the operator. */ - public void endInput(int inputId) throws Exception { - if (finishedInputs.areAllInputsSelected()) { - return; - } - - if (headOperator instanceof TwoInputStreamOperator) { - if (finishedInputs.isInputSelected(inputId)) { - return; - } - - if (headOperator instanceof BoundedMultiInput) { - ((BoundedMultiInput) headOperator).endInput(inputId); - } + public void endOperatorInput(int operatorIndex, int inputId) throws Exception { Review comment: This method could be removed actually, because the array of `allOperators` was already looped in `StreamTask`. We could provide two methods in `OperatorChain` `#endNonHeadOperatorInput(StreamOperator<?> streamOperator)` : it is used by `StreamTask#closeAllOperators`. `#endHeadOperatorInput(int inputIndex)` : it is used by `StreamOne/TwoInputProcessor, StreamSource`. `#internalEndOperatorInput(StreamOperator<?>StreamOperator, int inputIndex)` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services