dawidwys commented on a change in pull request #16184:
URL: https://github.com/apache/flink/pull/16184#discussion_r667937243
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -443,6 +455,10 @@ protected void finishOperators(StreamTaskActionExecutor
actionExecutor) throws E
* StreamTask}. Closing happens from <b>tail to head</b> operator in the
chain.
*/
protected void closeAllOperators() throws Exception {
+ if (isFinishedOnRestore()) {
Review comment:
nit: let's unify it and use either the getter or the field.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
##########
@@ -265,6 +265,14 @@ protected void declineCheckpoint(long checkpointId) {
@Override
public void run() {
+ if (operatorChain.isFinishedOnRestore()) {
Review comment:
I am fine with that solution, but I thought why can't we do it a bit
earlier and don't even start the thread? I think we could move this block to
the `processInput` method (plus a small adjustment to the
`interruptSourceThread` to skip it if the operator chain has finished. WDYT?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]