Hi Gordon,

Yeah, I’d need to confirm with our devops team that this is the case (by default, the Kinesis monitoring doesn’t show how many, or which, shards were re-ingested; all I remember is seeing the iterator age shoot up to the retention horizon again, with no clue whether that was caused by one shard or more). I do remember we were having issues whenever there were closed shards, but I could be wrong.
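To make the failure mode concrete before you dig into the thread below, here’s a small, self-contained snippet that mirrors the containsKey() mismatch we described. It’s purely illustrative, not code lifted from the connector; the class name, stream name, and sequence number values are made up:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

public class ClosedShardLookupDemo {

    public static void main(String[] args) {
        // Shard metadata as it looked at checkpoint time: the shard was
        // still open, so endingSequenceNumber is null.
        StreamShardMetadata restored = new StreamShardMetadata();
        restored.setStreamName("my-stream");          // placeholder name
        restored.setShardId("shardId-000000000000");

        // The same shard as re-discovered after restore: it was closed by
        // a rescale in the meantime, so it now carries an
        // endingSequenceNumber.
        StreamShardMetadata discovered = new StreamShardMetadata();
        discovered.setStreamName("my-stream");
        discovered.setShardId("shardId-000000000000");
        discovered.setEndingSequenceNumber("12345");  // placeholder value

        Map<StreamShardMetadata, String> restoredState = new HashMap<>();
        restoredState.put(restored, "sequence-number-at-checkpoint");

        // Prints "false": equals()/hashCode() compare all fields,
        // including endingSequenceNumber, so the restored entry is missed
        // and the shard is treated as brand new.
        System.out.println(restoredState.containsKey(discovered));
    }
}

Because equals() and hashCode() cover every field, the lookup misses as soon as endingSequenceNumber changes, even though it is still the same shard.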
I’ve created a ticket [1] to track the issue, and I’ll see if I can provide a small patch against the 1.3 branch.
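For reference, our workaround boils down to the sketch below. It’s a simplification of what we’re actually running (the class and method names are made up, and the eventual patch may look different), but it shows the idea of matching on shard identity only:

import java.util.Map;

import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber;
import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata;

// Sketch of the workaround: match restored shard state on the two fields
// that actually identify a shard, instead of relying on containsKey() and
// thus on StreamShardMetadata.equals(), which also compares the mutable
// endingSequenceNumber.
public final class RestoredShardMatcher {

    private RestoredShardMatcher() {
    }

    // Returns the restored sequence number for the shard, or null if the
    // shard genuinely wasn't part of the restored state.
    public static SequenceNumber findRestoredSequenceNumber(
            Map<StreamShardMetadata, SequenceNumber> restoredState,
            StreamShardMetadata discoveredShard) {

        for (Map.Entry<StreamShardMetadata, SequenceNumber> entry
                : restoredState.entrySet()) {
            StreamShardMetadata restored = entry.getKey();
            if (restored.getStreamName().equals(discoveredShard.getStreamName())
                    && restored.getShardId().equals(discoveredShard.getShardId())) {
                return entry.getValue();
            }
        }
        return null;
    }
}

With at most a hundred shards per stream, the linear scan is negligible; a map keyed on just the stream name and shard ID would be the obvious alternative.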
[1] https://issues.apache.org/jira/browse/FLINK-8484

HTH,
-Phil

On Mon, Jan 22, 2018 at 6:26 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Hi Philip,
>
> Thanks a lot for reporting this, and for looking into it in detail.
>
> Your observation sounds accurate to me. The `endingSequenceNumber` would
> no longer be null once a shard is closed, so on restore that would mislead
> the consumer into thinking it’s a new shard, and it would start consuming
> it from the earliest sequence number possible (i.e., treating it as if it
> were a new shard created while the job wasn’t running).
>
> I think we haven’t seen other reports of this yet because the issue you
> observed seems to happen only in a corner case where the Kinesis stream
> was rescaled while the job was down.
> Could you confirm that assumption? My guess is that Flink users who use
> Kinesis have so far only been rescaling Kinesis streams while the job was
> running.
>
> Your workaround is also a valid fix for this bug. Could you file a JIRA
> for it? I’d be happy to also review a PR for the fix, if you would like
> to contribute it.
>
> Cheers,
> Gordon
>
>
> On 22 January 2018 at 5:08:36 PM, Philip Luppens (philip.lupp...@gmail.com)
> wrote:
>
> Hi everyone,
>
> For the past weeks, we’ve been struggling with Kinesis ingestion using the
> Flink Kinesis connector, but the seemingly complete lack of similar
> reports makes us wonder whether we’ve misconfigured or misused the
> connector.
>
> We’re using the connector to subscribe to streams varying from 1 to 100
> shards, and we used the kinesis-scaling-utils to dynamically scale the
> Kinesis stream up and down during peak times. What we noticed is that,
> whenever there were closed shards, any Flink job restart from a check- or
> savepoint would result in shards being re-read from the trim horizon,
> duplicating our events.
>
> We started checking the checkpoint state, and found that the shards were
> stored correctly with the proper sequence numbers (including the closed
> shards), but that upon restart, the older closed shards would be read from
> the trim horizon, as if their restored state were being ignored.
>
> In the end, we believe we found the problem: in the FlinkKinesisConsumer’s
> run() method, we try to find the shard returned by the KinesisDataFetcher
> in the shard metadata from the restoration point, but we do this via a
> containsKey() call, which means we use StreamShardMetadata’s equals()
> method. However, equals() checks all properties, including the
> endingSequenceNumber, which may have changed between the restored state’s
> checkpoint and our data fetch. This fails the equality check, hence the
> containsKey() check, and results in the shard being re-read from the trim
> horizon, even though it was present in the restored state.
>
> We’ve created a workaround where we only check the shard ID and stream
> name to restore the state of shards we’ve already seen, and this seems to
> work correctly. However, as pointed out above, the lack of similar reports
> makes us worry that we’ve misunderstood something, so we’d appreciate any
> feedback on whether our report makes sense before we file a bug in the
> issue tracker.
>
> Much appreciated,
>
> -Phil
>
> --
> "We cannot change the cards we are dealt, just how we play the hand."
> - Randy Pausch

--
"We cannot change the cards we are dealt, just how we play the hand." - Randy Pausch