pnowojski edited a comment on pull request #16905:
URL: https://github.com/apache/flink/pull/16905#issuecomment-904506952


   I'm not sure this is the right way to go. I see a couple of issues.
   1. Do we want to create a contract that the future returned from 
`mainOperator.stop()` is always completed from the mailbox thread? If that 
ever stopped being the case, the same issue would come back.
   2. When the future from 1. is completed, you are enqueuing another mailbox 
action (inside `triggerSourcesCheckpointAsync()`), and I'm not sure that is 
guaranteed to work the way you expect. The future from `stop()` is completed in 
`StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator`, more precisely 
in `deferFinishOperatorToMailbox()`. Are you sure that the mailbox action 
you enqueued in `triggerSourcesCheckpointAsync()` will be executed before we 
return from `quiesceTimeServiceAndFinishOperator()`? I'm not. I think that, 
if we are unlucky, it might be executed only in `mailboxProcessor.drain();` 
in `StreamTask#afterInvoke`. That would mean it currently works only because 
`triggerSourcesCheckpointAsync()` enqueues a single mailbox action. If that 
were split into two (or more) actions, the second one could be rejected 
because the mailbox is already in `quiesce` mode.
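
   To illustrate the concern in 2., here is a toy model of a mailbox that 
rejects new actions once it has been quiesced but still drains what is already 
queued. This is only a sketch: `ToyMailbox`, `ToyMailboxDemo`, `put`, `quiesce` 
and `run` are made-up names for illustration, not Flink's mailbox classes, and 
the real rejection behaviour may differ in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only (hypothetical); NOT Flink's TaskMailbox / MailboxProcessor.
final class ToyMailbox {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean quiesced = false;

    /** Accepts a new action unless the mailbox has been quiesced. */
    void put(Runnable action) {
        if (quiesced) {
            throw new IllegalStateException("mailbox is quiesced");
        }
        queue.add(action);
    }

    /** Stops accepting new actions; already queued ones can still be drained. */
    void quiesce() {
        quiesced = true;
    }

    /** Runs every action that is still queued. */
    void drain() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }
}

final class ToyMailboxDemo {
    /**
     * Enqueues a chain of `steps` actions where each action enqueues the next one,
     * quiesces the mailbox before anything has run, then drains it.
     */
    static List<String> run(int steps) {
        ToyMailbox mailbox = new ToyMailbox();
        List<String> log = new ArrayList<>();
        mailbox.put(chain(mailbox, log, 1, steps));
        mailbox.quiesce();
        try {
            mailbox.drain();
        } catch (IllegalStateException e) {
            log.add("rejected: " + e.getMessage());
        }
        return log;
    }

    private static Runnable chain(ToyMailbox mailbox, List<String> log, int step, int steps) {
        return () -> {
            log.add("step " + step);
            if (step < steps) {
                // A follow-up action is enqueued from inside a running action.
                mailbox.put(chain(mailbox, log, step + 1, steps));
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(run(1)); // a single action still runs during drain
        System.out.println(run(2)); // the follow-up action is rejected
    }
}
```

   In this model a single queued action survives the quiesce, while a second 
action enqueued from within the first is rejected, which is the fragility I am 
worried about.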
   
   I'm not sure whether I'm right about the points above, but they leave me 
with the feeling that this solution is very complicated and possibly fragile. 
   
   A counterproposal: the whole idea of `stop-with-savepoint 
--drain` was that it should work exactly the same way as waiting for the final 
checkpoint after the sources have finished normally. I think we already have 
code that handles this very issue for the FLIP-147 final checkpoint. However, I 
think the problem might be that it's hidden behind the feature toggle 
`areCheckpointsWithFinishedTasksEnabled` in `StreamTask#afterInvoke`. There we 
keep running the mailbox loop and wait for the final checkpoint to 
complete (`StreamTask#finalCheckpointCompleted`). So isn't the alternative 
solution as simple as adjusting the [if condition in 
`afterInvoke`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L830)
 to keep the task waiting with `stop-with-savepoint --drain` as well, even if 
FLIP-147 is disabled?
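
   Roughly, the adjusted condition could look like the sketch below. The class, 
method and parameter names are hypothetical and only illustrate the intent; 
this is not the actual code in `StreamTask#afterInvoke`.

```java
// Hypothetical sketch; names are illustrative and not taken from StreamTask.
final class AfterInvokeCondition {
    /**
     * Decides whether the task should keep running the mailbox loop until the
     * final checkpoint (or savepoint) has completed.
     */
    static boolean shouldWaitForFinalCheckpoint(
            boolean checkpointsWithFinishedTasksEnabled,
            boolean stopWithSavepointDrain) {
        // Wait if the FLIP-147 toggle is on, or if we are draining for a savepoint.
        return checkpointsWithFinishedTasksEnabled || stopWithSavepointDrain;
    }

    public static void main(String[] args) {
        // FLIP-147 disabled, but stop-with-savepoint --drain requested.
        System.out.println(shouldWaitForFinalCheckpoint(false, true));
    }
}
```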


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
