[ 
https://issues.apache.org/jira/browse/FLINK-17994?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhijiang updated FLINK-17994:
-----------------------------
    Description: 
The race condition happens as follows:
 * Barrier ch1 is received from the network for one input channel by the netty 
thread, which schedules ch1 into the mailbox via #notifyBarrierReceived.
 * Barrier ch2 is received from the network for another input channel by the 
netty thread, but it is inserted into the channel's data queue before 
#notifyBarrierReceived is called. The task thread can therefore process ch2 
before the netty thread calls #notifyBarrierReceived for it.
 * The task thread executes the checkpoint for ch2 immediately because ch2 > ch1.
 * Afterwards, the task thread runs the previously scheduled ch1 action from the 
mailbox, which throws an IllegalArgumentException inside 
SubtaskCheckpointCoordinatorImpl#checkpointState because it breaks the initial 
assumption that checkpoints are executed in increasing order in aligned mode.
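The interleaving above can be replayed deterministically on a single thread. This is a minimal sketch, not Flink code: the classes and methods are hypothetical stand-ins, the mailbox is modeled as a plain FIFO queue of Runnables, and the id check is an assumed stand-in for the one in SubtaskCheckpointCoordinatorImpl#checkpointState.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-threaded replay of the race; names are illustrative, not Flink's API.
public class BarrierRaceReplay {

    // Assumed stand-in for the id check in SubtaskCheckpointCoordinatorImpl#checkpointState.
    static final class Coordinator {
        private long lastCheckpointId = -1;

        void checkpointState(long checkpointId) {
            // Aligned mode assumes checkpoint ids only ever increase.
            if (checkpointId <= lastCheckpointId) {
                throw new IllegalArgumentException("Unexpected checkpoint id " + checkpointId
                        + ", last executed was " + lastCheckpointId);
            }
            lastCheckpointId = checkpointId;
        }
    }

    // Replays the four steps and returns the exception message, or null if none is thrown.
    public static String replay() {
        Coordinator coordinator = new Coordinator();
        Queue<Runnable> mailbox = new ArrayDeque<>();
        Queue<Long> dataQueue = new ArrayDeque<>();

        // 1. Netty thread: ch1 arrives; #notifyBarrierReceived enqueues a mailbox action.
        mailbox.add(() -> coordinator.checkpointState(1));

        // 2. Netty thread: ch2 lands in the data queue before #notifyBarrierReceived runs.
        dataQueue.add(2L);

        // 3. Task thread: polls ch2 first and checkpoints immediately because 2 > 1.
        coordinator.checkpointState(dataQueue.poll());

        // 4. Task thread: drains the mailbox and runs the stale ch1 action.
        try {
            while (!mailbox.isEmpty()) {
                mailbox.poll().run();
            }
            return null;
        } catch (IllegalArgumentException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Prints: Unexpected checkpoint id 1, last executed was 2
        System.out.println(replay());
    }
}
```

Replaying on one thread removes the timing dependence, so the failure shows up on every run rather than only under an unlucky schedule.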

The key problem is that we cannot remove the stale checkpoint action from the 
mailbox queue when the next checkpoint is about to execute. One possible 
solution is to record the previously aborted checkpoint ids in 
SubtaskCheckpointCoordinatorImpl#abortedCheckpointIds; when the queued 
checkpoint action is later executed from the mailbox, it exits directly if its 
checkpoint id has already been aborted.
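A minimal sketch of this proposed fix, assuming a single-threaded mailbox. Only the name abortedCheckpointIds comes from this issue; the Coordinator class, the abortCheckpoint method and the remove-on-skip policy are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical sketch: remember aborted checkpoint ids so a stale mailbox action exits early.
public class AbortedIdsSketch {

    static final class Coordinator {
        private final Set<Long> abortedCheckpointIds = new HashSet<>();
        private final List<Long> executed = new ArrayList<>();
        private long lastCheckpointId = -1;

        // Hypothetical hook: called when a newer checkpoint supersedes a pending older one.
        void abortCheckpoint(long checkpointId) {
            abortedCheckpointIds.add(checkpointId);
        }

        void checkpointState(long checkpointId) {
            // A queued action for an already-aborted checkpoint exits directly.
            // Removing the id assumes at most one queued action per checkpoint id.
            if (abortedCheckpointIds.remove(checkpointId)) {
                return;
            }
            if (checkpointId <= lastCheckpointId) {
                throw new IllegalArgumentException("Unexpected checkpoint id " + checkpointId);
            }
            lastCheckpointId = checkpointId;
            executed.add(checkpointId);
        }
    }

    // Replays the same interleaving as the race, now with the abort bookkeeping.
    public static List<Long> replay() {
        Coordinator coordinator = new Coordinator();
        Queue<Runnable> mailbox = new ArrayDeque<>();

        // Netty thread schedules ch1 via #notifyBarrierReceived.
        mailbox.add(() -> coordinator.checkpointState(1));

        // Task thread sees ch2 first: pending ch1 is aborted and recorded, then ch2 runs.
        coordinator.abortCheckpoint(1);
        coordinator.checkpointState(2);

        // The stale ch1 action still runs from the mailbox, but now returns early.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        return coordinator.executed;
    }

    public static void main(String[] args) {
        // Prints: [2]
        System.out.println(replay());
    }
}
```

Removing the id when the stale action is skipped keeps the set from growing without bound; whether that is safe depends on each checkpoint id having at most one queued mailbox action, which is an assumption of this sketch.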

  was:
The race condition happens as follows:
 * Barrier ch1 is received from the network by the netty thread, which 
schedules ch1 into the mailbox via #notifyBarrierReceived.
 * Barrier ch2 is received from the network by the netty thread, but it is 
inserted into the channel's data queue before #notifyBarrierReceived is called. 
The task thread can therefore process ch2 before the netty thread calls 
#notifyBarrierReceived for it.
 * The task thread executes the checkpoint for ch2 directly because ch2 > ch1.
 * Afterwards, the task thread runs the previously scheduled ch1 action from the 
mailbox, which throws an IllegalArgumentException inside 
SubtaskCheckpointCoordinatorImpl#checkpointState because it breaks the 
assumption that checkpoints are executed in increasing order.

One possible solution for this race condition is to insert the received barrier 
into the channel's data queue only after calling #notifyBarrierReceived. We can 
then assume that a checkpoint is always triggered by the netty thread, which 
simplifies the current situation where a checkpoint might be triggered by either 
the task thread or the netty thread.

This also avoids calling #notifyBarrierReceived from the task thread while it 
processes the barrier, which simplifies the logic inside 
CheckpointBarrierUnaligner.
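A sketch of this earlier proposal, assuming a single netty thread that delivers barriers in arrival order and a FIFO mailbox. All names are hypothetical; processBarrier is deliberately a no-op to show that the task thread no longer triggers checkpoints itself.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: notify first, enqueue second, and trigger only via the mailbox.
public class NotifyFirstSketch {

    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Queue<Long> dataQueue = new ArrayDeque<>();
    private final List<Long> triggered = new ArrayList<>();

    // Netty thread: notify first, then make the barrier visible to the task thread.
    public void onBarrierFromNetwork(long checkpointId) {
        notifyBarrierReceived(checkpointId);
        dataQueue.add(checkpointId);
    }

    // The only trigger path: a mailbox action enqueued in arrival order.
    void notifyBarrierReceived(long checkpointId) {
        mailbox.add(() -> triggered.add(checkpointId));
    }

    // Task thread: processing a barrier no longer calls #notifyBarrierReceived.
    void processBarrier(long checkpointId) {
        // Channel-level bookkeeping only in this sketch.
    }

    public List<Long> run() {
        onBarrierFromNetwork(1);
        onBarrierFromNetwork(2);
        // The task thread may drain the data queue before the mailbox ...
        while (!dataQueue.isEmpty()) {
            processBarrier(dataQueue.poll());
        }
        // ... but checkpoints still run in notification order.
        while (!mailbox.isEmpty()) {
            mailbox.poll().run();
        }
        return triggered;
    }

    public static void main(String[] args) {
        // Prints: [1, 2]
        System.out.println(new NotifyFirstSketch().run());
    }
}
```

Because the mailbox action for a barrier is enqueued before the barrier becomes visible in the data queue, the task thread can never observe ch2 ahead of ch1's pending action under the single-netty-thread assumption.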


> Fix the race condition between CheckpointBarrierUnaligner#processBarrier and 
> #notifyBarrierReceived
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17994
>                 URL: https://issues.apache.org/jira/browse/FLINK-17994
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>            Reporter: Zhijiang
>            Assignee: Zhijiang
>            Priority: Blocker
>             Fix For: 1.11.0
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
