[ https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Robert Metzger resolved FLINK-4080.
-----------------------------------
    Resolution: Fixed

Resolved in http://git-wip-us.apache.org/repos/asf/flink/commit/fa42cdab

> Kinesis consumer not exactly-once if stopped in the middle of processing aggregated records
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4080
>                 URL: https://issues.apache.org/jira/browse/FLINK-4080
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> I've occasionally seen ManualExactlyOnceTest fail after several runs.
>
> Kinesis records of the same aggregated batch have the same sequence
> number but different sub-sequence numbers
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>
> The current consumer code commits state every time it finishes processing
> a record, including de-aggregated ones. This is a bug: committing after a
> single de-aggregated record incorrectly marks all remaining records of the
> same de-aggregated batch as processed in the state.
>
> Proposed fix:
> 1. Use the extended `UserRecord` class in KCL to represent all records
> (either non- or de-aggregated) instead of the basic `Record` class. This
> gives access to whether or not the record was originally aggregated.
> 2. The sequence number state we are checkpointing needs to be able to
> indicate that the last seen sequence number of a shard may belong to a
> de-aggregated record, i.e., {"shard0" -> "5:8", "shard1" -> "2"} meaning
> the 8th sub-record of the 5th record was last seen for shard 0. On restore,
> we start again from record 5 for shard 0 and skip the first 7 sub-records;
> however, for shard 1 we start from record 3 since record 2 is
> non-aggregated and already fully processed.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
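
For illustration, the checkpoint format from item 2 of the proposed fix ("5:8" for an aggregated record, "2" for a non-aggregated one) could be modelled roughly as below. This is a minimal sketch, not the code from the commit: the class `ShardPosition` and all of its members are hypothetical names, sequence numbers are simplified to small consecutive `long` values as in the example (real Kinesis sequence numbers are large and not consecutive), and a sub-record is assumed to count as processed when its sub-sequence number is less than or equal to the checkpointed one.

```java
/** Hypothetical checkpoint entry for one shard; not the connector's actual class. */
final class ShardPosition {
    /** Marker meaning the last seen record was not aggregated. */
    static final long NOT_AGGREGATED = -1L;

    final long sequenceNumber;     // last seen record in the shard
    final long subSequenceNumber;  // last seen sub-record, or NOT_AGGREGATED

    ShardPosition(long sequenceNumber, long subSequenceNumber) {
        this.sequenceNumber = sequenceNumber;
        this.subSequenceNumber = subSequenceNumber;
    }

    /** Parses "5:8" (aggregated) or "2" (non-aggregated). */
    static ShardPosition parse(String encoded) {
        int sep = encoded.indexOf(':');
        if (sep < 0) {
            return new ShardPosition(Long.parseLong(encoded), NOT_AGGREGATED);
        }
        return new ShardPosition(
                Long.parseLong(encoded.substring(0, sep)),
                Long.parseLong(encoded.substring(sep + 1)));
    }

    /** Inverse of parse(). */
    String encode() {
        return isAggregated()
                ? sequenceNumber + ":" + subSequenceNumber
                : Long.toString(sequenceNumber);
    }

    boolean isAggregated() {
        return subSequenceNumber != NOT_AGGREGATED;
    }

    /** First record to fetch again after a restore. */
    long firstRecordToRead() {
        // An aggregated record may be only partly processed, so it is re-read;
        // a non-aggregated record is fully processed, so reading resumes after it.
        // The "+ 1" relies on the simplified consecutive numbering (assumption).
        return isAggregated() ? sequenceNumber : sequenceNumber + 1;
    }

    /** Whether a (sub-)record read after a restore still has to be emitted. */
    boolean shouldEmit(long seq, long subSeq) {
        if (seq != sequenceNumber) {
            return seq > sequenceNumber;
        }
        // Same record: emit only sub-records after the last seen one (assumption).
        return isAggregated() && subSeq > subSequenceNumber;
    }
}
```

With this sketch, `ShardPosition.parse("5:8")` resumes reading at record 5 and drops sub-records up to and including sub-sequence number 8, while `ShardPosition.parse("2")` resumes at record 3. The point is that the state for a shard only advances past an aggregated record once its sub-records are accounted for individually.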