gaoyunhaii opened a new pull request #16905:
URL: https://github.com/apache/flink/pull/16905


   ## What is the purpose of the change
   
   This PR fixes a race condition between triggering a savepoint and tasks finishing. When triggering stop-with-savepoint --drain, we first stop the operators and then trigger a savepoint. However, currently for all three task types, namely `SourceStreamTask`, `SourceOperatorStreamTask` and `MultipleInputStreamTask`, after stopping (done in the main thread), `triggerCheckpointAsync()` runs in a non-mailbox thread. If that thread runs slowly, by the time `triggerCheckpointAsync()` enqueues the mail for triggering the savepoint, the task may have already executed its finish process and closed the mailbox, which causes the trigger to fail. Since we currently assert that the trigger throws no exceptions, a trigger failure causes a fatal error and makes the TM exit.
   
   To fix this issue, we would like to ensure the trigger happens immediately after the operators are "finished", before the tasks proceed to the logic in `afterInvoke()`. The details are discussed per task type below.
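   To make the failure mode concrete, here is a minimal sketch (hypothetical, not Flink's actual `TaskMailbox` API): a mailbox that rejects new mails once closed, which is exactly what a late `triggerCheckpointAsync()` mail runs into.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical minimal mailbox model: once closed, new mails are rejected.
public class MailboxDemo {
    private final BlockingQueue<Runnable> mails = new LinkedBlockingQueue<>();
    private volatile boolean closed = false;

    /** Enqueue a mail; returns false if the mailbox was already closed. */
    public boolean tryPut(Runnable mail) {
        if (closed) {
            return false; // in Flink this surfaces as a savepoint trigger failure
        }
        return mails.offer(mail);
    }

    /** Called on the task's finish path; later mails are dropped. */
    public void close() {
        closed = true;
        mails.clear();
    }

    public static void main(String[] args) {
        MailboxDemo mailbox = new MailboxDemo();
        // While the task is running, the trigger mail is accepted.
        System.out.println(mailbox.tryPut(() -> {})); // true
        // If the task finishes first and closes the mailbox, the late
        // trigger mail is rejected -- the race described above.
        mailbox.close();
        System.out.println(mailbox.tryPut(() -> {})); // false
    }
}
```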
   
   **SourceOperatorStreamTask and MultipleInputStreamTask**
   
   For these two tasks, `mainOperator.stop()` is currently called in the akka thread. This can cause issues, since the method modifies variables that are also accessed from the mailbox thread.
   
   Besides, `stop()` returns a future that is completed after the `finish()` method is called, and the akka thread adds a new stage to it that calls `triggerCheckpointAsync()`. If the stage is added before `finish()` is called, there is no problem: `finish()` runs in the mailbox thread, and the trigger happens immediately after it. But if `finish()` has already been called when the stage is added, the stage executes directly inside the akka thread, and its order relative to the task's finish process is undetermined.
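   This behavior can be reproduced with plain `CompletableFuture` (a standalone sketch, not Flink code; thread names are illustrative): a dependent stage added before completion runs in the completing thread, while one added after completion runs synchronously in the caller.

```java
import java.util.concurrent.CompletableFuture;

// Sketch: which thread runs a dependent stage depends on whether the
// future is already complete when the stage is added.
public class StageThreadDemo {

    /** Returns the name of the thread that executed the dependent stage. */
    static String stageThread(boolean completeBeforeAddingStage) throws Exception {
        CompletableFuture<Void> finished = new CompletableFuture<>();
        CompletableFuture<String> ranIn = new CompletableFuture<>();

        // Stands in for the thread that calls finish() on the operators.
        Thread mailboxThread = new Thread(() -> finished.complete(null), "mailbox");

        if (completeBeforeAddingStage) {
            mailboxThread.start();
            mailboxThread.join(); // finish() already happened
        }
        // The "akka thread" (here: the caller) adds the trigger stage.
        finished.thenRun(() -> ranIn.complete(Thread.currentThread().getName()));
        if (!completeBeforeAddingStage) {
            mailboxThread.start(); // finish() happens after the stage was added
        }
        return ranIn.get();
    }

    public static void main(String[] args) throws Exception {
        String caller = Thread.currentThread().getName();
        // Stage added first: runs in the completing ("mailbox") thread.
        System.out.println(stageThread(false)); // mailbox
        // Future already complete: stage runs inline in the caller thread.
        System.out.println(stageThread(true).equals(caller)); // true
    }
}
```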
   
   To fix this issue, we move the whole process, namely stopping the operator and adding the stage, into the mailbox thread. This ensures the checkpoint trigger happens right after `finish()` is called.
   
   **SourceStreamTask**
   
   The source stream task currently adds a new stage calling `triggerCheckpointAsync()` to `sourceThread.getCompletionFuture()`. However, the stage is added in the akka thread and the future is completed in the legacy source thread; neither guarantees the mail is added before the task finishes.
   
   To fix this issue, we introduce a new `allDataFinishedFuture` that is completed inside the mailbox thread, right before the legacy source thread completes. Then, similar to the other two tasks, the stage that triggers the checkpoint can be chained onto this future.
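   The intended ordering guarantee can be sketched as follows (a simplified model, not the actual task code; the single-thread executor stands in for Flink's mailbox executor): because `allDataFinishedFuture` is completed from inside the mailbox thread, the chained trigger stage also runs in the mailbox thread, strictly before any later finish step.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the allDataFinishedFuture pattern from this PR.
public class AllDataFinishedDemo {

    static List<String> run() throws Exception {
        ExecutorService mailbox =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        CompletableFuture<Void> allDataFinishedFuture = new CompletableFuture<>();
        BlockingQueue<String> order = new LinkedBlockingQueue<>();

        // The savepoint trigger is chained onto the future ahead of time
        // (from the "akka thread", here: the caller).
        allDataFinishedFuture.thenRun(() -> order.add("trigger savepoint"));

        mailbox.execute(() -> {
            // Completed inside the mailbox thread: the chained stage runs
            // inline here, strictly before the finish step below.
            allDataFinishedFuture.complete(null);
            order.add("finish task / close mailbox");
        });
        mailbox.shutdown();
        mailbox.awaitTermination(10, TimeUnit.SECONDS);
        return List.copyOf(order);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // [trigger savepoint, finish task / close mailbox]
    }
}
```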
   
   Some other options were considered but are not preferred:
   1. Only moving job stopping and checkpoint triggering into the mailbox thread: since the `completionFuture` is not completed in the mailbox thread, the problem would remain.
   2. Completing the `completionFuture` directly in the mailbox thread: if we moved all completion sites into the mailbox thread, there could be deadlocks in the cancel / failure case: the mailbox throws an error and proceeds to cleanup, which also waits for the `completionFuture`. If we instead moved it into the mailbox thread only when the `finishReason` is `STOP_WITH_SAVEPOINT_DRAIN`, the future would need different logic per case, which adds complexity.
   
   **Do we need to assert no exceptions?**
   
   I think perhaps we do not need to assert that the trigger throws no exceptions here. If the task is still running when the savepoint is triggered, the trigger is expected to succeed; but if the task is already finished / canceled / failed, the trigger is expected to fail. We already have logic to deal with the second case: the checkpoint is declined, and on the JM side a global failover is triggered to keep the state consistent. Thus it seems to me we could remove the assertion; otherwise a trigger failure causes the TM process to exit, which might not be expected.
   
   ## Brief change log
   
   - 5e7725f41ca06cbd05f9d7e7a3978937a02639aa fixes the issue. 
   
   ## Verifying this change
   
   It seems to me that it would be hard to add a dedicated test case for this issue. Thus we only add generalized unit tests for triggering stop-with-savepoint --drain.
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **yes**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**
   

