Re: Timestamp/watermark support in Kinesis consumer

2018-02-23 Thread Thomas Weise
Another nice thing is that readers can potentially also read from different
sources (historic/latest). To arrive at a general connector pattern, it
will also be necessary to consider the ordering relationship between
restrictions/splits/blocks/segments when it is important for the processing
logic - which is what Jamie refers to. For Kinesis, the most obvious case
is reading parent before child shards, but also throttling unrelated shards
if they are unbalanced WRT event time.

We are now implementing a stopgap watermark solution in custom code,
because the Kinesis consumer really needs a revamp as part of a general
connector overhaul.


On Thu, Feb 22, 2018 at 2:28 AM, Aljoscha Krettek 
wrote:

> While we're on this: https://beam.apache.org/blog/
> 2017/08/16/splittable-do-fn.html
>
> This is a concrete way of separating partition/shard/split discovery from
> their reading. The nice thing about this is that you can mix-and-match
> "discovery components" and "reader components". For example, for Kafka we
> would have a TopicReader and I can envision different discovery
> implementations: one very simple, no-frills, but rock solid, another one
> that does automatic discovery of new partitions, regex matching, etc...
>
>
> > On 22. Feb 2018, at 01:49, Jamie Grier  wrote:
> >
> > I know this is a very simplistic idea but...
> >
> > In general the issue Eron is describing occurs whenever two (or more)
> > parallel partitions are assigned to the same Flink sub-task and there is
> > large time delta between them.  This problem exists though largely
> because
> > we are not making any decisions about which of these partitions to read
> and
> > when but rather just treating them all the same.  However, this isn't the
> > only way to approach the problem.
> >
> > Think instead of each partition as a "roughly time sorted" file and the
> > function of the connector as roughly a merge sort type process.  In other
> > words just read the older data first by peeking at each partition and
> > deciding what to read next.  The output of the connector would be a
> roughly
> > time ordered stream that way..
> >
> > However to really solve the whole problem you'd have to carry this idea
> > throughout Flink and be more selective about which data you read and when
> > throughout the whole data flow graph.  Similar problem I think and just
> > something I've been thinking a bit about lately.
> >
> >
> >
> >
> > On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright 
> wrote:
> >
> >> It is valuable to consider the behavior of a consumer in both a
> real-time
> >> processing context, which consists mostly of tail reads, and a
> historical
> >> processing context, where there's an abundance of backlogged data.   In
> the
> >> historical processing context, system internals (e.g. shard selection
> >> logic) have an outsized influence on the order of observation and
> >> potentially the progression of the event time clock.  In a real-time
> >> context, the order of observation is, by definition, mostly determined
> by
> >> the order in which events are produced.
> >>
> >> My point is, it would be good to explore the efficacy of these
> improvements
> >> in both contexts.
> >>
> >>
> >>
> >>
> >> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise  wrote:
> >>
> >>> I don't think there is a generic solution to the problem you are
> >>> describing; we don't know how long it will take for resharding to take
> >>> effect and those changes to become visible to the connector. Depending
> on
> >>> how latency sensitive the pipeline is, possibly a configurable
> watermark
> >>> hold period could be used to cushion the event time chaos introduced by
> >>> resharding.
> >>>
> >>> This isn't the primary motivation for the connector customization I'm
> >>> working on though. We face issues with restart from older checkpoints
> >> where
> >>> parent and child shards are consumed in parallel.
> >>>
> >>>
> >>> --
> >>> sent from mobile
> >>>
> >>>
> >>> On Feb 12, 2018 4:36 PM, "Eron Wright"  wrote:
> >>>
> >>> I'd like to know how you envision dealing with resharding in relation
> to
> >>> the watermark state.   Imagine that a given shard S1 has a watermark of
> >> T1,
> >>> and is then split into two shards S2 and S3.   The new shards are
> >> assigned
> >>> to subtasks according to a hash function.  The current watermarks of
> >> those
> >>> subtasks could be far ahead of T1, and thus the events in S2/S3 will be
> >>> considered late.
> >>>
> >>> The problem of a chaotic event time clock is exacerbated by any source
> >> that
> >>> uses dynamic partitioning.  Would a per-shard watermark generator
> really
> >>> solve the problem that is motivating you?
> >>>
> >>> Thanks,
> >>> Eron
> >>>
> >>> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:
> >>>
>  Based on my draft implementation, the changes that are needed in the
> >>> Flink
>  connector are as follows:
> 
>  I need to be able to override the following to track last record
> >>> tim

Re: Timestamp/watermark support in Kinesis consumer

2018-02-22 Thread Aljoscha Krettek
While we're on this: 
https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

This is a concrete way of separating partition/shard/split discovery from their 
reading. The nice thing about this is that you can mix-and-match "discovery 
components" and "reader components". For example, for Kafka we would have a 
TopicReader and I can envision different discovery implementations: one very 
simple, no-frills, but rock solid, another one that does automatic discovery of 
new partitions, regex matching, etc...
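
For illustration, a minimal sketch of that separation (the interface names
below are made up for the sketch and are not existing Flink API):

import java.io.IOException;
import java.util.List;

interface SplitDiscoverer<SplitT> {
    // Pluggable discovery: a fixed no-frills list, periodic discovery of new
    // partitions, regex topic matching, etc. could all implement this.
    List<SplitT> discoverSplits() throws IOException;
}

interface SplitReader<SplitT, T> {
    // Reads a single split (partition/shard); knows nothing about discovery.
    void open(SplitT split) throws IOException;
    T next() throws IOException;   // null when no record is currently available
    void close() throws IOException;
}

// A Kafka "TopicReader" would implement SplitReader and could be combined
// with any SplitDiscoverer implementation.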
 

> On 22. Feb 2018, at 01:49, Jamie Grier  wrote:
> 
> I know this is a very simplistic idea but...
> 
> In general the issue Eron is describing occurs whenever two (or more)
> parallel partitions are assigned to the same Flink sub-task and there is
> large time delta between them.  This problem exists though largely because
> we are not making any decisions about which of these partitions to read and
> when but rather just treating them all the same.  However, this isn't the
> only way to approach the problem.
> 
> Think instead of each partition as a "roughly time sorted" file and the
> function of the connector as roughly a merge sort type process.  In other
> words just read the older data first by peeking at each partition and
> deciding what to read next.  The output of the connector would be a roughly
> time ordered stream that way..
> 
> However to really solve the whole problem you'd have to carry this idea
> throughout Flink and be more selective about which data you read and when
> throughout the whole data flow graph.  Similar problem I think and just
> something I've been thinking a bit about lately.
> 
> 
> 
> 
> On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright  wrote:
> 
>> It is valuable to consider the behavior of a consumer in both a real-time
>> processing context, which consists mostly of tail reads, and a historical
>> processing context, where there's an abundance of backlogged data.   In the
>> historical processing context, system internals (e.g. shard selection
>> logic) have an outsized influence on the order of observation and
>> potentially the progression of the event time clock.  In a real-time
>> context, the order of observation is, by definition, mostly determined by
>> the order in which events are produced.
>> 
>> My point is, it would be good to explore the efficacy of these improvements
>> in both contexts.
>> 
>> 
>> 
>> 
>> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise  wrote:
>> 
>>> I don't think there is a generic solution to the problem you are
>>> describing; we don't know how long it will take for resharding to take
>>> effect and those changes to become visible to the connector. Depending on
>>> how latency sensitive the pipeline is, possibly a configurable watermark
>>> hold period could be used to cushion the event time chaos introduced by
>>> resharding.
>>> 
>>> This isn't the primary motivation for the connector customization I'm
>>> working on though. We face issues with restart from older checkpoints
>> where
>>> parent and child shards are consumed in parallel.
>>> 
>>> 
>>> --
>>> sent from mobile
>>> 
>>> 
>>> On Feb 12, 2018 4:36 PM, "Eron Wright"  wrote:
>>> 
>>> I'd like to know how you envision dealing with resharding in relation to
>>> the watermark state.   Imagine that a given shard S1 has a watermark of
>> T1,
>>> and is then split into two shards S2 and S3.   The new shards are
>> assigned
>>> to subtasks according to a hash function.  The current watermarks of
>> those
>>> subtasks could be far ahead of T1, and thus the events in S2/S3 will be
>>> considered late.
>>> 
>>> The problem of a chaotic event time clock is exacerbated by any source
>> that
>>> uses dynamic partitioning.  Would a per-shard watermark generator really
>>> solve the problem that is motivating you?
>>> 
>>> Thanks,
>>> Eron
>>> 
>>> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:
>>> 
 Based on my draft implementation, the changes that are needed in the
>>> Flink
 connector are as follows:
 
 I need to be able to override the following to track last record
>>> timestamp
 and idle time per shard.
 
protected final void emitRecordAndUpdateState(T record, long
 recordTimestamp, int shardStateIndex, SequenceNumber
>> lastSequenceNumber)
>>> {
synchronized (checkpointLock) {
sourceContext.collectWithTimestamp(record,
>> recordTimestamp);
updateState(shardStateIndex, lastSequenceNumber);
}
}
 
 Any objection removing final from it?
 
 Also, why is sourceContext.collectWithTimestamp in the synchronized
>>> block?
 My custom class will need to emit watermarks - I assume there is no
>> need
>>> to
 acquire checkpointLock for that? Otherwise I would also need to add
 emitWatermark() to the base class.
 
 Let me know if anything else should be considered, I will open a JIRA
>> and
 PR otherwise.
 
 Thanks,
 Thomas
>>>

Re: Timestamp/watermark support in Kinesis consumer

2018-02-21 Thread Jamie Grier
I know this is a very simplistic idea but...

In general, the issue Eron is describing occurs whenever two (or more)
parallel partitions are assigned to the same Flink sub-task and there is a
large time delta between them.  This problem exists, though, largely because
we are not making any decisions about which of these partitions to read and
when, but rather just treating them all the same.  However, this isn't the
only way to approach the problem.

Think of each partition instead as a "roughly time sorted" file and of the
function of the connector as roughly a merge-sort type process.  In other
words, just read the older data first by peeking at each partition and
deciding what to read next.  The output of the connector would be a roughly
time-ordered stream that way.
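
A rough sketch of that idea - a priority queue over partitions keyed by the
timestamp of each partition's next record (the Partition interface here is
made up for the sketch):

import java.util.List;
import java.util.PriorityQueue;

// Hypothetical per-partition reader that can peek at its next record's timestamp.
interface Partition<T> {
    boolean hasNext();
    long peekNextTimestamp();
    T readNext();
}

class MergingReader<T> {
    // Always read from the partition whose next record is oldest,
    // yielding a roughly time-ordered output stream.
    private final PriorityQueue<Partition<T>> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.peekNextTimestamp(), b.peekNextTimestamp()));

    MergingReader(List<Partition<T>> partitions) {
        for (Partition<T> p : partitions) {
            if (p.hasNext()) {
                queue.add(p);
            }
        }
    }

    T readNext() {
        Partition<T> oldest = queue.poll();
        if (oldest == null) {
            return null; // all partitions exhausted
        }
        T record = oldest.readNext();
        if (oldest.hasNext()) {
            queue.add(oldest); // re-insert so it is ordered by its new head timestamp
        }
        return record;
    }
}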

However, to really solve the whole problem you'd have to carry this idea
throughout Flink and be more selective about which data you read and when
throughout the whole data flow graph.  It's a similar problem, I think, and
just something I've been thinking a bit about lately.




On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright  wrote:

> It is valuable to consider the behavior of a consumer in both a real-time
> processing context, which consists mostly of tail reads, and a historical
> processing context, where there's an abundance of backlogged data.   In the
> historical processing context, system internals (e.g. shard selection
> logic) have an outsized influence on the order of observation and
> potentially the progression of the event time clock.  In a real-time
> context, the order of observation is, by definition, mostly determined by
> the order in which events are produced.
>
> My point is, it would be good to explore the efficacy of these improvements
> in both contexts.
>
>
>
>
> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise  wrote:
>
> > I don't think there is a generic solution to the problem you are
> > describing; we don't know how long it will take for resharding to take
> > effect and those changes to become visible to the connector. Depending on
> > how latency sensitive the pipeline is, possibly a configurable watermark
> > hold period could be used to cushion the event time chaos introduced by
> > resharding.
> >
> > This isn't the primary motivation for the connector customization I'm
> > working on though. We face issues with restart from older checkpoints
> where
> > parent and child shards are consumed in parallel.
> >
> >
> > --
> > sent from mobile
> >
> >
> > On Feb 12, 2018 4:36 PM, "Eron Wright"  wrote:
> >
> > I'd like to know how you envision dealing with resharding in relation to
> > the watermark state.   Imagine that a given shard S1 has a watermark of
> T1,
> > and is then split into two shards S2 and S3.   The new shards are
> assigned
> > to subtasks according to a hash function.  The current watermarks of
> those
> > subtasks could be far ahead of T1, and thus the events in S2/S3 will be
> > considered late.
> >
> > The problem of a chaotic event time clock is exacerbated by any source
> that
> > uses dynamic partitioning.  Would a per-shard watermark generator really
> > solve the problem that is motivating you?
> >
> > Thanks,
> > Eron
> >
> > On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:
> >
> > > Based on my draft implementation, the changes that are needed in the
> > Flink
> > > connector are as follows:
> > >
> > > I need to be able to override the following to track last record
> > timestamp
> > > and idle time per shard.
> > >
> > > protected final void emitRecordAndUpdateState(T record, long
> > > recordTimestamp, int shardStateIndex, SequenceNumber
> lastSequenceNumber)
> > {
> > > synchronized (checkpointLock) {
> > > sourceContext.collectWithTimestamp(record,
> recordTimestamp);
> > > updateState(shardStateIndex, lastSequenceNumber);
> > > }
> > > }
> > >
> > > Any objection removing final from it?
> > >
> > > Also, why is sourceContext.collectWithTimestamp in the synchronized
> > block?
> > > My custom class will need to emit watermarks - I assume there is no
> need
> > to
> > > acquire checkpointLock for that? Otherwise I would also need to add
> > > emitWatermark() to the base class.
> > >
> > > Let me know if anything else should be considered, I will open a JIRA
> and
> > > PR otherwise.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:
> > >
> > > > -->
> > > >
> > > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <
> > tzuli...@apache.org
> > > >
> > > > wrote:
> > > >
> > > >> Regarding the two hooks you would like to be available:
> > > >>
> > > >>
> > > >>- Provide hook to override discovery (not to hit Kinesis from
> every
> > > >>subtask)
> > > >>
> > > >> Yes, I think we can easily provide a way, for example setting -1 for
> > > >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> > > >> Though, the user would then have to savepoint and restore in order
> to
> > 

Re: Timestamp/watermark support in Kinesis consumer

2018-02-21 Thread Jamie Grier
Big +1 on trying to come up with a common framework for partition-based,
replayable sources.  There is so much common code to be written to make it
possible to write correct connectors, and Gordon's bullet points capture
exactly that -- and it's not just Kinesis and Kafka.  It's also true for
reading data out of something like S3.  If your data is organized as a
bunch of parallel, roughly time-ordered files, there really isn't much
difference in the kind of code you have to write for all the hard bits
mentioned above.

The good news is that the potential outcome of this sort of effort could be
that production quality, correct, parallel connectors are much easier to
implement.  Ideally everything other than the code you write to discover
partitions and the code to consume data from a single simple partition
could be mostly common.




On Thu, Feb 8, 2018 at 2:01 AM, Tzu-Li (Gordon) Tai 
wrote:

> Hi Thomas,
>
> It’s great that you’ve brought out these issues, which IMO are all very
> valid. They have also been in my head for a while.
>
> Here’s a list of things, out of the top of my head, that I would really
> like to improve as part of a major Kafka / Kinesis connector rework.
> Some have JIRAs for them already, or were discussed in some other
> indirectly related JIRA. It might make sense to open an umbrella ticket and
> consolidate all of them there.
>
> - Common abstraction for partition-based, replayable sources, which
> handles 1) position checkpointing, 2) partition discovery / topic
> subscription (using the file source pattern), 3) custom startup positions,
> 4) per-partition watermarks, and 5) partition idleness.
> - Configuration for the connectors are not well-defined. Some go through
> provided properties, some requires using setter methods, etc. Moreover, it
> is actually very confusing for some users that we share the Properties to
> carry Flink connector-specific configurations, as well as the internally
> used client configuration [1]. I think in this aspect, Beam’s KafkaIO has a
> nice API [2] when it comes to this.
> - Some default behaviors of the current connectors, such as partitioning
> and flushing on the producer sides, and offset-committing for the Kafka
> consumer, do not make sense [3] [4].
> - The deserialization / serialization schema together with the partitioner
> interfaces don’t really place well together. For example, the
> `getTargetTopic` method should really be part of the partitioner [5].
>
> I think we are now in a good position to try making this happen for 1.6.
> Once 1.5 is out of the way, I can try opening an umbrella JIRA and collect
> everything there so we can discuss more there.
>
> Cheers,
> Gordon
>
> [1] https://issues.apache.org/jira/browse/FLINK-4280?
> focusedCommentId=15399648&page=com.atlassian.jira.
> plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
> [2] https://github.com/apache/beam/blob/master/sdks/java/io/
> kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
> [3] https://issues.apache.org/jira/browse/FLINK-5728
> [4] https://issues.apache.org/jira/browse/FLINK-5704
> [5] https://issues.apache.org/jira/browse/FLINK-6288
>
> On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:
>
> Generalizing the pattern would be great. I was also wondering if there
> aren't other commonalities between sources that would benefit from a shared
> framework. Kafka and Kinesis don't look all that different from a consumer
> perspective: replayable source, topic -> stream, partition -> shard, offset
> -> sequence, dynamic discovery, state saving - shouldn't there be more
> common code?
>
> Meanwhile, we need to find a way to address shortcomings in the current
> Kinesis connector to enable the use case. I would prefer to do that without
> permanently forking the connector code, so here are some more thoughts:
> Provide hook to override discovery (not to hit Kinesis from every subtask)
> Provide hook to support custom watermark generation (somewhere around
> KinesisDataFetcher.emitRecordAndUpdateState)
> If we can accomplish these in short order, it would be great. The current
> implementation makes it hard/impossible to override certain behaviors
> (final protected methods and the like). If there is agreement then I would
> like to address those as a quick PR.
>
> Thanks,
> Thomas
>
>
> On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek 
> wrote:
> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise
> the pattern of our file source to other sources. (This is related to how
> Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door.
> There are some other folks (cc'ed) who have also talked/thought about this
> before.
>
> Best,
> Aljoscha
>
> > On 7. Feb 2018, at 01:44, Thomas Weise  wrote:
> >
> > In addition to lack of watermark support, the Kinesis consumer suffers
> from
> > a discovery related issue th

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Eron Wright
It is valuable to consider the behavior of a consumer in both a real-time
processing context, which consists mostly of tail reads, and a historical
processing context, where there's an abundance of backlogged data.   In the
historical processing context, system internals (e.g. shard selection
logic) have an outsized influence on the order of observation and
potentially the progression of the event time clock.  In a real-time
context, the order of observation is, by definition, mostly determined by
the order in which events are produced.

My point is, it would be good to explore the efficacy of these improvements
in both contexts.




On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise  wrote:

> I don't think there is a generic solution to the problem you are
> describing; we don't know how long it will take for resharding to take
> effect and those changes to become visible to the connector. Depending on
> how latency sensitive the pipeline is, possibly a configurable watermark
> hold period could be used to cushion the event time chaos introduced by
> resharding.
>
> This isn't the primary motivation for the connector customization I'm
> working on though. We face issues with restart from older checkpoints where
> parent and child shards are consumed in parallel.
>
>
> --
> sent from mobile
>
>
> On Feb 12, 2018 4:36 PM, "Eron Wright"  wrote:
>
> I'd like to know how you envision dealing with resharding in relation to
> the watermark state.   Imagine that a given shard S1 has a watermark of T1,
> and is then split into two shards S2 and S3.   The new shards are assigned
> to subtasks according to a hash function.  The current watermarks of those
> subtasks could be far ahead of T1, and thus the events in S2/S3 will be
> considered late.
>
> The problem of a chaotic event time clock is exacerbated by any source that
> uses dynamic partitioning.  Would a per-shard watermark generator really
> solve the problem that is motivating you?
>
> Thanks,
> Eron
>
> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:
>
> > Based on my draft implementation, the changes that are needed in the
> Flink
> > connector are as follows:
> >
> > I need to be able to override the following to track last record
> timestamp
> > and idle time per shard.
> >
> > protected final void emitRecordAndUpdateState(T record, long
> > recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber)
> {
> > synchronized (checkpointLock) {
> > sourceContext.collectWithTimestamp(record, recordTimestamp);
> > updateState(shardStateIndex, lastSequenceNumber);
> > }
> > }
> >
> > Any objection removing final from it?
> >
> > Also, why is sourceContext.collectWithTimestamp in the synchronized
> block?
> > My custom class will need to emit watermarks - I assume there is no need
> to
> > acquire checkpointLock for that? Otherwise I would also need to add
> > emitWatermark() to the base class.
> >
> > Let me know if anything else should be considered, I will open a JIRA and
> > PR otherwise.
> >
> > Thanks,
> > Thomas
> >
> >
> > On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:
> >
> > > -->
> > >
> > > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <
> tzuli...@apache.org
> > >
> > > wrote:
> > >
> > >> Regarding the two hooks you would like to be available:
> > >>
> > >>
> > >>- Provide hook to override discovery (not to hit Kinesis from every
> > >>subtask)
> > >>
> > >> Yes, I think we can easily provide a way, for example setting -1 for
> > >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> > >> Though, the user would then have to savepoint and restore in order to
> > >> pick up new shards after a Kinesis stream reshard (which is in
> practice
> > the
> > >> best way to by-pass the Kinesis API rate limitations).
> > >> +1 to provide that.
> > >>
> > >
> > > I'm considering a customization of KinesisDataFetcher with override for
> > > discoverNewShardsToSubscribe. We still want shards to be discovered,
> just
> > > not by hitting Kinesis from every subtask.
> > >
> > >
> > >>
> > >>
> > >>- Provide hook to support custom watermark generation (somewhere
> > >>around KinesisDataFetcher.emitRecordAndUpdateState)
> > >>
> > >> Per-partition watermark generation on the Kinesis side is slightly
> more
> > >> complex than Kafka, due to how Kinesis’s dynamic resharding works.
> > >> I think we need to additionally allow new shards to be consumed only
> > >> after its parent shard is fully read, otherwise “per-shard time
> > >> characteristics” can be broken because of this out-of-orderness
> > consumption
> > >> across the boundaries of a closed parent shard and its child.
> > >> There theses JIRAs [1][2] which has a bit more details on the topic.
> > >> Otherwise, in general I’m also +1 to providing this also in the
> Kinesis
> > >> consumer.
> > >>
> > >
> > > Here I'm thinking to customize emitRecordAndUpdateState (method would
> > need
> > > to be made non-f

Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Thomas Weise
I don't think there is a generic solution to the problem you are
describing; we don't know how long it will take for resharding to take
effect and those changes to become visible to the connector. Depending on
how latency sensitive the pipeline is, possibly a configurable watermark
hold period could be used to cushion the event time chaos introduced by
resharding.
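
A rough sketch of such a hold (names made up; the hold simply lags the
emitted watermark behind the highest timestamp seen):

// Hypothetical: cushion resharding-induced event time jumps by holding the watermark back.
class HeldBackWatermark {
    private final long holdMillis;                 // configurable hold period
    private long maxSeenTimestamp = Long.MIN_VALUE;

    HeldBackWatermark(long holdMillis) {
        this.holdMillis = holdMillis;
    }

    void onRecord(long recordTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, recordTimestamp);
    }

    /** Watermark to emit: the max timestamp seen, minus the hold period. */
    long currentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenTimestamp - holdMillis;
    }
}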

This isn't the primary motivation for the connector customization I'm
working on though. We face issues with restart from older checkpoints where
parent and child shards are consumed in parallel.


--
sent from mobile


On Feb 12, 2018 4:36 PM, "Eron Wright"  wrote:

I'd like to know how you envision dealing with resharding in relation to
the watermark state.   Imagine that a given shard S1 has a watermark of T1,
and is then split into two shards S2 and S3.   The new shards are assigned
to subtasks according to a hash function.  The current watermarks of those
subtasks could be far ahead of T1, and thus the events in S2/S3 will be
considered late.

The problem of a chaotic event time clock is exacerbated by any source that
uses dynamic partitioning.  Would a per-shard watermark generator really
solve the problem that is motivating you?

Thanks,
Eron

On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:

> Based on my draft implementation, the changes that are needed in the Flink
> connector are as follows:
>
> I need to be able to override the following to track last record timestamp
> and idle time per shard.
>
> protected final void emitRecordAndUpdateState(T record, long
> recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
> synchronized (checkpointLock) {
> sourceContext.collectWithTimestamp(record, recordTimestamp);
> updateState(shardStateIndex, lastSequenceNumber);
> }
> }
>
> Any objection removing final from it?
>
> Also, why is sourceContext.collectWithTimestamp in the synchronized block?
> My custom class will need to emit watermarks - I assume there is no need
to
> acquire checkpointLock for that? Otherwise I would also need to add
> emitWatermark() to the base class.
>
> Let me know if anything else should be considered, I will open a JIRA and
> PR otherwise.
>
> Thanks,
> Thomas
>
>
> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:
>
> > -->
> >
> > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai  >
> > wrote:
> >
> >> Regarding the two hooks you would like to be available:
> >>
> >>
> >>- Provide hook to override discovery (not to hit Kinesis from every
> >>subtask)
> >>
> >> Yes, I think we can easily provide a way, for example setting -1 for
> >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> >> Though, the user would then have to savepoint and restore in order to
> >> pick up new shards after a Kinesis stream reshard (which is in practice
> the
> >> best way to by-pass the Kinesis API rate limitations).
> >> +1 to provide that.
> >>
> >
> > I'm considering a customization of KinesisDataFetcher with override for
> > discoverNewShardsToSubscribe. We still want shards to be discovered,
just
> > not by hitting Kinesis from every subtask.
> >
> >
> >>
> >>
> >>- Provide hook to support custom watermark generation (somewhere
> >>around KinesisDataFetcher.emitRecordAndUpdateState)
> >>
> >> Per-partition watermark generation on the Kinesis side is slightly more
> >> complex than Kafka, due to how Kinesis’s dynamic resharding works.
> >> I think we need to additionally allow new shards to be consumed only
> >> after its parent shard is fully read, otherwise “per-shard time
> >> characteristics” can be broken because of this out-of-orderness
> consumption
> >> across the boundaries of a closed parent shard and its child.
> >> There theses JIRAs [1][2] which has a bit more details on the topic.
> >> Otherwise, in general I’m also +1 to providing this also in the Kinesis
> >> consumer.
> >>
> >
> > Here I'm thinking to customize emitRecordAndUpdateState (method would
> need
> > to be made non-final). Using getSubscribedShardsState with additional
> > transient state to keep track of watermark per shard and emit watermark
> as
> > appropriate.
> >
> > That's the idea - haven't written any code for it yet.
> >
> > Thanks,
> > Thomas
> >
> >
>


Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Eron Wright
I'd like to know how you envision dealing with resharding in relation to
the watermark state.   Imagine that a given shard S1 has a watermark of T1,
and is then split into two shards S2 and S3.   The new shards are assigned
to subtasks according to a hash function.  The current watermarks of those
subtasks could be far ahead of T1, and thus the events in S2/S3 will be
considered late.
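
To make the failure mode concrete - with made-up numbers, a record becomes
late as soon as its timestamp is at or below the subtask's current watermark:

// Made-up numbers, purely to illustrate the lateness condition.
long subtaskWatermark = 100_000L;  // subtask watermark already far ahead of T1 (from other shards)
long eventFromS2 = 42_000L;        // event from the newly assigned child shard, still near T1
boolean late = eventFromS2 <= subtaskWatermark;  // true: event-time windows will treat it as late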

The problem of a chaotic event time clock is exacerbated by any source that
uses dynamic partitioning.  Would a per-shard watermark generator really
solve the problem that is motivating you?

Thanks,
Eron

On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise  wrote:

> Based on my draft implementation, the changes that are needed in the Flink
> connector are as follows:
>
> I need to be able to override the following to track last record timestamp
> and idle time per shard.
>
> protected final void emitRecordAndUpdateState(T record, long
> recordTimestamp, int shardStateIndex, SequenceNumber lastSequenceNumber) {
> synchronized (checkpointLock) {
> sourceContext.collectWithTimestamp(record, recordTimestamp);
> updateState(shardStateIndex, lastSequenceNumber);
> }
> }
>
> Any objection removing final from it?
>
> Also, why is sourceContext.collectWithTimestamp in the synchronized block?
> My custom class will need to emit watermarks - I assume there is no need to
> acquire checkpointLock for that? Otherwise I would also need to add
> emitWatermark() to the base class.
>
> Let me know if anything else should be considered, I will open a JIRA and
> PR otherwise.
>
> Thanks,
> Thomas
>
>
> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:
>
> > -->
> >
> > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai  >
> > wrote:
> >
> >> Regarding the two hooks you would like to be available:
> >>
> >>
> >>- Provide hook to override discovery (not to hit Kinesis from every
> >>subtask)
> >>
> >> Yes, I think we can easily provide a way, for example setting -1 for
> >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> >> Though, the user would then have to savepoint and restore in order to
> >> pick up new shards after a Kinesis stream reshard (which is in practice
> the
> >> best way to by-pass the Kinesis API rate limitations).
> >> +1 to provide that.
> >>
> >
> > I'm considering a customization of KinesisDataFetcher with override for
> > discoverNewShardsToSubscribe. We still want shards to be discovered, just
> > not by hitting Kinesis from every subtask.
> >
> >
> >>
> >>
> >>- Provide hook to support custom watermark generation (somewhere
> >>around KinesisDataFetcher.emitRecordAndUpdateState)
> >>
> >> Per-partition watermark generation on the Kinesis side is slightly more
> >> complex than Kafka, due to how Kinesis’s dynamic resharding works.
> >> I think we need to additionally allow new shards to be consumed only
> >> after its parent shard is fully read, otherwise “per-shard time
> >> characteristics” can be broken because of this out-of-orderness
> consumption
> >> across the boundaries of a closed parent shard and its child.
> >> There theses JIRAs [1][2] which has a bit more details on the topic.
> >> Otherwise, in general I’m also +1 to providing this also in the Kinesis
> >> consumer.
> >>
> >
> > Here I'm thinking to customize emitRecordAndUpdateState (method would
> need
> > to be made non-final). Using getSubscribedShardsState with additional
> > transient state to keep track of watermark per shard and emit watermark
> as
> > appropriate.
> >
> > That's the idea - haven't written any code for it yet.
> >
> > Thanks,
> > Thomas
> >
> >
>


Re: Timestamp/watermark support in Kinesis consumer

2018-02-12 Thread Thomas Weise
Based on my draft implementation, the changes that are needed in the Flink
connector are as follows:

I need to be able to override the following to track last record timestamp
and idle time per shard.

protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
        int shardStateIndex, SequenceNumber lastSequenceNumber) {
    synchronized (checkpointLock) {
        sourceContext.collectWithTimestamp(record, recordTimestamp);
        updateState(shardStateIndex, lastSequenceNumber);
    }
}

Any objection to removing final from it?

Also, why is sourceContext.collectWithTimestamp in the synchronized block?
My custom class will need to emit watermarks - I assume there is no need to
acquire checkpointLock for that? Otherwise I would also need to add
emitWatermark() to the base class.
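
For illustration, roughly the kind of per-shard bookkeeping the custom class
would do, assuming the method were made non-final (class and field names are
made up, and the idle handling is simplified):

import java.util.HashMap;
import java.util.Map;

class PerShardWatermarkTracker {
    private final Map<Integer, Long> lastTimestampPerShard = new HashMap<>();
    private final Map<Integer, Long> lastActivityPerShard = new HashMap<>();
    private final long idleTimeoutMillis;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    PerShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Called from the (hypothetically non-final) emitRecordAndUpdateState override. */
    void onRecord(int shardStateIndex, long recordTimestamp, long nowMillis) {
        lastTimestampPerShard.put(shardStateIndex, recordTimestamp);
        lastActivityPerShard.put(shardStateIndex, nowMillis);
    }

    /** Candidate watermark: min of per-shard timestamps, ignoring shards idle longer than the timeout. */
    long currentWatermark(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : lastTimestampPerShard.entrySet()) {
            long idleFor = nowMillis - lastActivityPerShard.get(e.getKey());
            if (idleFor > idleTimeoutMillis) {
                continue; // idle shard: do not hold the watermark back
            }
            min = Math.min(min, e.getValue());
            anyActive = true;
        }
        if (anyActive && min > lastEmittedWatermark) {
            lastEmittedWatermark = min;
        }
        return lastEmittedWatermark;
    }
}

The subclass would call onRecord(...) from its override and emit
currentWatermark(...) via the source context whenever it advances.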

Let me know if anything else should be considered, I will open a JIRA and
PR otherwise.

Thanks,
Thomas


On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise  wrote:

> -->
>
> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai 
> wrote:
>
>> Regarding the two hooks you would like to be available:
>>
>>
>>- Provide hook to override discovery (not to hit Kinesis from every
>>subtask)
>>
>> Yes, I think we can easily provide a way, for example setting -1 for
>> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
>> Though, the user would then have to savepoint and restore in order to
>> pick up new shards after a Kinesis stream reshard (which is in practice the
>> best way to by-pass the Kinesis API rate limitations).
>> +1 to provide that.
>>
>
> I'm considering a customization of KinesisDataFetcher with override for
> discoverNewShardsToSubscribe. We still want shards to be discovered, just
> not by hitting Kinesis from every subtask.
>
>
>>
>>
>>- Provide hook to support custom watermark generation (somewhere
>>around KinesisDataFetcher.emitRecordAndUpdateState)
>>
>> Per-partition watermark generation on the Kinesis side is slightly more
>> complex than Kafka, due to how Kinesis’s dynamic resharding works.
>> I think we need to additionally allow new shards to be consumed only
>> after its parent shard is fully read, otherwise “per-shard time
>> characteristics” can be broken because of this out-of-orderness consumption
>> across the boundaries of a closed parent shard and its child.
>> There theses JIRAs [1][2] which has a bit more details on the topic.
>> Otherwise, in general I’m also +1 to providing this also in the Kinesis
>> consumer.
>>
>
> Here I'm thinking to customize emitRecordAndUpdateState (method would need
> to be made non-final). Using getSubscribedShardsState with additional
> transient state to keep track of watermark per shard and emit watermark as
> appropriate.
>
> That's the idea - haven't written any code for it yet.
>
> Thanks,
> Thomas
>
>


Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Thomas Weise
-->

On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai 
wrote:

> Regarding the two hooks you would like to be available:
>
>
>- Provide hook to override discovery (not to hit Kinesis from every
>subtask)
>
> Yes, I think we can easily provide a way, for example setting -1 for
> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> Though, the user would then have to savepoint and restore in order to pick
> up new shards after a Kinesis stream reshard (which is in practice the best
> way to by-pass the Kinesis API rate limitations).
> +1 to provide that.
>

I'm considering a customization of KinesisDataFetcher with override for
discoverNewShardsToSubscribe. We still want shards to be discovered, just
not by hitting Kinesis from every subtask.
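
Roughly the shape I have in mind, stripped down to the idea (the real
KinesisDataFetcher types and constructor are omitted, and the shard list is
assumed to be supplied from outside, e.g. by a single coordinating subtask):

import java.util.ArrayList;
import java.util.List;

// Conceptual sketch only; not the actual KinesisDataFetcher API.
class ExternallyFedShardDiscovery {
    // Shard list supplied from outside, instead of every subtask calling the
    // Kinesis ListShards/DescribeStream APIs itself.
    private final List<String> externallySuppliedShardIds = new ArrayList<>();
    private final List<String> alreadySubscribed = new ArrayList<>();

    synchronized void updateShardList(List<String> shardIds) {
        externallySuppliedShardIds.clear();
        externallySuppliedShardIds.addAll(shardIds);
    }

    /** What an overridden discoverNewShardsToSubscribe() could return: only shards not yet subscribed. */
    synchronized List<String> discoverNewShardsToSubscribe() {
        List<String> newShards = new ArrayList<>();
        for (String shardId : externallySuppliedShardIds) {
            if (!alreadySubscribed.contains(shardId)) {
                newShards.add(shardId);
                alreadySubscribed.add(shardId);
            }
        }
        return newShards;
    }
}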


>
>
>- Provide hook to support custom watermark generation (somewhere
>around KinesisDataFetcher.emitRecordAndUpdateState)
>
> Per-partition watermark generation on the Kinesis side is slightly more
> complex than Kafka, due to how Kinesis’s dynamic resharding works.
> I think we need to additionally allow new shards to be consumed only after
> its parent shard is fully read, otherwise “per-shard time characteristics”
> can be broken because of this out-of-orderness consumption across the
> boundaries of a closed parent shard and its child.
> There theses JIRAs [1][2] which has a bit more details on the topic.
> Otherwise, in general I’m also +1 to providing this also in the Kinesis
> consumer.
>

Here I'm thinking to customize emitRecordAndUpdateState (method would need
to be made non-final). Using getSubscribedShardsState with additional
transient state to keep track of watermark per shard and emit watermark as
appropriate.

That's the idea - haven't written any code for it yet.

Thanks,
Thomas


Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Tzu-Li (Gordon) Tai
Regarding the two hooks you would like to be available:

- Provide hook to override discovery (not to hit Kinesis from every subtask)
Yes, I think we can easily provide a way, for example setting -1 for 
SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
Though, the user would then have to savepoint and restore in order to pick up 
new shards after a Kinesis stream reshard (which is in practice the best way to 
by-pass the Kinesis API rate limitations).
+1 to provide that.
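
If that convention were adopted, usage would presumably be just a consumer
property (hypothetical behavior; AWS region/credential properties and imports
omitted):

Properties consumerConfig = new Properties();
// Hypothetical: -1 meaning "do not run periodic shard discovery".
consumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
FlinkKinesisConsumer<String> consumer =
        new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), consumerConfig);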

- Provide hook to support custom watermark generation (somewhere around
KinesisDataFetcher.emitRecordAndUpdateState)
Per-partition watermark generation on the Kinesis side is slightly more complex 
than Kafka, due to how Kinesis’s dynamic resharding works.
I think we additionally need to allow a new shard to be consumed only after its 
parent shard is fully read, otherwise “per-shard time characteristics” can be 
broken because of this out-of-orderness consumption across the boundaries of a 
closed parent shard and its child.
There are these JIRAs [1][2] which have a bit more detail on the topic.
Otherwise, in general I’m also +1 to providing this also in the Kinesis 
consumer.
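
A rough sketch of that gating rule (types and bookkeeping are made up;
"fully read" meaning the parent shard was consumed up to SHARD_END):

import java.util.List;
import java.util.Set;

// Hypothetical helper: only hand a discovered shard to a reader once its parent(s) are fully read.
class ParentChildGate {
    /** Shard ids whose records have been read up to SHARD_END (closed parents fully consumed). */
    private final Set<String> fullyReadShards;

    ParentChildGate(Set<String> fullyReadShards) {
        this.fullyReadShards = fullyReadShards;
    }

    boolean canConsume(String shardId, List<String> parentShardIds) {
        for (String parent : parentShardIds) {
            if (!fullyReadShards.contains(parent)) {
                return false; // defer the child shard; reading it now could break per-shard time ordering
            }
        }
        return true;
    }
}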

Cheers,
Gordon

[1] https://issues.apache.org/jira/browse/FLINK-5697
[2] https://issues.apache.org/jira/browse/FLINK-6349

On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:

Provide hook to override discovery (not to hit Kinesis from every subtask)
Provide hook to support custom watermark generation (somewhere around 
KinesisDataFetcher.emitRecordAndUpdateState)

Re: Timestamp/watermark support in Kinesis consumer

2018-02-08 Thread Tzu-Li (Gordon) Tai
Hi Thomas,

It’s great that you’ve brought out these issues, which IMO are all very valid. 
They have also been in my head for a while.

Here’s a list of things, off the top of my head, that I would really like to 
improve as part of a major Kafka / Kinesis connector rework.
Some have JIRAs for them already, or were discussed in some other indirectly 
related JIRA. It might make sense to open an umbrella ticket and consolidate 
all of them there.

- Common abstraction for partition-based, replayable sources, which handles 1) 
position checkpointing, 2) partition discovery / topic subscription (using the 
file source pattern), 3) custom startup positions, 4) per-partition watermarks, 
and 5) partition idleness.
- Configuration for the connectors is not well-defined. Some settings go through 
provided properties, some require setter methods, etc. Moreover, it is 
actually very confusing for some users that we share the Properties to carry 
Flink connector-specific configurations, as well as the internally used client 
configuration [1]. I think in this aspect, Beam’s KafkaIO has a nice API [2] 
when it comes to this.
- Some default behaviors of the current connectors, such as partitioning and 
flushing on the producer sides, and offset-committing for the Kafka consumer, 
do not make sense [3] [4].
- The deserialization / serialization schema and the partitioner 
interfaces don’t really play well together. For example, the `getTargetTopic` 
method should really be part of the partitioner [5].

I think we are now in a good position to try making this happen for 1.6. Once 
1.5 is out of the way, I can try opening an umbrella JIRA and collect 
everything there so we can discuss further.

Cheers,
Gordon

[1] 
https://issues.apache.org/jira/browse/FLINK-4280?focusedCommentId=15399648&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-15399648
[2] 
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L171
[3] https://issues.apache.org/jira/browse/FLINK-5728
[4] https://issues.apache.org/jira/browse/FLINK-5704
[5] https://issues.apache.org/jira/browse/FLINK-6288

On 8 February 2018 at 1:48:23 AM, Thomas Weise (t...@apache.org) wrote:

Generalizing the pattern would be great. I was also wondering if there aren't 
other commonalities between sources that would benefit from a shared framework. 
Kafka and Kinesis don't look all that different from a consumer perspective: 
replayable source, topic -> stream, partition -> shard, offset -> sequence, 
dynamic discovery, state saving - shouldn't there be more common code?

Meanwhile, we need to find a way to address shortcomings in the current Kinesis 
connector to enable the use case. I would prefer to do that without permanently 
forking the connector code, so here are some more thoughts:
Provide hook to override discovery (not to hit Kinesis from every subtask)
Provide hook to support custom watermark generation (somewhere around 
KinesisDataFetcher.emitRecordAndUpdateState)
If we can accomplish these in short order, it would be great. The current 
implementation makes it hard/impossible to override certain behaviors (final 
protected methods and the like). If there is agreement then I would like to 
address those as a quick PR.

Thanks,
Thomas


On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek  wrote:
Hi,

That last point is very valid. For a while now I've wanted to generalise the 
pattern of our file source to other sources. (This is related to how Beam 
sources are being refactored to use Splittable DoFn.)

I'm very eager for design work to start on this once 1.5 is out the door. There 
are some other folks (cc'ed) who have also talked/thought about this before.

Best,
Aljoscha

> On 7. Feb 2018, at 01:44, Thomas Weise  wrote:
>
> In addition to lack of watermark support, the Kinesis consumer suffers from
> a discovery related issue that also needs to be resolved. Shard discovery
> runs periodically in all subtasks. That's not just inefficient but becomes
> a real problem when there is a large number of subtasks due to rate
> limiting (
> https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
> The discovery interval should be minimized to cap latency (new shards not
> consumed until discovered).
>
> How about moving discovery out of the fetcher into a separate singleton
> source and then broadcast the result to the parallel fetchers, following
> the pattern applied to file input?
>
> https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
>
> This would also ensure that all subtasks consistently see the same shard
> list.
>
> Thoughts?
>
> Thanks,
> Thomas
>
>
> On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise  wrote:
>
>> Hi,
>>
>> The Kinesis consumer currently does not emit watermarks, and this can le

Re: Timestamp/watermark support in Kinesis consumer

2018-02-07 Thread Thomas Weise
Generalizing the pattern would be great. I was also wondering if there
aren't other commonalities between sources that would benefit from a shared
framework. Kafka and Kinesis don't look all that different from a consumer
perspective: replayable source, topic -> stream, partition -> shard, offset
-> sequence, dynamic discovery, state saving - shouldn't there be more
common code?

Meanwhile, we need to find a way to address shortcomings in the current
Kinesis connector to enable the use case. I would prefer to do that without
permanently forking the connector code, so here are some more thoughts:

   - Provide hook to override discovery (not to hit Kinesis from every
   subtask)
   - Provide hook to support custom watermark generation (somewhere around
   KinesisDataFetcher.emitRecordAndUpdateState)

If we can accomplish these in short order, it would be great. The current
implementation makes it hard/impossible to override certain behaviors
(final protected methods and the like). If there is agreement then I would
like to address those as a quick PR.

Thanks,
Thomas


On Wed, Feb 7, 2018 at 7:59 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> That last point is very valid. For a while now I've wanted to generalise
> the pattern of our file source to other sources. (This is related to how
> Beam sources are being refactored to use Splittable DoFn.)
>
> I'm very eager for design work to start on this once 1.5 is out the door.
> There are some other folks (cc'ed) who have also talked/thought about this
> before.
>
> Best,
> Aljoscha
>
> > On 7. Feb 2018, at 01:44, Thomas Weise  wrote:
> >
> > In addition to lack of watermark support, the Kinesis consumer suffers
> from
> > a discovery related issue that also needs to be resolved. Shard discovery
> > runs periodically in all subtasks. That's not just inefficient but
> becomes
> > a real problem when there is a large number of subtasks due to rate
> > limiting (
> > https://docs.aws.amazon.com/streams/latest/dev/service-
> sizes-and-limits.html).
> > The discovery interval should be minimized to cap latency (new shards not
> > consumed until discovered).
> >
> > How about moving discovery out of the fetcher into a separate singleton
> > source and then broadcast the result to the parallel fetchers, following
> > the pattern applied to file input?
> >
> > https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4
> ef726ffe1b/flink-streaming-java/src/main/java/org/apache/
> flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
> >
> > This would also ensure that all subtasks consistently see the same shard
> > list.
> >
> > Thoughts?
> >
> > Thanks,
> > Thomas
> >
> >
> > On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise  wrote:
> >
> >> Hi,
> >>
> >> The Kinesis consumer currently does not emit watermarks, and this can
> lead
> >> to problems when a single subtask reads from multiple shards and offsets
> >> are not closely aligned with respect to the event time.
> >>
> >> The Kafka consumer has support for periodic and punctuated watermarks,
> >> although there is also the unresolved issue https://issues.apache.org/
> >> jira/browse/FLINK-5479 that would equally apply for Kinesis.
> >>
> >> I propose adding support for timestamp assigner and watermark generator
> to
> >> the Kinesis consumer.
> >>
> >> As for handling of idle shards, is there a preference? Perhaps a
> >> customization point on the assigner that defers the decision to the user
> >> would be appropriate?
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
>
>


Re: Timestamp/watermark support in Kinesis consumer

2018-02-07 Thread Aljoscha Krettek
Hi,

That last point is very valid. For a while now I've wanted to generalise the 
pattern of our file source to other sources. (This is related to how Beam 
sources are being refactored to use Splittable DoFn.)

I'm very eager for design work to start on this once 1.5 is out the door. There 
are some other folks (cc'ed) who have also talked/thought about this before. 

Best,
Aljoscha

> On 7. Feb 2018, at 01:44, Thomas Weise  wrote:
> 
> In addition to lack of watermark support, the Kinesis consumer suffers from
> a discovery related issue that also needs to be resolved. Shard discovery
> runs periodically in all subtasks. That's not just inefficient but becomes
> a real problem when there is a large number of subtasks due to rate
> limiting (
> https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
> The discovery interval should be minimized to cap latency (new shards not
> consumed until discovered).
> 
> How about moving discovery out of the fetcher into a separate singleton
> source and then broadcast the result to the parallel fetchers, following
> the pattern applied to file input?
> 
> https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336
> 
> This would also ensure that all subtasks consistently see the same shard
> list.
> 
> Thoughts?
> 
> Thanks,
> Thomas
> 
> 
> On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise  wrote:
> 
>> Hi,
>> 
>> The Kinesis consumer currently does not emit watermarks, and this can lead
>> to problems when a single subtask reads from multiple shards and offsets
>> are not closely aligned with respect to the event time.
>> 
>> The Kafka consumer has support for periodic and punctuated watermarks,
>> although there is also the unresolved issue https://issues.apache.org/
>> jira/browse/FLINK-5479 that would equally apply for Kinesis.
>> 
>> I propose adding support for timestamp assigner and watermark generator to
>> the Kinesis consumer.
>> 
>> As for handling of idle shards, is there a preference? Perhaps a
>> customization point on the assigner that defers the decision to the user
>> would be appropriate?
>> 
>> Thanks,
>> Thomas
>> 
>> 



Re: Timestamp/watermark support in Kinesis consumer

2018-02-06 Thread Thomas Weise
In addition to lack of watermark support, the Kinesis consumer suffers from
a discovery related issue that also needs to be resolved. Shard discovery
runs periodically in all subtasks. That's not just inefficient but becomes
a real problem when there is a large number of subtasks due to rate
limiting (
https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
The discovery interval should be minimized to cap latency (new shards not
consumed until discovered).

How about moving discovery out of the fetcher into a separate singleton
source and then broadcast the result to the parallel fetchers, following
the pattern applied to file input?

https://github.com/apache/flink/blob/5f523e6ab31afeab5b1d9bbf62c6d4ef726ffe1b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1336

This would also ensure that all subtasks consistently see the same shard
list.
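
Very roughly, following the file source pattern: a parallelism-1 discovery
source whose output is broadcast to the parallel readers
(ShardListDiscoverySource and ShardReadingFunction are made up here; the real
wiring, state handling, etc. are omitted):

// Hypothetical wiring only - mirrors the file source pattern, not existing Kinesis connector code.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<List<String>> shardLists = env
        .addSource(new ShardListDiscoverySource())   // hypothetical SourceFunction<List<String>>: the only subtask calling the Kinesis API
        .setParallelism(1);

shardLists
        .broadcast()                                 // every parallel reader sees the same, consistent shard list
        .flatMap(new ShardReadingFunction())         // hypothetical: picks this subtask's shards and reads them
        .name("kinesis-shard-reader");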

Thoughts?

Thanks,
Thomas


On Mon, Feb 5, 2018 at 5:31 PM, Thomas Weise  wrote:

> Hi,
>
> The Kinesis consumer currently does not emit watermarks, and this can lead
> to problems when a single subtask reads from multiple shards and offsets
> are not closely aligned with respect to the event time.
>
> The Kafka consumer has support for periodic and punctuated watermarks,
> although there is also the unresolved issue https://issues.apache.org/
> jira/browse/FLINK-5479 that would equally apply for Kinesis.
>
> I propose adding support for timestamp assigner and watermark generator to
> the Kinesis consumer.
>
> As for handling of idle shards, is there a preference? Perhaps a
> customization point on the assigner that defers the decision to the user
> would be appropriate?
>
> Thanks,
> Thomas
>
>


Timestamp/watermark support in Kinesis consumer

2018-02-05 Thread Thomas Weise
Hi,

The Kinesis consumer currently does not emit watermarks, and this can lead
to problems when a single subtask reads from multiple shards and offsets
are not closely aligned with respect to the event time.

The Kafka consumer has support for periodic and punctuated watermarks,
although there is also the unresolved issue
https://issues.apache.org/jira/browse/FLINK-5479 that would equally apply
for Kinesis.

I propose adding support for timestamp assigner and watermark generator to
the Kinesis consumer.
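
In terms of API, mirroring the Kafka consumer's assignTimestampsAndWatermarks()
might look roughly like this (the method does not exist on the Kinesis consumer
today, so this is only what the proposal could look like; MyEvent, MyEventSchema
and consumerConfig are placeholders, env is the StreamExecutionEnvironment):

FlinkKinesisConsumer<MyEvent> consumer =
        new FlinkKinesisConsumer<>("my-stream", new MyEventSchema(), consumerConfig);

// Hypothetical method on the Kinesis consumer, modeled on the Kafka consumer's API:
consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent event) {
                return event.getEventTimeMillis();
            }
        });

DataStream<MyEvent> events = env.addSource(consumer);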

As for handling of idle shards, is there a preference? Perhaps a
customization point on the assigner that defers the decision to the user
would be appropriate?

Thanks,
Thomas