dawidwys commented on a change in pull request #16184:
URL: https://github.com/apache/flink/pull/16184#discussion_r667937243



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -443,6 +455,10 @@ protected void finishOperators(StreamTaskActionExecutor 
actionExecutor) throws E
      * StreamTask}. Closing happens from <b>tail to head</b> operator in the 
chain.
      */
     protected void closeAllOperators() throws Exception {
+        if (isFinishedOnRestore()) {

Review comment:
       nit: let's unify it and use either the getter or the field.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -265,6 +265,14 @@ protected void declineCheckpoint(long checkpointId) {
 
         @Override
         public void run() {
+            if (operatorChain.isFinishedOnRestore()) {

Review comment:
       I am fine with that solution, but I thought why can't we do it a bit 
earlier and don't even start the thread? I think we could move this block to 
the `processInput` method (plus a small adjustment to the 
`interruptSourceThread` to skip it if the operator chain has finished. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to