[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335523#comment-16335523 ]
ASF GitHub Bot commented on FLINK-8484:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5337#discussion_r163175172

    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -210,16 +210,18 @@ public void run(SourceContext<T> sourceContext) throws Exception {
                 for (StreamShardHandle shard : allShards) {
                     StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
                     if (sequenceNumsToRestore != null) {
    -                    if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
    +                    // find the sequence number for the given converted kinesis shard in our restored state
    --- End diff --

    nit: Capital 'K' for Kinesis

> Kinesis consumer re-reads closed shards on job restart
> ------------------------------------------------------
>
>                 Key: FLINK-8484
>                 URL: https://issues.apache.org/jira/browse/FLINK-8484
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.2
>            Reporter: Philip Luppens
>            Priority: Major
>              Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to 100
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis
> stream up and down during peak times. What we’ve noticed is that, while we
> had closed shards, any Flink job restart from a checkpoint or savepoint
> would result in shards being re-read from the event horizon, duplicating our
> events.
>
> We started checking the checkpoint state, and found that the shards were
> stored correctly with the proper sequence number (including for closed
> shards), but that upon restarts, the older closed shards would be read from
> the event horizon, as if their restored state were being ignored.
>
> In the end, we believe that we found the problem: in the
> FlinkKinesisConsumer’s run() method, we look up each shard returned by the
> KinesisDataFetcher in the shard metadata from the restore point, but we do
> this via a containsKey() call, which relies on StreamShardMetadata’s equals()
> method. However, equals() compares all properties, including the
> endingSequenceNumber, which might have changed between the checkpoint of the
> restored state and our data fetch. The equality check then fails,
> containsKey() returns false, and the shard is re-read from the event horizon,
> even though it was present in the restored state.
>
> We’ve created a workaround where we only compare the shardId and stream name
> to restore the state of the shards we’ve already seen, and this seems to
> work correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
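The failure mode described in the report can be reproduced in isolation with the minimal Java sketch below. `ShardMeta` and `RestoreLookup` (with `lookupByEquals` and `lookupByStreamAndShardId`) are hypothetical, simplified stand-ins, not Flink's `StreamShardMetadata` or the code from the pull request. `ShardMeta.equals()` compares every field, including `endingSequenceNumber`, as the report describes. The stream name, shard ID, and sequence-number strings are made-up placeholders, and the scenario (the shard has no ending sequence number when the checkpoint is taken, but has one when the job restarts) is an assumption chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified stand-in for StreamShardMetadata.
// As in the report, equals()/hashCode() cover every field.
final class ShardMeta {
    final String streamName;
    final String shardId;
    final String endingSequenceNumber; // assumed null while the shard is still open

    ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
        this.streamName = streamName;
        this.shardId = shardId;
        this.endingSequenceNumber = endingSequenceNumber;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ShardMeta)) {
            return false;
        }
        ShardMeta other = (ShardMeta) o;
        return Objects.equals(streamName, other.streamName)
                && Objects.equals(shardId, other.shardId)
                && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId, endingSequenceNumber);
    }
}

final class RestoreLookup {
    // Problematic pattern: a map lookup with the same equality semantics as
    // containsKey(), so a changed endingSequenceNumber causes a miss.
    static String lookupByEquals(Map<ShardMeta, String> restored, ShardMeta shard) {
        return restored.get(shard);
    }

    // Reporter's workaround: match on stream name and shard ID only.
    static String lookupByStreamAndShardId(Map<ShardMeta, String> restored, ShardMeta shard) {
        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta key = entry.getKey();
            if (Objects.equals(key.streamName, shard.streamName)
                    && Objects.equals(key.shardId, shard.shardId)) {
                return entry.getValue();
            }
        }
        return null; // shard was not in the restored state
    }

    public static void main(String[] args) {
        Map<ShardMeta, String> restored = new HashMap<>();
        // State checkpointed while the shard had no ending sequence number (assumed scenario).
        restored.put(new ShardMeta("my-stream", "shardId-000000000001", null), "seq-42");

        // After the restart, the fetcher reports the same shard, now with an ending sequence number.
        ShardMeta fetched = new ShardMeta("my-stream", "shardId-000000000001", "seq-99");

        System.out.println(lookupByEquals(restored, fetched));           // prints null
        System.out.println(lookupByStreamAndShardId(restored, fetched)); // prints seq-42
    }
}
```

The linear scan keeps the sketch short; for streams with many shards, re-keying the restored map by a composite (stream name, shard ID) key would avoid scanning the whole restored state for every discovered shard.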