I don't think there is a generic solution to the problem you are
describing; we don't know how long it will take for resharding to take
effect and those changes to become visible to the connector. Depending on
how latency-sensitive the pipeline is, a configurable watermark hold period
could possibly be used to cushion the event-time chaos introduced by
resharding.
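A minimal sketch of what such a hold period could look like, written in Java to match the connector code quoted further down. `WatermarkHold`, `onReshardDetected` and `next` are hypothetical names, not connector API. The sketch assumes the fetcher can tell when new child shards have been discovered and that a processing-time clock is available.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (not part of the Flink Kinesis connector): after a
 * reshard is detected, keep re-emitting the last watermark for a configurable
 * processing-time period so newly discovered child shards can catch up.
 */
final class WatermarkHold {
    private final long holdMillis;
    private final LongSupplier clock; // processing-time clock, injectable for tests
    private long holdUntil = Long.MIN_VALUE;
    private long emitted = Long.MIN_VALUE;

    WatermarkHold(long holdMillis, LongSupplier clock) {
        if (holdMillis < 0) {
            throw new IllegalArgumentException("holdMillis must be >= 0");
        }
        this.holdMillis = holdMillis;
        this.clock = clock;
    }

    /** Assumed to be called when the fetcher discovers new (child) shards. */
    void onReshardDetected() {
        holdUntil = clock.getAsLong() + holdMillis;
    }

    /**
     * Given the watermark derived from shard progress, returns the watermark
     * to emit: frozen during a hold period, otherwise advanced but never regressed.
     */
    long next(long candidate) {
        if (clock.getAsLong() < holdUntil) {
            return emitted; // hold: do not advance while child shards catch up
        }
        emitted = Math.max(emitted, candidate);
        return emitted;
    }
}
```

While a hold is active the watermark stops advancing, so downstream results are delayed by up to `holdMillis`. It cannot rescue child-shard records whose timestamps are already at or below the held watermark, which is the case Eron describes.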

This isn't the primary motivation for the connector customization I'm
working on, though. We face issues when restarting from older checkpoints,
where parent and child shards are consumed in parallel.




On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:

I'd like to know how you envision dealing with resharding in relation to
the watermark state. Imagine that a given shard S1 has a watermark of T1,
and is then split into two shards S2 and S3. The new shards are assigned
to subtasks according to a hash function. The current watermarks of those
subtasks could be far ahead of T1, and thus the events in S2/S3 will be
considered late.

The problem of a chaotic event time clock is exacerbated by any source that
uses dynamic partitioning. Would a per-shard watermark generator really
solve the problem that is motivating you?
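To make the scenario concrete, here is a small Java sketch. The shard-to-subtask assignment is assumed to be a plain `hashCode()` modulo parallelism, which is an illustration rather than the connector's exact function, and a record counts as late once the subtask's current watermark has reached its timestamp. `ReshardScenario` and its methods are hypothetical.

```java
/**
 * Hypothetical illustration of the reshard scenario: child shards land on
 * subtasks whose watermarks may already be far ahead of the parent's T1.
 */
final class ReshardScenario {
    /** Assumed assignment: hashCode modulo parallelism (not the connector's exact function). */
    static int assignSubtask(String shardId, int parallelism) {
        return Math.floorMod(shardId.hashCode(), parallelism);
    }

    /** A record is late once the operator's watermark has reached or passed its timestamp. */
    static boolean isLate(long recordTimestamp, long currentWatermark) {
        return recordTimestamp <= currentWatermark;
    }

    /** Counts child-shard records that would be late on the subtask they are assigned to. */
    static int countLate(String[] childShards, long[] childTimestamps, long[] subtaskWatermarks) {
        int late = 0;
        for (int i = 0; i < childShards.length; i++) {
            int subtask = assignSubtask(childShards[i], subtaskWatermarks.length);
            if (isLate(childTimestamps[i], subtaskWatermarks[subtask])) {
                late++;
            }
        }
        return late;
    }
}
```

With `T1 = 1_000_000`, child records at `T1 + 10` and `T1 + 20`, and subtask watermarks `{T1 + 60_000, T1 + 120_000}`, `countLate` returns 2 whichever subtask each child lands on.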

Thanks,
Eron

On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:

> Based on my draft implementation, the changes that are needed in the Flink
> connector are as follows:
>
> I need to be able to override the following to track last record timestamp
> and idle time per shard.
>
>     protected final void emitRecordAndUpdateState(T record, long recordTimestamp,
>             int shardStateIndex, SequenceNumber lastSequenceNumber) {
>         synchronized (checkpointLock) {
>             sourceContext.collectWithTimestamp(record, recordTimestamp);
>             updateState(shardStateIndex, lastSequenceNumber);
>         }
>     }
>
> Any objection to removing final from it?
>
> Also, why is sourceContext.collectWithTimestamp in the synchronized block?
> My custom class will need to emit watermarks - I assume there is no need to
> acquire checkpointLock for that? Otherwise I would also need to add
> emitWatermark() to the base class.
>
> Let me know if anything else should be considered, I will open a JIRA and
> PR otherwise.
>
> Thanks,
> Thomas
>
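A minimal sketch of the per-shard bookkeeping described above: track the last record timestamp and last activity time per shard, and derive a watermark from the minimum across shards that are not idle. `PerShardWatermarkTracker` is hypothetical; an overridden `emitRecordAndUpdateState` would call `onRecord`. The sketch assumes record timestamps within a shard are non-decreasing.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical per-shard watermark tracker that an overridden
 * emitRecordAndUpdateState could feed. Assumes record timestamps within a
 * shard are non-decreasing.
 */
final class PerShardWatermarkTracker {
    private static final class ShardProgress {
        long lastTimestamp = Long.MIN_VALUE; // highest event timestamp seen
        long lastActivityMillis;             // processing time of the last record
    }

    private final Map<Integer, ShardProgress> shards = new HashMap<>();
    private final long idleTimeoutMillis;
    private long lastEmitted = Long.MIN_VALUE;

    PerShardWatermarkTracker(long idleTimeoutMillis) {
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Record-path hook; shardStateIndex mirrors the connector method's parameter. */
    void onRecord(int shardStateIndex, long recordTimestamp, long nowMillis) {
        ShardProgress p = shards.computeIfAbsent(shardStateIndex, k -> new ShardProgress());
        p.lastTimestamp = Math.max(p.lastTimestamp, recordTimestamp);
        p.lastActivityMillis = nowMillis;
    }

    /**
     * Watermark = minimum last timestamp over shards that are not idle.
     * Returns empty if no shard is active or the watermark would not advance.
     */
    OptionalLong maybeAdvance(long nowMillis) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (ShardProgress p : shards.values()) {
            if (nowMillis - p.lastActivityMillis >= idleTimeoutMillis) {
                continue; // idle shard: do not let it hold back the watermark
            }
            anyActive = true;
            min = Math.min(min, p.lastTimestamp);
        }
        if (!anyActive || min <= lastEmitted) {
            return OptionalLong.empty();
        }
        lastEmitted = min;
        return OptionalLong.of(min);
    }
}
```

A present result would then be passed to `sourceContext.emitWatermark(new Watermark(...))`; whether that call needs to hold `checkpointLock` is the open question above.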
>
> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
>
> > -->
> >
> > On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> > wrote:
> >
> >> Regarding the two hooks you would like to be available:
> >>
> >>
> >>    - Provide hook to override discovery (not to hit Kinesis from every
> >>    subtask)
> >>
> >> Yes, I think we can easily provide a way, for example setting -1 for
> >> SHARD_DISCOVERY_INTERVAL_MILLIS, to disable shard discovery.
> >> However, the user would then have to savepoint and restore in order to
> >> pick up new shards after a Kinesis stream reshard (which is in practice the
> >> best way to bypass the Kinesis API rate limitations).
> >> +1 to provide that.
> >>
> >
> > I'm considering a customization of KinesisDataFetcher with an override for
> > discoverNewShardsToSubscribe. We still want shards to be discovered, just
> > not by hitting Kinesis from every subtask.
> >
> >
> >>
> >>
> >>    - Provide hook to support custom watermark generation (somewhere
> >>    around KinesisDataFetcher.emitRecordAndUpdateState)
> >>
> >> Per-partition watermark generation on the Kinesis side is slightly more
> >> complex than on the Kafka side, due to how Kinesis’s dynamic resharding
> >> works. I think we additionally need to allow new shards to be consumed
> >> only after their parent shards are fully read; otherwise “per-shard time
> >> characteristics” can be broken by out-of-order consumption across the
> >> boundary between a closed parent shard and its children.
> >> There are JIRAs [1][2] with a bit more detail on the topic.
> >> Otherwise, in general I’m +1 to providing this in the Kinesis
> >> consumer as well.
> >>
> >
> > Here I'm thinking of customizing emitRecordAndUpdateState (the method would
> > need to be made non-final), using getSubscribedShardsState with additional
> > transient state to keep track of the watermark per shard and emit
> > watermarks as appropriate.
> >
> > That's the idea - haven't written any code for it yet.
> >
> > Thanks,
> > Thomas
> >
> >
>
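The parent-before-child ordering Gordon suggests can be sketched as a simple gate. `ShardInfo` and `ParentFirstGate` are hypothetical; the two parent fields mirror the `ParentShardId` and `AdjacentParentShardId` attributes of a Kinesis shard (the latter is set for merged shards). The sketch assumes a single view of which shards are finished; in the parallel consumer a parent may be read by a different subtask, so that state would have to be shared somehow. Parents that were never registered (for example, already past retention) are treated as finished, which is also an assumption.

```java
import java.util.HashSet;
import java.util.Set;

/** Minimal shard description; the parent fields mirror those of a Kinesis shard. */
final class ShardInfo {
    final String shardId;
    final String parentShardId;         // null if the shard has no parent
    final String adjacentParentShardId; // set only for merged shards, else null

    ShardInfo(String shardId, String parentShardId, String adjacentParentShardId) {
        this.shardId = shardId;
        this.parentShardId = parentShardId;
        this.adjacentParentShardId = adjacentParentShardId;
    }
}

/**
 * Hypothetical gate: a shard may be consumed only after all of its known
 * parents have been read to the end.
 */
final class ParentFirstGate {
    private final Set<String> knownShards = new HashSet<>();
    private final Set<String> finishedShards = new HashSet<>();

    void register(ShardInfo shard) {
        knownShards.add(shard.shardId);
    }

    /** Assumed to be called once a closed shard has been read to its end. */
    void markFinished(String shardId) {
        finishedShards.add(shardId);
    }

    boolean mayConsume(ShardInfo shard) {
        return parentDone(shard.parentShardId) && parentDone(shard.adjacentParentShardId);
    }

    private boolean parentDone(String parentId) {
        // No parent, an unknown (e.g. expired) parent, or a finished parent all unblock the child.
        return parentId == null || !knownShards.contains(parentId) || finishedShards.contains(parentId);
    }
}
```

For a split S1 into S2/S3 followed by a merge of S2 and S3 into S4, S2 and S3 become consumable once S1 is finished, and S4 only once both S2 and S3 are finished.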
