[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16336373#comment-16336373
 ] 

ASF GitHub Bot commented on FLINK-8484:
---------------------------------------

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5337
  
    I am not deeply into the Kinesis Consumer logic, just writing here to 
double check that we do not build a solution where state grows infinitely.
    
    For example, it would not be feasible to hold onto all shard info forever 
(state would always grow), but there would need to be a way track all closed 
shards via constant state (like a threshold timestamp, sequence number, etc).


> Kinesis consumer re-reads closed shards on job restart
> ------------------------------------------------------
>
>                 Key: FLINK-8484
>                 URL: https://issues.apache.org/jira/browse/FLINK-8484
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Philip Luppens
>            Assignee: Philip Luppens
>            Priority: Blocker
>              Labels: bug, flink, kinesis
>             Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to