[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16335523#comment-16335523 ]

ASF GitHub Bot commented on FLINK-8484:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5337#discussion_r163175172
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
    @@ -210,16 +210,18 @@ public void run(SourceContext<T> sourceContext) throws Exception {
                for (StreamShardHandle shard : allShards) {
                        StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
                        if (sequenceNumsToRestore != null) {
    -                           if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
    +                           // find the sequence number for the given converted kinesis shard in our restored state
    --- End diff --
    
    nit: Capital 'K' for Kinesis


> Kinesis consumer re-reads closed shards on job restart
> ------------------------------------------------------
>
>                 Key: FLINK-8484
>                 URL: https://issues.apache.org/jira/browse/FLINK-8484
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.3.2
>            Reporter: Philip Luppens
>            Priority: Major
>              Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to 100 
> shards, and use the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, whenever 
> the stream had closed shards, any Flink job restart from a checkpoint or 
> savepoint would result in shards being re-read from the event horizon, 
> duplicating our events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restart, the older closed shards would be read from 
> the event horizon, as if their restored state were ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we try to match each shard returned 
> by the KinesisDataFetcher against the shard metadata in the restored state, 
> but we do this via a containsKey() call, which means we use the 
> StreamShardMetadata’s equals() method. However, this compares all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch. The equality 
> check then fails, so the containsKey() check fails, and the shard is 
> re-read from the event horizon even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 
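
The failure mode and the workaround described above can be illustrated with a small standalone sketch. Everything in it is an assumption made for illustration: `ShardMeta` is a hypothetical, simplified stand-in for StreamShardMetadata with only three fields, sequence numbers are plain strings, and `strictLookup`/`lenientLookup` are made-up helper names. It is not the connector's actual code or the fix from the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class ShardRestoreSketch {

    // Hypothetical, simplified stand-in for StreamShardMetadata.
    static final class ShardMeta {
        final String streamName;
        final String shardId;
        final String endingSequenceNumber; // null while the shard is still open

        ShardMeta(String streamName, String shardId, String endingSequenceNumber) {
            this.streamName = streamName;
            this.shardId = shardId;
            this.endingSequenceNumber = endingSequenceNumber;
        }

        // Like the behavior described in the issue: every property takes part in equality.
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ShardMeta)) {
                return false;
            }
            ShardMeta other = (ShardMeta) o;
            return Objects.equals(streamName, other.streamName)
                    && Objects.equals(shardId, other.shardId)
                    && Objects.equals(endingSequenceNumber, other.endingSequenceNumber);
        }

        @Override
        public int hashCode() {
            return Objects.hash(streamName, shardId, endingSequenceNumber);
        }
    }

    // Strict lookup: goes through equals()/hashCode(), just as containsKey() would.
    static String strictLookup(Map<ShardMeta, String> restored, ShardMeta fetched) {
        return restored.get(fetched);
    }

    // Lenient lookup: matches on stream name and shard ID only (the workaround idea).
    static String lenientLookup(Map<ShardMeta, String> restored, ShardMeta fetched) {
        for (Map.Entry<ShardMeta, String> entry : restored.entrySet()) {
            ShardMeta key = entry.getKey();
            if (key.streamName.equals(fetched.streamName)
                    && key.shardId.equals(fetched.shardId)) {
                return entry.getValue();
            }
        }
        return null; // shard was not part of the restored state
    }

    public static void main(String[] args) {
        // Restored state: the checkpoint was taken while the shard was still open.
        Map<ShardMeta, String> restored = new HashMap<>();
        restored.put(new ShardMeta("events", "shardId-000000000000", null), "seq-42");

        // After resharding, the same shard is closed and now has an ending sequence number.
        ShardMeta fetched = new ShardMeta("events", "shardId-000000000000", "seq-99");

        System.out.println("strict:  " + strictLookup(restored, fetched));
        System.out.println("lenient: " + lenientLookup(restored, fetched));
    }
}
```

In this sketch the strict lookup misses the restored entry because the ending sequence number changed after the checkpoint, while the lenient lookup still finds the checkpointed position. The linear scan is chosen for clarity only; a real change might instead key the restored state on stream name and shard ID.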



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
