[ https://issues.apache.org/jira/browse/FLINK-17869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhijiang updated FLINK-17869:
-----------------------------
    Description: 
On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:

start -> in progress/abort -> stop.

The ChannelStateWriteResult is created in #start and removed by either #abort or
#stop. This opens up a potential race condition:
 * The netty thread calls #start when it receives the first barrier, and
schedules the checkpoint for execution.
 * The task thread might process the checkpoint cancellation and call #abort
before the scheduled checkpoint is performed.
 * The task thread can still execute the checkpoint afterwards, even though the
abort already happened, because the checkpoint action cannot be removed from
the mailbox while aborting.
 * While executing the checkpoint, the task calls
`ChannelStateWriter#getWriteResult`, which throws `IllegalStateException`
because the corresponding result was already removed while handling #abort.
 * As a result, the task fails unnecessarily while performing the checkpoint.
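
The interleaving above can be replayed on a single thread with a toy model.
This is a minimal sketch, not Flink code: `ToyWriter`, `RaceReplay` and the
`ArrayDeque` standing in for the task mailbox are hypothetical, and only the
per-checkpoint result bookkeeping is modelled.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical, heavily simplified stand-in for ChannelStateWriter:
// it only tracks one write result per checkpoint id.
class ToyWriter {
    private final Map<Long, String> results = new HashMap<>();

    synchronized void start(long checkpointId) {
        results.put(checkpointId, "result-" + checkpointId);
    }

    // Behaviour described in the issue: #abort removes the result.
    synchronized void abort(long checkpointId) {
        results.remove(checkpointId);
    }

    synchronized String getWriteResult(long checkpointId) {
        String result = results.get(checkpointId);
        if (result == null) {
            throw new IllegalStateException("no write result for checkpoint " + checkpointId);
        }
        return result;
    }
}

class RaceReplay {
    // Replays: start (netty) -> abort (task) -> queued checkpoint action (task).
    static String replay(long checkpointId) {
        ToyWriter writer = new ToyWriter();
        Queue<Runnable> mailbox = new ArrayDeque<>(); // hypothetical mailbox stand-in

        // Netty thread: first barrier received, #start called, checkpoint scheduled.
        writer.start(checkpointId);
        mailbox.add(() -> writer.getWriteResult(checkpointId));

        // Task thread: the cancellation is processed before the queued action.
        writer.abort(checkpointId);

        // Task thread: the queued action cannot be withdrawn, so it still runs.
        try {
            mailbox.remove().run();
            return "checkpoint " + checkpointId + " completed";
        } catch (IllegalStateException e) {
            return "task failed: " + e.getMessage();
        }
    }
}
```

`RaceReplay.replay(42)` returns "task failed: no write result for checkpoint
42", which is the unnecessary task failure from the last bullet.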

By design, we presumably do not want to fail the task when a single checkpoint
is aborted. I guess the illegal-state check in ChannelStateWriter#getWriteResult
was mainly intended to validate the normal process.

If we instead keep the ChannelStateWriteResult while handling #abort and rely
on #stop to remove it, another scenario becomes possible: the checkpoint is
never performed after #start (we have another mechanism that exits the
triggering checkpoint early if the abort is sent by the CheckpointCoordinator).
In that case the stale ChannelStateWriteResult would be retained inside
ChannelStateWriter for a long time.
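
The trade-off can be illustrated with a hypothetical variant of the same toy
model in which #abort keeps the result and only #stop removes it. The class
name is an assumption; calling #start and #abort without #stop stands in for
checkpoints that exit early and therefore never reach #stop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical variant: #abort keeps the result, only #stop removes it.
class KeepOnAbortWriter {
    private final Map<Long, String> results = new HashMap<>();

    synchronized void start(long checkpointId) {
        results.put(checkpointId, "result-" + checkpointId);
    }

    synchronized void abort(long checkpointId) {
        // Intentionally keeps the entry so a late #getWriteResult still succeeds.
    }

    synchronized String getWriteResult(long checkpointId) {
        String result = results.get(checkpointId);
        if (result == null) {
            throw new IllegalStateException("no write result for checkpoint " + checkpointId);
        }
        return result;
    }

    synchronized void stop(long checkpointId) {
        results.remove(checkpointId);
    }

    // Number of results still held, i.e. not yet released by #stop.
    synchronized int retainedResults() {
        return results.size();
    }
}
```

Starting and aborting checkpoints 1, 2 and 3 without ever calling #stop leaves
`retainedResults()` at 3: the late #getWriteResult no longer fails, but every
early-exited checkpoint leaks its entry.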

One possible fix is to let SubtaskCheckpointCoordinatorImpl handle the
exception from ChannelStateWriter#getWriteResult properly, so that the task
does not fail when the checkpoint has been aborted.
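
A minimal sketch of that proposal, assuming the coordinator remembers which
checkpoints it has aborted. `AbortAwareCoordinator`, `RemovingWriter` and
`performCheckpoint` are hypothetical names, not the real
SubtaskCheckpointCoordinatorImpl API: the writer's exception is swallowed only
for a checkpoint known to be aborted, and rethrown otherwise so that genuine
bugs still fail the task.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical writer whose #abort removes the result (current behaviour).
class RemovingWriter {
    private final Map<Long, String> results = new HashMap<>();

    synchronized void start(long id) { results.put(id, "result-" + id); }

    synchronized void abort(long id) { results.remove(id); }

    synchronized String getWriteResult(long id) {
        String result = results.get(id);
        if (result == null) {
            throw new IllegalStateException("no write result for checkpoint " + id);
        }
        return result;
    }
}

// Hypothetical coordinator that tolerates a missing result for aborted checkpoints.
class AbortAwareCoordinator {
    private final RemovingWriter writer;
    private final Set<Long> abortedIds = new HashSet<>();

    AbortAwareCoordinator(RemovingWriter writer) { this.writer = writer; }

    synchronized void abortCheckpoint(long id) {
        abortedIds.add(id);
        writer.abort(id);
    }

    // Returns true if the checkpoint was performed, false if skipped as aborted.
    synchronized boolean performCheckpoint(long id) {
        try {
            writer.getWriteResult(id);
            return true;
        } catch (IllegalStateException e) {
            if (abortedIds.remove(id)) {
                return false; // expected after an abort: skip, do not fail the task
            }
            throw e; // unexpected: still fail the task
        }
    }
}
```

Removing the id from `abortedIds` once the late action has been skipped keeps
that set from growing without bound.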

  was:
On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:

start -> in progress/abort -> stop.

We must guarantee that #abort is queued after #start; otherwise, under a race
condition, the aborted checkpoint might be started again later.
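
Why the order matters can be seen with a hypothetical writer that applies
requests strictly in FIFO order, as a single writer thread would. `FifoWriter`
and its request representation are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical writer that applies #start/#abort requests in FIFO order.
class FifoWriter {
    enum Kind { START, ABORT }

    private static final class Request {
        final Kind kind;
        final long checkpointId;

        Request(Kind kind, long checkpointId) {
            this.kind = kind;
            this.checkpointId = checkpointId;
        }
    }

    private final Queue<Request> queue = new ArrayDeque<>();
    private final Set<Long> running = new HashSet<>();

    synchronized void enqueue(Kind kind, long checkpointId) {
        queue.add(new Request(kind, checkpointId));
    }

    // Drains the queue in arrival order.
    synchronized void drain() {
        Request request;
        while ((request = queue.poll()) != null) {
            if (request.kind == Kind.START) {
                running.add(request.checkpointId);
            } else {
                running.remove(request.checkpointId);
            }
        }
    }

    synchronized boolean isRunning(long checkpointId) {
        return running.contains(checkpointId);
    }
}
```

Enqueuing START then ABORT for checkpoint 5 leaves `isRunning(5)` false;
enqueuing ABORT then START leaves it true, i.e. the aborted checkpoint is
started again.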

There are two cases that might trigger aborting a checkpoint:
 * CheckpointBarrierUnaligner#processEndOfPartition
 * CheckpointBarrierUnaligner#processCancellationBarrier

In both cases, the current condition for aborting the checkpoint is based on
#isCheckpointPending(), which cannot cover all cases. The unaligned checkpoint
might be triggered either by the task thread via
CheckpointBarrierUnaligner#processBarrier or by the netty thread via
ThreadSafeUnaligner#notifyBarrierReceived. Either way, we should maintain the
currently triggered checkpoint id in order to handle both abort cases above
properly.
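
A minimal sketch of that bookkeeping, assuming both trigger paths funnel into
one synchronized method. `TriggeredIdTracker` and its methods are hypothetical;
the real CheckpointBarrierUnaligner and ThreadSafeUnaligner are more involved,
and a cancellation for a checkpoint that has not been triggered yet is
deliberately left out here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tracker of the currently triggered unaligned checkpoint.
class TriggeredIdTracker {
    private long currentCheckpointId = -1L; // -1: nothing triggered yet
    private boolean currentAborted = false;
    private final List<Long> abortedIds = new ArrayList<>();

    // Reached from the task thread (processBarrier) or the netty thread
    // (notifyBarrierReceived), hence synchronized.
    synchronized void onCheckpointTriggered(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            currentCheckpointId = checkpointId;
            currentAborted = false;
        }
    }

    // Cancellation barrier: abort only if it targets the triggered checkpoint.
    synchronized void onCancellationBarrier(long checkpointId) {
        if (checkpointId == currentCheckpointId) {
            abortCurrent();
        }
    }

    // End of partition: abort whichever checkpoint is currently triggered.
    synchronized void onEndOfPartition() {
        if (currentCheckpointId >= 0) {
            abortCurrent();
        }
    }

    private void abortCurrent() {
        if (!currentAborted) { // abort each checkpoint at most once
            currentAborted = true;
            abortedIds.add(currentCheckpointId);
        }
    }

    synchronized List<Long> abortedIds() {
        return new ArrayList<>(abortedIds);
    }
}
```

Because the decision depends on the tracked id rather than on which thread
triggered the checkpoint, both abort paths behave the same whether the barrier
was first seen by the task thread or by the netty thread.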

Another bug is that ChannelStateWriterImpl#abort should not remove the
corresponding ChannelStateWriteResult. Otherwise, calling
ChannelStateWriterImpl#getWriteResult while performing the checkpoint throws
IllegalArgumentException. The ChannelStateWriteResult should be created in
#start and removed only in #stop.
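
The intended lifecycle can be sketched as follows, assuming a write result is
modelled as a single `CompletableFuture<String>` (the real
ChannelStateWriteResult holds more than that). `LifecycleWriter` is a
hypothetical name: #abort only completes the result exceptionally, so a late
#getWriteResult still finds it, and #stop is the only place that removes it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical writer following start -> in progress/abort -> stop.
class LifecycleWriter {
    private final Map<Long, CompletableFuture<String>> results = new HashMap<>();

    // The result is created here and nowhere else.
    synchronized void start(long checkpointId) {
        results.put(checkpointId, new CompletableFuture<>());
    }

    // Abort fails the pending result but keeps it registered.
    synchronized void abort(long checkpointId, Throwable cause) {
        CompletableFuture<String> result = results.get(checkpointId);
        if (result != null) {
            result.completeExceptionally(cause);
        }
    }

    synchronized CompletableFuture<String> getWriteResult(long checkpointId) {
        CompletableFuture<String> result = results.get(checkpointId);
        if (result == null) {
            throw new IllegalArgumentException("no write result for checkpoint " + checkpointId);
        }
        return result;
    }

    // The result is removed here and nowhere else.
    synchronized void stop(long checkpointId) {
        results.remove(checkpointId);
    }
}
```

After `start(11)` and `abort(11, cause)`, `getWriteResult(11)` returns a future
whose `isCompletedExceptionally()` is true; only after `stop(11)` does
`getWriteResult(11)` throw.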


> Fix the race condition of aborting unaligned checkpoint
> -------------------------------------------------------
>
>                 Key: FLINK-17869
>                 URL: https://issues.apache.org/jira/browse/FLINK-17869
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Zhijiang
>            Assignee: Zhijiang
>            Priority: Blocker
>             Fix For: 1.11.0
>
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
