[ https://issues.apache.org/jira/browse/FLINK-13063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16878132#comment-16878132 ]
Stephan Ewen commented on FLINK-13063:
--------------------------------------

+1 to fix this for now by forcing the Async I/O operator to always be head-of-chain

> AsyncWaitOperator shouldn't be releasing checkpointingLock
> ----------------------------------------------------------
>
>                 Key: FLINK-13063
>                 URL: https://issues.apache.org/jira/browse/FLINK-13063
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.6.4, 1.7.2, 1.8.1, 1.9.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>
> 1.
> For the following setup of chained operators:
> {noformat}
> SourceOperator -> FlatMap -> AsyncOperator{noformat}
> Let's assume that the input buffer of {{AsyncOperator}} is full. We start
> processing a record from the {{SourceOperator}} and pass it to the
> {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied
> record reaches {{AsyncOperator}} and is treated specially (stored in
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then
> {{AsyncWaitOperator}} waits on (and thereby releases) the checkpoint lock (in
> {{AsyncWaitOperator#addAsyncBufferEntry}}). If a checkpoint is triggered now,
> both {{SourceOperator}} and {{FlatMap}} will be checkpointed under the
> assumption that all 10 multiplied records were processed, which is not true.
> Only the first one is checkpointed by the {{AsyncWaitOperator}}. The remaining
> 9 are not. So if we ever restore state from this checkpoint, we have lost
> those 9 records.
> 2.
> A similar issue (I think previously known) can happen if, for example, some
> operator upstream of the {{AsyncOperator}} fires a processing time timer
> that emits some data. But in that case,
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}} is being overwritten.
> 3.
> If an upstream operator has the following pseudocode:
> {code:java}
> stateA = true
> output.collect(x)
> stateB = true{code}
> one would assume that the writes to stateA and stateB are atomic from the
> perspective of checkpoints. But again, because {{AsyncWaitOperator}}
> releases the checkpoint lock, they will not be.
> CC [~aljoscha] [~StephanEwen] [~srichter]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
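
The third scenario in the quoted issue can be reproduced outside Flink with a plain Java monitor. The following is a minimal sketch for illustration only and does not use any Flink classes: `CheckpointLockSketch`, `processElement`, `collect`, `triggerCheckpoint`, `freeBuffer` and `runScenario` are hypothetical names, and the full input buffer is modeled as a boolean flag. It assumes, as the issue describes, that the downstream operator calls `wait()` on the checkpoint lock while the upstream operator is still in the middle of processing a record.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a chained task; not Flink's actual implementation.
class CheckpointLockSketch {
    private final Object checkpointLock = new Object();
    // Signals that the task thread is about to wait inside collect().
    private final CountDownLatch aboutToWait = new CountDownLatch(1);
    private boolean stateA;
    private boolean stateB;
    private boolean bufferFull = true; // models the full async buffer
    private String snapshot;

    // Upstream operator: the pseudocode from item 3, run under the lock.
    void processElement() throws InterruptedException {
        synchronized (checkpointLock) {
            stateA = true;
            collect();
            stateB = true;
        }
    }

    // Downstream operator: blocks on a full buffer by waiting on the lock,
    // which releases the monitor although processElement() is unfinished.
    private void collect() throws InterruptedException {
        aboutToWait.countDown();
        while (bufferFull) {
            checkpointLock.wait();
        }
    }

    // Checkpoint thread: records whatever state is visible under the lock.
    void triggerCheckpoint() {
        synchronized (checkpointLock) {
            snapshot = "stateA=" + stateA + ", stateB=" + stateB;
        }
    }

    // Models an async result completing and making room in the buffer.
    void freeBuffer() {
        synchronized (checkpointLock) {
            bufferFull = false;
            checkpointLock.notifyAll();
        }
    }

    static String runScenario() {
        CheckpointLockSketch sketch = new CheckpointLockSketch();
        Thread task = new Thread(() -> {
            try {
                sketch.processElement();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        try {
            sketch.aboutToWait.await();  // task holds the lock, stateA is set
            sketch.triggerCheckpoint();  // acquires the lock only once wait() released it
            sketch.freeBuffer();
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.snapshot;
    }

    public static void main(String[] args) {
        System.out.println(runScenario());
    }
}
```

`runScenario()` returns `stateA=true, stateB=false`: the snapshot is taken between the two writes, even though both sit inside one `synchronized` block. Keeping the waiting operator at the head of the chain, as suggested in the comment, would mean no upstream operator in the same chain is mid-record when the lock is released.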