zhijiangW commented on a change in pull request #9854: [FLINK-14230][task] 
Change the endInput call of the downstream operator to after the upstream 
operator closes
URL: https://github.com/apache/flink/pull/9854#discussion_r338499468
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 ##########
 @@ -244,46 +239,32 @@ public void prepareSnapshotPreBarrier(long checkpointId) 
throws Exception {
        }
 
        /**
-        * Ends an input (specified by {@code inputId}) of the {@link 
StreamTask}. The {@code inputId}
+        * Ends the input of an operator, which specified by {@code inputId}). 
The {@code inputId}
         * is numbered starting from 1, and `1` indicates the first input.
         *
+        * @param operatorIndex The index of the operator in the operator list 
of the chain.
         * @param inputId The ID of the input.
-        * @throws Exception if some exception happens in the endInput function 
of an operator.
+        * @throws Exception if some exception happens in the endInput method 
of the operator.
         */
-       public void endInput(int inputId) throws Exception {
-               if (finishedInputs.areAllInputsSelected()) {
-                       return;
-               }
-
-               if (headOperator instanceof TwoInputStreamOperator) {
-                       if (finishedInputs.isInputSelected(inputId)) {
-                               return;
-                       }
-
-                       if (headOperator instanceof BoundedMultiInput) {
-                               ((BoundedMultiInput) 
headOperator).endInput(inputId);
-                       }
+       public void endOperatorInput(int operatorIndex, int inputId) throws 
Exception {
 
 Review comment:
   This method could be removed actually, because the array of `allOperators` 
was already looped in `StreamTask`.
   
   We could provide two methods in `OperatorChain`
   `#endNonHeadOperatorInput(StreamOperator<?> streamOperator)` : it is used by 
`StreamTask#closeAllOperators`.
   `#endHeadOperatorInput(int inputIndex)` : it is used by 
`StreamOne/TwoInputProcessor, StreamSource`.
   `#internalEndOperatorInput(StreamOperator<?>StreamOperator, int inputIndex)`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to