[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230636#comment-17230636 ]
Arvid Heise commented on FLINK-20097:
-------------------------------------

Merged into master as 5256c210f5e0a27164c1f1e7c0916f0d6bbd5bb7.

> Race conditions in InputChannel.ChannelStatePersister
> -----------------------------------------------------
>
>                 Key: FLINK-20097
>                 URL: https://issues.apache.org/jira/browse/FLINK-20097
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / Network
>    Affects Versions: 1.12.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.12.0
>
>
> In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier()
> always update pendingCheckpointBarrierId, potentially overwriting a newer id
> (or the BARRIER_RECEIVED value) with an older one.
> For stopPersisting(), consider the following case:
> # Two consecutive unaligned checkpoint (UC) barriers arrive at the same
> channel (the 1st becoming stale at some point)
> # In RemoteInputChannel.onBuffer, the netty thread updates
> pendingCheckpointBarrierId to BARRIER_RECEIVED
> # The task thread processes the 1st barrier and triggers a checkpoint
> # The task thread processes the 2nd barrier and aborts the 1st checkpoint,
> calling stopPersisting() from the UC controller and setting
> pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
> # The task thread starts the 2nd checkpoint and calls startPersisting(),
> setting pendingCheckpointBarrierId to 2
> # Now new buffers may be included in the 2nd checkpoint, even though they
> belong to the next one
>
> For checkForBarrier(), consider an input gate with two channels A and B and
> two barriers 1 and 2:
> # Channel A receives both barriers; channel B receives nothing yet
> # The task thread processes both barriers on A, eventually triggering the
> 2nd checkpoint
> # Channel A's state is now BARRIER_RECEIVED; channel B's is pending (with
> id=2)
> # Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
> # None of the buffers in B between barriers 1 and 2 will be included in the
> checkpoint
> # Channel B receives the 2nd barrier, which will eventually conclude the
> checkpoint
>
> I see a solution in performing an action only if the passed checkpointId >=
> pendingCheckpointId. For that, a separate field will be needed to hold the
> status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe, so
> updating two fields instead of one shouldn't be a problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
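The guard proposed at the end of the issue (act only if the passed checkpoint id is >= the tracked one, with the status in its own field) can be sketched as below. This is a hypothetical, simplified Java model, not the code merged in 5256c210f5e0a27164c1f1e7c0916f0d6bbd5bb7. The class name GuardedPersisterSketch, the Status enum, and onBuffer(String) are illustrative. The methods only borrow the names mentioned in the issue. checkForBarrier() here takes a checkpoint id rather than a real buffer. The sketch adds no synchronization and assumes callers already serialize access. main() replays both scenarios from the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the fix proposed in FLINK-20097.
// Names and signatures are illustrative only; this is NOT Flink's
// actual InputChannel.ChannelStatePersister API.
public class GuardedPersisterSketch {

    // Status kept in its own field instead of being encoded as special
    // values (BARRIER_RECEIVED, CHECKPOINT_COMPLETED) in the id field.
    enum Status { COMPLETED, PENDING, BARRIER_RECEIVED }

    static final class ChannelStatePersister {
        private long lastSeenBarrier = -1L;       // newest checkpoint id seen
        private Status status = Status.COMPLETED;
        private final List<String> persisted = new ArrayList<>();

        // Task thread: a checkpoint starts before this channel saw its barrier.
        void startPersisting(long checkpointId) {
            if (checkpointId < lastSeenBarrier) {
                return; // stale call: never move back to an older checkpoint
            }
            if (checkpointId > lastSeenBarrier) {
                lastSeenBarrier = checkpointId;
                status = Status.PENDING;
            }
            // Equal id: the barrier may already have arrived; keep the status.
        }

        // Task thread: the checkpoint was completed or aborted.
        void stopPersisting(long checkpointId) {
            if (checkpointId < lastSeenBarrier) {
                return; // aborting an old checkpoint must not clobber newer state
            }
            lastSeenBarrier = checkpointId;
            status = Status.COMPLETED;
        }

        // Netty thread: a barrier for checkpointId arrived on this channel.
        void checkForBarrier(long checkpointId) {
            if (checkpointId < lastSeenBarrier) {
                return; // late barrier of an older checkpoint: keep persisting
            }
            lastSeenBarrier = checkpointId;
            status = Status.BARRIER_RECEIVED;
        }

        // Netty thread: a data buffer arrived; persist it only while pending.
        void onBuffer(String buffer) {
            if (status == Status.PENDING) {
                persisted.add(buffer);
            }
        }

        List<String> persisted() {
            return persisted;
        }
    }

    public static void main(String[] args) {
        // Scenario 2 (checkForBarrier), replayed on channel B.
        ChannelStatePersister b = new ChannelStatePersister();
        b.startPersisting(1);   // checkpoint 1 triggered via channel A
        b.startPersisting(2);   // checkpoint 2 triggered via channel A
        b.checkForBarrier(1);   // late barrier 1 on B: ignored by the guard
        b.onBuffer("b1");       // data between barriers 1 and 2 on B
        b.checkForBarrier(2);   // barrier 2 on B ends persisting
        b.onBuffer("b2");       // data after barrier 2: not persisted
        System.out.println("B persisted: " + b.persisted());

        // Scenario 1 (stopPersisting), replayed on a single channel.
        ChannelStatePersister c = new ChannelStatePersister();
        c.checkForBarrier(1);   // netty thread sees barrier 1
        c.checkForBarrier(2);   // netty thread sees barrier 2
        c.startPersisting(1);   // checkpoint 1 triggered: stale, ignored
        c.stopPersisting(1);    // abort of checkpoint 1: stale, ignored
        c.startPersisting(2);   // checkpoint 2: barrier already received
        c.onBuffer("c1");       // belongs to the next checkpoint: not persisted
        System.out.println("C persisted: " + c.persisted());
    }
}
```

Running main() prints "B persisted: [b1]" and "C persisted: []". Channel B keeps the data between barriers 1 and 2, and the single channel persists nothing after barrier 2. Without the id comparison, the late barrier 1 and the stale stopPersisting(1) call would reproduce the two races described in the issue.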