[ https://issues.apache.org/jira/browse/FLINK-17869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhijiang updated FLINK-17869:
-----------------------------
Description:
On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:
start -> in progress/abort -> stop.
The ChannelStateWriteResult is created during #start and removed by either #abort or #stop.
There are some potential race conditions here:
* #start is called by the netty thread when it receives the first barrier, and the checkpoint is scheduled for execution.
* The task thread might process the checkpoint cancellation and call #abort before performing the above checkpoint.
* The task thread can still execute the checkpoint afterwards, even though the abort happened first, because we cannot remove the checkpoint action from the mailbox while aborting.
* While the checkpoint is executing, it calls `ChannelStateWriter#getWriteResult`, which throws an `IllegalStateException` because the respective result was already removed while handling #abort.
* Therefore, the task fails unnecessarily while performing the checkpoint.
I guess that, by design, we do not want to fail the task when a checkpoint is aborted. And I guess the illegal-state check in ChannelStateWriter#getWriteResult was mainly intended to validate the normal process.
If we do not remove the ChannelStateWriteResult while handling #abort and instead rely on #stop to remove it, there may be another scenario in which the checkpoint is never performed after #start (we have another mechanism to exit the triggering checkpoint early if the abort is sent by the CheckpointCoordinator). The stale ChannelStateWriteResult would then be retained inside ChannelStateWriter for a long time.
A potential option to fix this issue is to let SubtaskCheckpointCoordinatorImpl properly handle the exception from ChannelStateWriter#getWriteResult, so that the task does not fail in the aborted case.

was:
On the ChannelStateWriter side, the lifecycle of a checkpoint should be as follows:
start -> in progress/abort -> stop.
We must guarantee that #abort is queued after #start; otherwise, the aborted checkpoint might be started again later in a race condition.
There are two cases that might trigger aborting a checkpoint:
* CheckpointBarrierUnaligner#processEndOfPartition
* CheckpointBarrierUnaligner#processCancellationBarrier
The current condition for aborting the checkpoint in both cases above is based on #isCheckpointPending(), which does not cover all cases. The unaligned checkpoint might be triggered either by the task thread via CheckpointBarrierUnaligner#processBarrier or by the netty thread via ThreadSafeUnaligner#notifyBarrierReceived. Either way, we should maintain the currently triggered checkpoint id in order to handle both abort cases above properly.
Another bug is that ChannelStateWriterImpl#abort should not remove the respective ChannelStateWriteResult. Otherwise, ChannelStateWriterImpl#getWriteResult would throw an IllegalArgumentException during the checkpoint process. The ChannelStateWriteResult should be created in the #start method and removed only in the #stop method.

> Fix the race condition of aborting unaligned checkpoint
> -------------------------------------------------------
>
>                 Key: FLINK-17869
>                 URL: https://issues.apache.org/jira/browse/FLINK-17869
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Zhijiang
>            Assignee: Zhijiang
>            Priority: Blocker
>             Fix For: 1.11.0
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
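The race described in this issue can be sketched as a small, self-contained Java model. Everything here is hypothetical: `SketchWriter`, `SketchCoordinator`, `WriteResult`, and the `lastAbortedCheckpointId` field are simplified stand-ins for ChannelStateWriter, SubtaskCheckpointCoordinatorImpl, and ChannelStateWriteResult, not Flink's actual classes or signatures. The sketch follows the proposal in the updated description: #abort still removes the result eagerly, and the coordinator tolerates the missing result only for a checkpoint it knows was aborted. It additionally assumes that checkpoint ids increase monotonically.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the race in FLINK-17869.
// None of these classes are Flink's real implementations.
public class AbortRaceSketch {

    // Stand-in for ChannelStateWriteResult.
    static final class WriteResult {
        final long checkpointId;

        WriteResult(long checkpointId) {
            this.checkpointId = checkpointId;
        }
    }

    // Stand-in for ChannelStateWriter: start -> in progress/abort -> stop.
    // Methods are synchronized because #start runs on the netty thread.
    static final class SketchWriter {
        private final Map<Long, WriteResult> results = new HashMap<>();

        synchronized void start(long checkpointId) {
            results.put(checkpointId, new WriteResult(checkpointId));
        }

        // Abort removes the result eagerly, so nothing leaks if the checkpoint never runs.
        synchronized void abort(long checkpointId) {
            results.remove(checkpointId);
        }

        synchronized WriteResult getWriteResult(long checkpointId) {
            WriteResult result = results.get(checkpointId);
            if (result == null) {
                throw new IllegalStateException("no write result for checkpoint " + checkpointId);
            }
            return result;
        }

        synchronized void stop(long checkpointId) {
            results.remove(checkpointId);
        }
    }

    // Stand-in for SubtaskCheckpointCoordinatorImpl; used only from the task thread.
    static final class SketchCoordinator {
        private final SketchWriter writer;
        // Hypothetical bookkeeping; assumes checkpoint ids increase monotonically.
        private long lastAbortedCheckpointId = -1L;

        SketchCoordinator(SketchWriter writer) {
            this.writer = writer;
        }

        void abortCheckpoint(long checkpointId) {
            lastAbortedCheckpointId = Math.max(lastAbortedCheckpointId, checkpointId);
            writer.abort(checkpointId);
        }

        // The mailbox action; returns true if the checkpoint was actually performed.
        boolean performCheckpoint(long checkpointId) {
            try {
                WriteResult result = writer.getWriteResult(checkpointId);
                // ... snapshot state and hand over the write result here ...
                writer.stop(result.checkpointId);
                return true;
            } catch (IllegalStateException e) {
                if (checkpointId <= lastAbortedCheckpointId) {
                    return false; // aborted before it ran: skip instead of failing the task
                }
                throw e; // a missing result for a live checkpoint is still a real bug
            }
        }
    }

    public static void main(String[] args) {
        SketchWriter writer = new SketchWriter();
        SketchCoordinator coordinator = new SketchCoordinator(writer);

        writer.start(1L);                // netty thread: first barrier, checkpoint scheduled
        coordinator.abortCheckpoint(1L); // task thread: cancellation processed first
        System.out.println("checkpoint 1 performed: " + coordinator.performCheckpoint(1L)); // false

        writer.start(2L);
        System.out.println("checkpoint 2 performed: " + coordinator.performCheckpoint(2L)); // true
    }
}
```

The earlier description's alternative, removing the result only in #stop, would let getWriteResult succeed instead, at the cost the updated description points out: a result whose checkpoint is never performed after #start could be retained for a long time.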