[ https://issues.apache.org/jira/browse/FLINK-20097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17230236#comment-17230236 ]
Arvid Heise edited comment on FLINK-20097 at 11/11/20, 9:25 PM:
----------------------------------------------------------------

{noformat}
Caused by: java.util.NoSuchElementException
    at java.util.ArrayDeque$DeqIterator.next(ArrayDeque.java:642)
{noformat}

This looks very much like FLINK-20030.

Race conditions in InputChannel.ChannelStatePersister
-----------------------------------------------------

Key: FLINK-20097
URL: https://issues.apache.org/jira/browse/FLINK-20097
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing, Runtime / Network
Affects Versions: 1.12.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan
Priority: Major
Labels: pull-request-available
Fix For: 1.12.0

In InputChannel.ChannelStatePersister, stopPersisting() and checkForBarrier() always update pendingCheckpointBarrierId, potentially overwriting a newer id (or the BARRIER_RECEIVED value) with an older one.
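The following is a deliberately simplified, hypothetical model of these unconditional updates. It is not Flink's ChannelStatePersister code: the class name, the method parameters and the negative sentinel values are assumptions made for illustration. A single long field holds either a pending checkpoint id or one of the BARRIER_RECEIVED / CHECKPOINT_COMPLETED markers, and every call overwrites it regardless of which checkpoint the call refers to.

```java
/**
 * Hypothetical model of the unconditional updates described in the issue.
 * Not Flink's implementation; the sentinel values are arbitrary assumptions.
 */
public class UnguardedPersister {
    static final long CHECKPOINT_COMPLETED = -1L; // assumed sentinel
    static final long BARRIER_RECEIVED = -2L;     // assumed sentinel

    // Either a pending checkpoint id (>= 0) or one of the sentinels above.
    long pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;

    void startPersisting(long checkpointId) {
        pendingCheckpointBarrierId = checkpointId;
    }

    // A barrier arrived: overwrites the field without looking at barrierId.
    void checkForBarrier(long barrierId) {
        pendingCheckpointBarrierId = BARRIER_RECEIVED;
    }

    // A checkpoint was stopped (aborted or completed): also ignores which one.
    void stopPersisting(long checkpointId) {
        pendingCheckpointBarrierId = CHECKPOINT_COMPLETED;
    }

    // In-flight buffers are recorded only while a checkpoint id is pending.
    boolean shouldPersistBuffers() {
        return pendingCheckpointBarrierId >= 0;
    }

    public static void main(String[] args) {
        // A channel pending for checkpoint 2 receives the stale barrier 1.
        UnguardedPersister b = new UnguardedPersister();
        b.startPersisting(2);
        b.checkForBarrier(1);
        System.out.println(b.shouldPersistBuffers()); // prints "false"

        // Barrier 2 was already received; then checkpoint 1 is aborted
        // and checkpoint 2 is started.
        UnguardedPersister c = new UnguardedPersister();
        c.checkForBarrier(2);
        c.stopPersisting(1);
        c.startPersisting(2);
        System.out.println(c.shouldPersistBuffers()); // prints "true"
    }
}
```

Both results are wrong: in the first case buffers that belong to checkpoint 2 are skipped, and in the second the channel starts persisting again for a checkpoint whose barrier it has already seen.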

For stopPersisting(), consider this case:
# Two consecutive UC barriers arrive at the same channel (the 1st becoming stale at some point)
# In RemoteInputChannel.onBuffer, the netty thread updates pendingCheckpointBarrierId to BARRIER_RECEIVED
# The task thread processes the 1st barrier and triggers a checkpoint
# The task thread processes the 2nd barrier and aborts the 1st checkpoint, calling stopPersisting() from the UC controller and setting pendingCheckpointBarrierId to CHECKPOINT_COMPLETED
# The task thread starts the 2nd checkpoint and calls startPersisting(), setting pendingCheckpointBarrierId to 2
# Now new buffers have a chance to be included in the 2nd checkpoint (though they belong to the next one)

For checkForBarrier(), consider an input gate with two channels, A and B, and two barriers, 1 and 2:
# Channel A receives both barriers; channel B receives nothing yet
# The task thread processes both barriers on A, eventually triggering the 2nd checkpoint
# Channel A's state is now BARRIER_RECEIVED; channel B is pending (with id=2)
# Channel B receives the 1st barrier and becomes BARRIER_RECEIVED
# No buffers in B between barriers 1 and 2 will be included in the checkpoint
# Channel B receives the 2nd barrier, which will eventually conclude the checkpoint

One solution is to act only if the passed checkpointId >= pendingCheckpointId. For that, a separate field is needed to hold the status (RECEIVED/COMPLETED/PENDING). The class isn't thread-safe anyway, so updating two fields instead of one shouldn't be a problem.
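A minimal sketch of the proposed fix, under simplifying assumptions: GuardedPersister, its Status enum and maybePersist() are hypothetical names, not Flink's API. The status is kept in its own field, the id field only ever holds a checkpoint id, and each transition is ignored if it carries an older id than the last barrier seen. One detail goes beyond the proposal as worded and is an assumption: startPersisting() uses a strict comparison, so a channel that has already received the barrier for checkpoint N is not reset to pending when checkpoint N starts. Like the original class, the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch of the proposed fix: status kept separately from the id,
 * and transitions carrying an older checkpoint id are ignored.
 * Not Flink's ChannelStatePersister; not thread-safe.
 */
public class GuardedPersister {
    enum Status { COMPLETED, PENDING, BARRIER_RECEIVED }

    private Status status = Status.COMPLETED;
    private long lastSeenBarrier = -1L; // highest checkpoint id seen on this channel
    final List<String> persisted = new ArrayList<>();

    /** The task thread starts checkpoint {@code id}. */
    void startPersisting(long id) {
        // Strictly newer only (assumption): if the barrier for this id was
        // already received, do not reopen the channel as PENDING.
        if (lastSeenBarrier < id) {
            lastSeenBarrier = id;
            status = Status.PENDING;
        }
    }

    /** Checkpoint {@code id} was aborted or completed. */
    void stopPersisting(long id) {
        if (id >= lastSeenBarrier) {
            lastSeenBarrier = id;
            status = Status.COMPLETED;
        }
    }

    /** A barrier for checkpoint {@code barrierId} arrived on this channel. */
    void checkForBarrier(long barrierId) {
        if (barrierId >= lastSeenBarrier) {
            lastSeenBarrier = barrierId;
            status = Status.BARRIER_RECEIVED;
        }
    }

    /** Records an in-flight buffer only while a checkpoint is pending. */
    boolean maybePersist(String buffer) {
        if (status == Status.PENDING) {
            persisted.add(buffer);
            return true;
        }
        return false;
    }

    Status status() {
        return status;
    }

    public static void main(String[] args) {
        // Second scenario, channel B: checkpoint 2 starts before B sees a barrier.
        GuardedPersister b = new GuardedPersister();
        b.startPersisting(2);
        b.checkForBarrier(1);   // stale barrier 1: ignored
        b.maybePersist("b1");   // still recorded for checkpoint 2
        b.checkForBarrier(2);   // barrier 2 concludes the channel
        b.maybePersist("b2");   // belongs to the next checkpoint: skipped
        System.out.println(b.persisted); // prints "[b1]"

        // First scenario: both barriers received before the task reacts.
        GuardedPersister a = new GuardedPersister();
        a.checkForBarrier(1);
        a.checkForBarrier(2);
        a.startPersisting(1);   // older than barrier 2: ignored
        a.stopPersisting(1);    // abort of checkpoint 1: ignored
        a.startPersisting(2);   // barrier 2 already received: ignored
        System.out.println(a.maybePersist("late")); // prints "false"
    }
}
```

Replaying both scenarios against this sketch, the stale barrier 1 no longer stops persisting on channel B, and aborting checkpoint 1 can no longer reopen a channel that has already seen barrier 2.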