pnowojski edited a comment on pull request #16905: URL: https://github.com/apache/flink/pull/16905#issuecomment-904506952
I'm not sure if this is the right way to go. A couple of issues:

1. Do we want to create a contract that the future returned from `mainOperator.stop()` is always completed from the mailbox thread? If that were no longer the case, the same issue would come back.
2. When the future from 1. is completed, you are enqueuing another mailbox action (inside `triggerSourcesCheckpointAsync()`), and I'm not sure it's guaranteed to work the way you expect. The future from `stop()` is completed in `StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator`, more precisely in `deferFinishOperatorToMailbox()`. Are you sure that the mailbox action you enqueued in `triggerSourcesCheckpointAsync()` will be executed before we return from `quiesceTimeServiceAndFinishOperator()`? I'm not sure. I think there is a chance that, if we are unlucky, it will be executed only in `mailboxProcessor.drain();`. That would mean it currently works because `triggerSourcesCheckpointAsync()` enqueues only a single mailbox action. If that were split into two, we could see the second action rejected because the mailbox is in `quiesce` mode.

I'm not sure whether I'm right or wrong about the points above, but they give me the feeling that this solution is very complicated and possibly fragile.

A counterproposal: the whole idea of `stop-with-savepoint --drain` was that it should work exactly the same way as waiting for the final checkpoint after sources finish normally. I think we already have code that handles exactly this issue for the FLIP-147 final checkpoint. However, I think the issue might be that it's hidden behind the feature toggle `areCheckpointsWithFinishedTasksEnabled` in `StreamTask#afterInvoke`. There we keep running the mailbox loop and wait for the final checkpoint to complete (`StreamTask#finalCheckpointCompleted`).
So isn't the alternative solution as simple as fixing/adjusting the [if condition in `afterInvoke`](https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L830) to keep the task waiting also with `stop-with-savepoint --drain`, even if FLIP-147 is disabled?
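
To make the second concern above more concrete, here is a toy model of the ordering problem. It is only a sketch and not Flink code: `ToyMailbox`, `tryPut`, and `simulate` are hypothetical names, and the assumption that a quiesced mailbox rejects new actions but still drains queued ones is taken from the description above, not verified against the real implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy stand-in for a task mailbox; NOT Flink's real mailbox classes. */
final class ToyMailbox {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean quiesced = false;

    /** Enqueues an action; returns false if it is rejected because the mailbox is quiesced. */
    boolean tryPut(Runnable action) {
        if (quiesced) {
            return false;
        }
        queue.add(action);
        return true;
    }

    /** After this call, no new actions are accepted (assumed behavior). */
    void quiesce() {
        quiesced = true;
    }

    /** Runs every action that is still queued. */
    void drain() {
        Runnable next;
        while ((next = queue.poll()) != null) {
            next.run();
        }
    }

    /**
     * Simulates the "unlucky" ordering: the trigger action is enqueued,
     * the mailbox is quiesced, and the action only runs during drain().
     */
    static List<String> simulate(boolean splitIntoTwoActions) {
        List<String> log = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();

        // Hypothetical stand-in for the action enqueued when the stop() future completes.
        mailbox.tryPut(() -> {
            log.add("trigger:step1");
            if (splitIntoTwoActions) {
                // A follow-up action enqueued from inside the first one.
                boolean accepted = mailbox.tryPut(() -> log.add("trigger:step2"));
                if (!accepted) {
                    log.add("step2:rejected");
                }
            }
        });

        mailbox.quiesce(); // happens before the enqueued action had a chance to run
        mailbox.drain();   // the action runs only now
        return log;
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // [trigger:step1]
        System.out.println(simulate(true));  // [trigger:step1, step2:rejected]
    }
}
```

With a single action the trigger still runs during the drain, which matches the "it currently works" case. Once the trigger is split into two actions, the second one is rejected in this model, which is the fragility described above.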
