[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336373#comment-16336373 ]
ASF GitHub Bot commented on FLINK-8484: --------------------------------------- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/5337 I am not deeply into the Kinesis Consumer logic, just writing here to double check that we do not build a solution where state grows infinitely. For example, it would not be feasible to hold onto all shard info forever (state would always grow), but there would need to be a way track all closed shards via constant state (like a threshold timestamp, sequence number, etc). > Kinesis consumer re-reads closed shards on job restart > ------------------------------------------------------ > > Key: FLINK-8484 > URL: https://issues.apache.org/jira/browse/FLINK-8484 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector > Affects Versions: 1.4.0, 1.3.2 > Reporter: Philip Luppens > Assignee: Philip Luppens > Priority: Blocker > Labels: bug, flink, kinesis > Fix For: 1.3.3, 1.5.0, 1.4.1 > > > We’re using the connector to subscribe to streams varying from 1 to a 100 > shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis > stream up and down during peak times. What we’ve noticed is that, while we > were having closed shards, any Flink job restart with check- or save-point > would result in shards being re-read from the event horizon, duplicating our > events. > > We started checking the checkpoint state, and found that the shards were > stored correctly with the proper sequence number (including for closed > shards), but that upon restarts, the older closed shards would be read from > the event horizon, as if their restored state would be ignored. > > In the end, we believe that we found the problem: in the > FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned > from the KinesisDataFetcher against the shards’ metadata from the restoration > point, but we do this via a containsKey() call, which means we’ll use the > StreamShardMetadata’s equals() method. However, this checks for all > properties, including the endingSequenceNumber, which might have changed > between the restored state’s checkpoint and our data fetch, thus failing the > equality check, failing the containsKey() check, and resulting in the shard > being re-read from the event horizon, even though it was present in the > restored state. > > We’ve created a workaround where we only check for the shardId and stream > name to restore the state of the shards we’ve already seen, and this seems to > work correctly. -- This message was sent by Atlassian JIRA (v7.6.3#76005)