Hi Gordon,

Yeah, I’d need to confirm with our devops team that this is the case (by
default, the Kinesis monitoring doesn’t show how many or which shards were
re-ingested; all I remember is seeing the iterator age shoot back up to the
retention horizon, with no clue whether that was caused by one shard or
more). I do remember we were having issues whenever there were closed
shards, but I could be wrong.

I’ve created a ticket [1] to track the issue, and I’ll see if I can provide
a small patch against the 1.3 branch.

[1] https://issues.apache.org/jira/browse/FLINK-8484

HTH,

-Phil

On Mon, Jan 22, 2018 at 6:26 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Philip,
>
> Thanks a lot for reporting this, and looking into this in detail.
>
> Your observation sounds accurate to me. The `endingSequenceNumber` would
> no longer be null once a shard is closed, so on restore this misleads the
> consumer into thinking it’s a new shard, and it starts consuming from the
> earliest sequence number possible (i.e., treating it as if it were a shard
> that was created while the job wasn’t running).
>
> I think we haven’t seen other reports on this yet, because the issue you
> observed seems to only happen in a corner case where you rescaled the
> Kinesis stream while the job was down.
> Could you confirm that assumption? My guess is that Flink users who use
> Kinesis have so far only been rescaling Kinesis streams while the job was
> running.
>
> Your workaround is also a valid fix for this bug. Could you file a JIRA
> for this? Would be happy to also review a PR for the fix, if you would like
> to contribute it.
>
> Cheers,
> Gordon
>
>
> On 22 January 2018 at 5:08:36 PM, Philip Luppens (philip.lupp...@gmail.com)
> wrote:
>
> Hi everyone,
>
> For the past few weeks, we’ve been struggling with Kinesis ingestion using
> the Flink Kinesis connector, but the seemingly complete lack of similar
> reports makes us wonder whether we misconfigured or misused the connector.
>
> We’re using the connector to subscribe to streams varying from 1 to 100
> shards, and we use the kinesis-scaling-utils to dynamically scale the
> Kinesis stream up and down during peak times. What we’ve noticed is that,
> whenever there were closed shards, any Flink job restart from a checkpoint
> or savepoint would result in shards being re-read from the event horizon,
> duplicating our events.
>
> We started checking the checkpoint state and found that the shards were
> stored correctly with the proper sequence number (including for closed
> shards), but that upon restart, the older closed shards would be read from
> the event horizon, as if their restored state were being ignored.
>
> In the end, we believe we found the problem: in the FlinkKinesisConsumer’s
> run() method, we try to match the shard returned from the
> KinesisDataFetcher against the shards’ metadata from the restoration
> point, but we do this via a containsKey() call, which means we use the
> StreamShardMetadata’s equals() method. However, this checks all
> properties, including the endingSequenceNumber, which might have changed
> between the restored state’s checkpoint and our data fetch. This makes the
> equality check fail, so the containsKey() check fails, and the shard is
> re-read from the event horizon, even though it was present in the restored
> state.
>
> We’ve created a workaround where we only check the shardId and stream name
> to restore the state of shards we’ve already seen, and this seems to work
> correctly. However, as pointed out above, the lack of similar reports makes
> us worry that we’ve misunderstood something, so we’d appreciate any
> feedback on whether our report makes sense before we file a bug in the
> issue tracker.
>
> Much appreciated,
>
> -Phil
>
> --
> "We cannot change the cards we are dealt, just how we play the hand." -
> Randy Pausch
>
>


-- 
"We cannot change the cards we are dealt, just how we play the hand." -
Randy Pausch
