[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-06 Thread Tzu-Li (Gordon) Tai (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354385#comment-16354385 ]

Tzu-Li (Gordon) Tai commented on FLINK-8484:


Merged.

1.5 - 140bb0c66257d5d09d61845731aef1dbdb90a0bd
1.4 - 7884a4f749b8e7ba7b831963cf2a3be12df877b7

> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to 100 
> shards, and we used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we noticed is that, once we had 
> closed shards, any Flink job restart from a checkpoint or savepoint would 
> result in those shards being re-read from the event horizon, duplicating our 
> events.
>  
> We checked the checkpoint state and found that the shards were stored 
> correctly with the proper sequence numbers (including for closed shards), but 
> that upon restart the older closed shards were read again from the event 
> horizon, as if their restored state had been ignored.
>  
> In the end, we believe we found the problem: in the FlinkKinesisConsumer’s 
> run() method, the shard returned by the KinesisDataFetcher is looked up 
> against the shard metadata from the restore point via a containsKey() call, 
> which relies on StreamShardMetadata’s equals() method. That method compares 
> all properties, including the endingSequenceNumber, which may have changed 
> between the restored state’s checkpoint and our data fetch. The equality check 
> therefore fails, the containsKey() check fails, and the shard is re-read from 
> the event horizon even though it was present in the restored state.
>  
> We’ve created a workaround where we only compare the shardId and stream name 
> when restoring the state of shards we’ve already seen, and this seems to work 
> correctly.
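
For illustration only (hypothetical helper, not the Flink source): the failure mode described above comes down to matching shards with full `StreamShardMetadata` equality, which also compares the `endingSequenceNumber`, instead of matching purely by identity. Assuming the usual getters on `StreamShardMetadata`, the identity-only comparison the workaround relies on looks roughly like this:

```java
// Hypothetical helper illustrating the report above. StreamShardMetadata.equals()
// also compares endingSequenceNumber, so a shard that was open at checkpoint time
// but closed before the restore no longer "equals" its restored counterpart,
// and containsKey() misses it.
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public final class ShardIdentity {

    private ShardIdentity() {}

    /** Matches a discovered shard against restored state by stream name and shard id only. */
    public static boolean sameShard(StreamShardMetadata discovered, StreamShardMetadata restored) {
        return discovered.getStreamName().equals(restored.getStreamName())
                && discovered.getShardId().equals(restored.getShardId());
    }
}
```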



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-06 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16354360#comment-16354360 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5337




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348414#comment-16348414 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Good point. An ugly workaround would be to store a timestamp when the 
ending sequence number is set on a shard, and to provide a configurable, 
sufficiently long window (e.g. 7 days). That would avoid the dependency on the 
Kinesis API.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348395#comment-16348395 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
Here it is: https://issues.apache.org/jira/browse/FLINK-8542




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348374#comment-16348374 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
@pluppens 
My only concern is that scanning the whole list of shards can be heavily 
constrained by AWS Kinesis's API invocation rate limits. Also, we would then only 
be cleaning up the state on restore, meaning we would in effect be encouraging 
(in a bad way) Kinesis users to snapshot and restore every once in a while.

I think the best solution is probably to use a constant threshold, as Stephan 
suggested, but we will need to investigate whether the Kinesis API exposes 
enough information to implement this.

I'll open a separate JIRA ticket for this, so we can properly discuss the 
issue of pruning closed-shard state there.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-02-01 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16348367#comment-16348367 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Regarding the remark from @StephanEwen: perhaps it would be OK to re-use 
the `KinesisProxy` to return the list of all shards and compare it against 
`sequenceNumsToRestore`, pruning any shards that no longer exist? It would 
delay the restoration, but you could be sure the state wouldn't grow indefinitely 
(we were looking at around 1,000 closed shards with a 24-hour retention 
period, so roughly 365k per year - that's not going to end well). Another option 
would be to kick off a periodic task to prune them, but that is likely to run 
into race conditions, so doing it at the safe point of restoration makes more 
sense to me.
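
A minimal sketch of this pruning idea, under the assumption that the set of currently existing shard ids can be obtained (for example through the connector's `KinesisProxy`) and that the restored state is a map keyed by `StreamShardMetadata`; the class and the `currentShardIds` parameter are illustrative, not connector code:

```java
// Sketch only: drop restored entries whose shards no longer exist in the stream,
// so restored state cannot grow indefinitely across re-shardings.
import java.util.Map;
import java.util.Set;

import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public final class RestoredStatePruner {

    private RestoredStatePruner() {}

    public static void pruneVanishedShards(
            Map<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore,
            Set<String> currentShardIds) {
        // Remove every restored shard whose id is no longer present in the live stream.
        sequenceNumsToRestore.keySet()
                .removeIf(shard -> !currentShardIds.contains(shard.getShardId()));
    }
}
```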




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347093#comment-16347093 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
Great, thanks for the update!

As a side note, I will be making some additional changes to the code 
regarding the not-so-nice iteration across the `sequenceNumsToRestore` map. It 
would make sense to have an "equivalence wrapper" class around 
`StreamShardMetadata` that only checks equivalence of the stream name and 
shard id. That wrapper class can then be used as the key of the 
`sequenceNumsToRestore` map.

None of the tests you added would need to be touched, so I assume it will 
work just as well for you.
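
A rough sketch of such an equivalence wrapper (the shape and names below are illustrative, not the final implementation):

```java
// Illustrative sketch of the "equivalence wrapper" idea: a map key that considers
// only stream name and shard id, so a changed endingSequenceNumber does not affect
// lookups in the sequenceNumsToRestore map.
import java.util.Objects;

import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public final class StreamShardMetadataEquivalence {

    private final StreamShardMetadata shardMetadata;

    public StreamShardMetadataEquivalence(StreamShardMetadata shardMetadata) {
        this.shardMetadata = shardMetadata;
    }

    public StreamShardMetadata get() {
        return shardMetadata;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof StreamShardMetadataEquivalence)) {
            return false;
        }
        StreamShardMetadata other = ((StreamShardMetadataEquivalence) o).shardMetadata;
        return Objects.equals(shardMetadata.getStreamName(), other.getStreamName())
                && Objects.equals(shardMetadata.getShardId(), other.getShardId());
    }

    @Override
    public int hashCode() {
        return Objects.hash(shardMetadata.getStreamName(), shardMetadata.getShardId());
    }
}
```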




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16347085#comment-16347085 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Thanks - we've been running it in production for the last 5 days without 
issues, so it seems to work fine. We'll be enabling autoscaling of the streams 
in the coming hours, so if anything is amiss, it should pop up on our radar in 
the coming days.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-31 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16346809#comment-16346809 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
@pluppens the changes look good to merge! Thanks a lot for working on this.
Will merge this for `release-1.4`, `release-1.3`, and `master`.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-28 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16342608#comment-16342608 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
@tzulitai Is there anything more I can do from my side?




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-25 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338913#comment-16338913 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
@bowenli86 Makes sense - I've updated the description to contain the 
initial email/issue. HTH.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16338292#comment-16338292 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5337
  
Good to have the background info from the email thread; I didn't have the 
full picture before.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337346#comment-16337346 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
@bowenli86 we're passing the last-seen shardId, and the Kinesis call 
returns only newer shards. Not sure whether that answers your remark, as I 
didn't fully understand the question either.
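
For reference, the behaviour being described maps onto the AWS API's exclusive-start-shard-id parameter; a minimal sketch using the AWS SDK's DescribeStream call (the wrapper class is hypothetical, pagination via `hasMoreShards` is omitted, and the connector's actual call path may differ):

```java
// Sketch: listing only shards created after the last-seen shard id.
import java.util.List;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;
import com.amazonaws.services.kinesis.model.Shard;

public final class NewShardLister {

    private NewShardLister() {}

    public static List<Shard> shardsAfter(AmazonKinesis kinesis, String stream, String lastSeenShardId) {
        DescribeStreamRequest request = new DescribeStreamRequest()
                .withStreamName(stream)
                // Kinesis only returns shards whose id comes after this one.
                .withExclusiveStartShardId(lastSeenShardId);
        DescribeStreamResult result = kinesis.describeStream(request);
        return result.getStreamDescription().getShards();
    }
}
```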




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337274#comment-16337274 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
re @bowenli86 
yes, Kinesis shard metadata is fetched every 
`DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS` millis.

Could you describe a bit more which part you don't think is valid?
There's also a more detailed explanation of the issue here: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-Kinesis-Consumer-re-reading-merged-shards-upon-restart-td17917.html




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-24 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16337248#comment-16337248 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
re @StephanEwen 
yes, currently, state is still kept indefinitely for closed shards. A 
special `SHARD_END` marker is stored as the sequence number for closed shards, 
so that the consumer does not attempt to read them again on restore.

A threshold timestamp could work if the AWS API provides shard creation times. 
A threshold sequence number would also work if sequence numbers are always 
monotonically increasing across shards. This will need some investigation to 
see whether it is feasible.
Either way, I think this improvement is out of scope for the issue at hand, 
and it would also require a migration path from the old state (where this 
constant threshold state doesn't exist).

For the 1.4.1 bugfix, I think we should continue with the current approach. 
It might make sense, though, to fix this via constant threshold state in 1.5.0.
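
As a rough illustration of the `SHARD_END` marker mechanism mentioned above (a sketch assuming the connector's `SentinelSequenceNumber` enum and its accessor; not the actual restore code path):

```java
// Sketch: a closed shard's restored state carries the shard-ending sentinel as its
// sequence number, so the consumer can recognize it as fully consumed on restore.
import org.apache.flink.streaming.connectors.kinesis.model.SentinelSequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;

public final class ClosedShardCheck {

    private ClosedShardCheck() {}

    public static boolean isShardFullyConsumed(SequenceNumber restoredSequenceNumber) {
        return SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM
                .get()
                .equals(restoredSequenceNumber);
    }
}
```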




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336396#comment-16336396 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5337
  
I'm wondering whether this is valid. My understanding is that, by default, 
flink-connector-kinesis fetches Kinesis metadata (shard count, shard ids, etc.) 
every 10s (defined by DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS).
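
For reference, that discovery interval is configurable on the consumer; a minimal sketch, assuming a recent Flink version, the standard `ConsumerConfigConstants` keys, and placeholder stream/region values:

```java
// Sketch: overriding the shard discovery interval referred to above.
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public final class KinesisConsumerConfigExample {

    public static FlinkKinesisConsumer<String> createConsumer() {
        Properties props = new Properties();
        props.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
        // Discover new/closed shards every 30 seconds instead of the default interval.
        props.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "30000");

        return new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), props);
    }
}
```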




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336373#comment-16336373 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5337
  
I am not deeply into the Kinesis Consumer logic, just writing here to 
double check that we do not build a solution where state grows infinitely.

For example, it would not be feasible to hold onto all shard info forever 
(state would always grow), but there would need to be a way to track all closed 
shards via constant state (like a threshold timestamp, sequence number, etc.).




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16336041#comment-16336041 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Alright, I've given it a quick stab, but the whole 'remove/update/re-add' 
cycle is somewhat ugly due to the hashCode change. I've also just copied the 
test from the other example rather than using the harness, so the tests are 
pretty messy.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335765#comment-16335765 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
@pluppens yes, that sounds correct.

`parentShardId`
`adjacentParentShardId`
`startingHashKey`
`endingHashKey`
`startingSequenceNumber`
these should all be fixed once the shard is created by Kinesis.




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335762#comment-16335762 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Just a small remark: from what I understood, the only property that *can* 
change is the endingSequenceNumber. All other state should be considered 
'set once', so there should be no point in comparing and synchronizing the 
other properties - or did I miss something?




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335726#comment-16335726 ]

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Ok, that makes sense to me. Give me a bit to cook up both the new test and 
the new approach, and I'll update the PR. Thank you very much for the comments!




[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335703#comment-16335703
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5337
  
@pluppens yes, I think that would be the proper solution here.


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335691#comment-16335691
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on the issue:

https://github.com/apache/flink/pull/5337
  
Ok, so you'd prefer to synchronize the state of the retrieved shard against 
the stored shards by comparing its stream name and shard id, before doing the 
containsKey() check?
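
One way to read that suggestion, sketched here purely for illustration (this is not the code that
was eventually merged), is to map the freshly discovered metadata back onto the checkpointed key
with the same stream name and shard id before doing the containsKey()/get() lookup:

import java.util.Map;

import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

final class RestoredKeyLookup {

	/**
	 * Returns the checkpointed metadata that matches the discovered shard on stream name
	 * and shard id, so that a subsequent containsKey()/get() on the restored map cannot
	 * miss because of fields such as endingSequenceNumber. Falls back to the discovered
	 * metadata itself for genuinely new shards.
	 */
	static StreamShardMetadata resolveLookupKey(
			StreamShardMetadata discovered,
			Map<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore) {

		for (StreamShardMetadata stored : sequenceNumsToRestore.keySet()) {
			if (stored.getStreamName().equals(discovered.getStreamName())
					&& stored.getShardId().equals(discovered.getShardId())) {
				return stored;
			}
		}
		return discovered;
	}
}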


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335689#comment-16335689
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user pluppens commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163226460
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
	}

+	/**
+	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+	 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+	 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+	 * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
+	 * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
+	 */
+	@Test
+	public void testFindSequenceNumberToRestoreFrom() {
--- End diff --

Makes sense. I'll look into it and see if I can find a way to test it as a 
whole.


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Philip Luppens
>Assignee: Philip Luppens
>Priority: Blocker
>  Labels: bug, flink, kinesis
> Fix For: 1.3.3, 1.5.0, 1.4.1
>
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread Tzu-Li (Gordon) Tai (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335528#comment-16335528
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-8484:


I think we should make this a blocker for 1.4.1 & 1.5.0, also.
Updating.

> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Philip Luppens
>Priority: Major
>  Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335523#comment-16335523
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163175172
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -210,16 +210,18 @@ public void run(SourceContext<T> sourceContext) throws Exception {
		for (StreamShardHandle shard : allShards) {
			StreamShardMetadata kinesisStreamShard = KinesisDataFetcher.convertToStreamShardMetadata(shard);
			if (sequenceNumsToRestore != null) {
-				if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) {
+				// find the sequence number for the given converted kinesis shard in our restored state
--- End diff --

nit: Capital 'K' for Kinesis


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Philip Luppens
>Priority: Major
>  Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335525#comment-16335525
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163175349
  
--- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java ---
@@ -267,6 +269,27 @@ public void run(SourceContext<T> sourceContext) throws Exception {
		sourceContext.close();
	}

+	/**
+	 * Tries to find the {@link SequenceNumber} for a given {@code kinesisStreamShard} in the list of the restored stream shards' metadata,
+	 * by comparing the stream name and shard id for equality.
+	 * @param current               the current Kinesis shard we're trying to find the restored sequence number for
+	 * @param sequenceNumsToRestore the restored sequence numbers
+	 * @return the sequence number, if any, or {@code null} if no matching shard is found
+	 */
+	@VisibleForTesting
+	SequenceNumber findSequenceNumberToRestoreFrom(StreamShardMetadata current, HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore) {
+		checkNotNull(current.getStreamName(), "Stream name not set on the current metadata shard");
+		checkNotNull(current.getShardId(), "Shard id not set on the current metadata shard");
+
+		for (final Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
--- End diff --

The `final` is redundant here.
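
The quoted diff above ends at the loop header. For readers following along, a plausible completion
of the helper, based on the stream-name/shard-id comparison the PR describes (a sketch only; the
merged code may differ in details, and checkNotNull is assumed to be the usual static import of
org.apache.flink.util.Preconditions#checkNotNull from the surrounding class):

	SequenceNumber findSequenceNumberToRestoreFrom(StreamShardMetadata current, HashMap<StreamShardMetadata, SequenceNumber> sequenceNumsToRestore) {
		checkNotNull(current.getStreamName(), "Stream name not set on the current metadata shard");
		checkNotNull(current.getShardId(), "Shard id not set on the current metadata shard");

		for (final Map.Entry<StreamShardMetadata, SequenceNumber> entry : sequenceNumsToRestore.entrySet()) {
			// only the identity of the shard matters; fields such as the ending sequence
			// number may legitimately have changed since the checkpoint was taken
			if (current.getStreamName().equals(entry.getKey().getStreamName())
					&& current.getShardId().equals(entry.getKey().getShardId())) {
				return entry.getValue();
			}
		}

		// no restored state for this shard: it is genuinely new
		return null;
	}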


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Philip Luppens
>Priority: Major
>  Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335524#comment-16335524
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/5337#discussion_r163176506
  
--- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java ---
@@ -515,6 +515,56 @@ public void testStreamShardMetadataSerializedUsingPojoSerializer() {
		assertTrue(typeInformation.createSerializer(new ExecutionConfig()) instanceof PojoSerializer);
	}

+	/**
+	 * FLINK-8484: ensure that a state change in the StreamShardMetadata other than {@link StreamShardMetadata#shardId} or
+	 * {@link StreamShardMetadata#streamName} does not result in the shard not being able to be restored.
+	 * This handles the corner case where the stored shard metadata is open (no ending sequence number), but after the
+	 * job restore, the shard has been closed (ending number set) due to re-sharding, and we can no longer rely on
+	 * {@link StreamShardMetadata#equals(Object)} to find back the sequence number in the collection of restored shard metadata.
+	 */
+	@Test
+	public void testFindSequenceNumberToRestoreFrom() {
--- End diff --

Nice that a unit test is added for the method!

However, I think it would be even better if we also include a test which 
uses the `AbstractStreamOperatorTestHarness` to test this restore. That utility 
class allows taking a snapshot of the wrapped operator's state, and restoring 
it again with the snapshot.
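
As a rough sketch of the snapshot-then-restore round trip the harness enables: note that the exact
type returned by snapshot() differs between Flink versions (older releases use OperatorStateHandles,
newer ones OperatorSubtaskState); the sketch below assumes the newer API, and the sources passed in
would be the consumer before and after the simulated re-shard:

import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;

public class SnapshotRestoreSketch {

	/**
	 * Snapshots the state of {@code original} and restores {@code restored} from it,
	 * mimicking a savepoint-and-restart cycle inside a unit test.
	 */
	static <T> void snapshotAndRestore(SourceFunction<T> original, SourceFunction<T> restored) throws Exception {
		AbstractStreamOperatorTestHarness<T> harness =
				new AbstractStreamOperatorTestHarness<>(new StreamSource<>(original), 128, 1, 0);
		harness.setup();
		harness.open();

		// take a snapshot of the wrapped operator's state ...
		OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);

		// ... and initialize a second harness (e.g. a consumer that now sees the
		// shard as closed) from that snapshot before opening it
		AbstractStreamOperatorTestHarness<T> restoredHarness =
				new AbstractStreamOperatorTestHarness<>(new StreamSource<>(restored), 128, 1, 0);
		restoredHarness.setup();
		restoredHarness.initializeState(snapshot);
		restoredHarness.open();
	}
}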


> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Philip Luppens
>Priority: Major
>  Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart

2018-01-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335479#comment-16335479
 ] 

ASF GitHub Bot commented on FLINK-8484:
---

GitHub user pluppens opened a pull request:

https://github.com/apache/flink/pull/5337

[FLINK-8484][flink-kinesis-connector] Ensure a Kinesis consumer snapshot 
restoration is able to handle recently closed shards

FLINK-8484: ensure that a state change in the StreamShardMetadata other 
than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not 
result in the shard not being able to be restored. This handles the corner case 
where a shard might have been closed (ending sequence number set to not-null) 
since the last savepoint or checkpoint when a job is restarted from a snapshot 
state.

## Brief change log
 - Created a new method to perform the sequence number lookup
 - Ensure that a lookup for a given existing Kinesis shard does not rely on 
equals(), but rather checks for equality on the stream name and shard id only


## Verifying this change

This change added tests and can be verified as follows:
 - A new unit test was added in `FlinkKinesisConsumerTest` called 
`testFindSequenceNumberToRestoreFrom()` which tests the lookup mechanism

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: yes
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pluppens/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5337.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5337


commit 5c756390002a2e1c00c7368bea3e1135b7722a20
Author: Philip Luppens 
Date:   2018-01-23T08:00:23Z

FLINK-8484: ensure that a state change in the StreamShardMetadata other 
than `StreamShardMetadata.shardId` or `StreamShardMetadata.streamName` does not 
result in the shard not being able to be restored. This handles the corner case 
where a shard might have been closed (ending sequence number set to not-null) 
since the last savepoint or checkpoint when a job is restarted from a snapshot 
state.




> Kinesis consumer re-reads closed shards on job restart
> --
>
> Key: FLINK-8484
> URL: https://issues.apache.org/jira/browse/FLINK-8484
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.2
>Reporter: Philip Luppens
>Priority: Major
>  Labels: bug, flink, kinesis
>
> We’re using the connector to subscribe to streams varying from 1 to a 100 
> shards, and used the kinesis-scaling-utils to dynamically scale the Kinesis 
> stream up and down during peak times. What we’ve noticed is that, while we 
> were having closed shards, any Flink job restart with check- or save-point 
> would result in shards being re-read from the event horizon, duplicating our 
> events.
>  
> We started checking the checkpoint state, and found that the shards were 
> stored correctly with the proper sequence number (including for closed 
> shards), but that upon restarts, the older closed shards would be read from 
> the event horizon, as if their restored state would be ignored.
>  
> In the end, we believe that we found the problem: in the 
> FlinkKinesisConsumer’s run() method, we’re trying to find the shard returned 
> from the KinesisDataFetcher against the shards’ metadata from the restoration 
> point, but we do this via a containsKey() call, which means we’ll use the 
> StreamShardMetadata’s equals() method. However, this checks for all 
> properties, including the endingSequenceNumber, which might have changed 
> between the restored state’s checkpoint and our data fetch, thus failing the 
> equality check, failing the containsKey() check, and resulting in the shard 
> being re-read from the event horizon, even though it was present in the 
> restored state.
>  
> We’ve created a workaround where we only check for the shardId and stream 
> name to restore the state of the shards we’ve already seen, and this seems to 
> work correctly. 