[ https://issues.apache.org/jira/browse/FLINK-13063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski updated FLINK-13063:
-----------------------------------
Description:

1. For the following setup of chained operators:
{noformat}
SourceOperator -> FlatMap -> AsyncOperator{noformat}
Let's assume that the input buffer of {{AsyncOperator}} is full. We start processing a record from the {{SourceOperator}} and pass it to the {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied record reaches {{AsyncOperator}} and is treated specially (stored in {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then {{AsyncWaitOperator}} waits on (and thereby releases) the checkpoint lock (in {{AsyncWaitOperator#addAsyncBufferEntry}}). If a checkpoint is triggered now, both {{SourceOperator}} and {{FlatMap}} will be checkpointed as if all 10 multiplied records had been processed, which is not true. Only the first one is checkpointed by the {{AsyncWaitOperator}}; the remaining 9 are not. So if we ever restore state from this checkpoint, we lose those 9 records.

2. A similar issue (I think previously known) can happen if, for example, some operator upstream of the {{AsyncOperator}} fires a processing time timer that emits some data. In that case, however, {{AsyncWaitOperator#pendingStreamElementQueueEntry}} is overwritten.

3. If an upstream operator has the following pseudo code:
{code:java}
stateA = true;
output.collect(x);
stateB = true;
{code}
one would assume that the reads/writes of stateA and stateB are atomic from the perspective of checkpoints. But again, because {{AsyncWaitOperator}} releases the checkpoint lock, they are not.

CC [~aljoscha] [~StephanEwen] [~srichter]

> AsyncWaitOperator is broken
> ---------------------------
>
>                 Key: FLINK-13063
>                 URL: https://issues.apache.org/jira/browse/FLINK-13063
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.6.4, 1.7.2, 1.8.1, 1.9.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)