[ https://issues.apache.org/jira/browse/FLINK-13063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski updated FLINK-13063:
-----------------------------------
Description:

For the following setup of chained operators:
{noformat}
SourceOperator -> FlatMap -> AsyncOperator{noformat}
Let's assume that the input buffer of {{AsyncOperator}} is full. We start processing a record from the {{SourceOperator}} and pass it to the {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied record reaches {{AsyncOperator}} and is treated specially (stored in {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then {{AsyncWaitOperator}} waits on (and releases) the checkpoint lock (in {{AsyncWaitOperator#addAsyncBufferEntry}}).

If a checkpoint is triggered now, both {{SourceOperator}} and {{FlatMap}} will be checkpointed on the assumption that all 10 of those multiplied records were processed, which is not true. Only the first one is checkpointed by the {{AsyncWaitOperator}}. The remaining 9 are not. So if we ever restore state from this checkpoint, we will have lost those 9 records.

A similar issue (I think previously known) can happen if, for example, some operator upstream of the {{AsyncOperator}} fires a processing-time timer that emits some data. But in that case, {{AsyncWaitOperator#pendingStreamElementQueueEntry}} is overwritten.

CC [~aljoscha] [~StephanEwen] [~srichter]

was:

For the following setup of chained operators:
{noformat}
SourceOperator -> FlatMap -> AsyncOperator{noformat}
Let's assume that the input buffer of {{AsyncOperator}} is full. We start processing a record from the {{SourceOperator}} and pass it to the {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied record reaches {{AsyncOperator}} and is treated specially (stored in {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then {{AsyncWaitOperator}} waits on (and releases) the checkpoint lock (in {{AsyncWaitOperator#addAsyncBufferEntry}}).

If a checkpoint is triggered now, both {{SourceOperator}} and {{FlatMap}} will be checkpointed on the assumption that all 10 of those multiplied records were processed, which is not true. Only the first one is checkpointed by the {{AsyncWaitOperator}}. The remaining 9 are not. So if we ever restore state from this checkpoint, we will have lost those 9 records.

CC [~aljoscha] [~StephanEwen] [~srichter]

> AsyncWaitOperator can lose data during checkpointing
> ----------------------------------------------------
>
>                 Key: FLINK-13063
>                 URL: https://issues.apache.org/jira/browse/FLINK-13063
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>   Affects Versions: 1.6.4, 1.7.2, 1.8.1, 1.9.0
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>
> For the following setup of chained operators:
> {noformat}
> SourceOperator -> FlatMap -> AsyncOperator{noformat}
> Let's assume that the input buffer of {{AsyncOperator}} is full. We start
> processing a record from the {{SourceOperator}} and pass it to the
> {{FlatMap}}, which fans it out (multiplies it 10 times). The first multiplied
> record reaches {{AsyncOperator}} and is treated specially (stored in
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}}), and then
> {{AsyncWaitOperator}} waits on (and releases) the checkpoint lock (in
> {{AsyncWaitOperator#addAsyncBufferEntry}}). If a checkpoint is triggered now,
> both {{SourceOperator}} and {{FlatMap}} will be checkpointed on the assumption
> that all 10 of those multiplied records were processed, which is not true.
> Only the first one is checkpointed by the {{AsyncWaitOperator}}. The remaining
> 9 are not. So if we ever restore state from this checkpoint, we will have lost
> those 9 records.
> A similar issue (I think previously known) can happen if, for example, some
> operator upstream of the {{AsyncOperator}} fires a processing-time timer
> that emits some data. But in that case,
> {{AsyncWaitOperator#pendingStreamElementQueueEntry}} is overwritten.
> CC [~aljoscha] [~StephanEwen] [~srichter]

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
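
The scenario in the description can be illustrated with a small, deterministic toy model. This is only a sketch and does not use any Flink classes: {{ToyAsyncOperator}}, {{countLostRecords}} and the {{whileLockReleased}} hook are hypothetical names, the queue capacity of 1 is arbitrary, and the real checkpoint lock is replaced by a callback that runs at the point where the operator would wait on (and release) the lock. The model also assumes that the source advances its offset before emitting, which stands in for the statement that the upstream operators are checkpointed as if the record were fully processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class FanOutCheckpointSketch {

    static final int FAN_OUT = 10;

    /** Toy stand-in for the async operator: a bounded queue plus one pending slot. */
    static final class ToyAsyncOperator {
        private final int capacity;
        private final Deque<String> queue = new ArrayDeque<>();
        private String pending; // plays the role of pendingStreamElementQueueEntry
        private Runnable whileLockReleased = () -> { };

        ToyAsyncOperator(int capacity) {
            this.capacity = capacity;
        }

        void setWhileLockReleased(Runnable hook) {
            this.whileLockReleased = hook;
        }

        void processElement(String element) {
            if (queue.size() >= capacity) {
                pending = element;
                // Waiting for space releases the lock: a checkpoint may run here.
                whileLockReleased.run();
                // An in-flight async request completes and frees a slot.
                queue.poll();
                pending = null;
            }
            queue.add(element);
        }

        /** State captured by a checkpoint: queued elements plus the pending one. */
        List<String> snapshotState() {
            List<String> state = new ArrayList<>(queue);
            if (pending != null) {
                state.add(pending);
            }
            return state;
        }
    }

    /** Runs the scenario and returns how many fanned-out records the checkpoint misses. */
    static int countLostRecords() {
        ToyAsyncOperator async = new ToyAsyncOperator(1);
        async.processElement("old"); // the queue is already full when the scenario starts

        int[] sourceOffset = {0};
        int[] snapshotOffset = {-1};
        List<String> snapshotAsyncState = new ArrayList<>();

        async.setWhileLockReleased(() -> {
            if (snapshotOffset[0] < 0) { // take exactly one checkpoint
                snapshotOffset[0] = sourceOffset[0];
                snapshotAsyncState.addAll(async.snapshotState());
            }
        });

        // Source: advance the offset (assumption of this model), then emit record "r0".
        sourceOffset[0]++;
        // FlatMap: fan the record out into FAN_OUT elements, all within one call chain.
        for (int i = 0; i < FAN_OUT; i++) {
            async.processElement("r0-" + i);
        }

        // After a restore, the source would not replay "r0", so only the fanned-out
        // elements present in the async operator's snapshot survive.
        int covered = 0;
        for (String element : snapshotAsyncState) {
            if (element.startsWith("r0-")) {
                covered++;
            }
        }
        return snapshotOffset[0] * FAN_OUT - covered;
    }

    public static void main(String[] args) {
        System.out.println("lost records: " + countLostRecords());
    }
}
```

Running {{main}} prints {{lost records: 9}}: the snapshot contains the source offset for the whole record but only the first fanned-out element, matching the 9 lost records described above.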