While we're on this: 
https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

This is a concrete way of separating partition/shard/split discovery from
reading those splits. The nice thing about it is that you can mix and match
"discovery components" and "reader components". For example, for Kafka we
would have a TopicReader, and I can envision different discovery
implementations: one very simple and no-frills but rock solid, and another
that automatically discovers new partitions, does regex topic matching, etc.
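
To sketch what I mean (the interfaces and names below are made up purely for
illustration, not an actual Beam or Flink API):

    import java.util.List;

    /** Discovers partitions/shards/splits. Implementations could range from a
     *  fixed, no-frills list to periodic discovery with regex topic matching. */
    interface SplitDiscoverer<SplitT> {
        List<SplitT> discoverNewSplits() throws Exception;
    }

    /** Reads the records of a single split handed over by a discoverer. */
    interface SplitReader<SplitT, RecordT> {
        void open(SplitT split) throws Exception;
        boolean hasNext() throws Exception;
        RecordT next() throws Exception;
    }

    /** A connector is then just a pairing of the two, e.g. a Kafka TopicReader
     *  combined with either a static or a regex-based discoverer. */
    class ComposedSource<SplitT, RecordT> {
        final SplitDiscoverer<SplitT> discoverer;
        final SplitReader<SplitT, RecordT> reader;

        ComposedSource(SplitDiscoverer<SplitT> discoverer,
                       SplitReader<SplitT, RecordT> reader) {
            this.discoverer = discoverer;
            this.reader = reader;
        }
    }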
 

> On 22. Feb 2018, at 01:49, Jamie Grier <jgr...@lyft.com> wrote:
> 
> I know this is a very simplistic idea but...
> 
> In general, the issue Eron is describing occurs whenever two (or more)
> parallel partitions are assigned to the same Flink sub-task and there is a
> large time delta between them.  The problem exists largely because we are
> not making any decision about which of these partitions to read and when,
> but rather just treating them all the same.  However, this isn't the only
> way to approach the problem.
> 
> Think of each partition instead as a "roughly time-sorted" file and of the
> connector as something like a merge-sort process.  In other words, read
> the older data first by peeking at each partition and deciding what to
> read next.  The output of the connector would then be a roughly
> time-ordered stream.
> 
> However, to really solve the whole problem you'd have to carry this idea
> throughout Flink and be more selective about which data you read and when
> across the whole data flow graph.  That is a similar problem, I think, and
> something I've been thinking about a bit lately.
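> 
> A rough sketch of that merge idea, purely illustrative (the PartitionHead
> type and its peek/poll methods are made up, not an existing connector API):
> 
>     import java.util.PriorityQueue;
> 
>     // One entry per assigned partition, exposing the event timestamp of the
>     // next available record.
>     interface PartitionHead<T> {
>         long peekTimestamp();
>         T poll();
>         boolean hasNext();
>     }
> 
>     // Merge-sort-like loop: always emit from the partition whose head record
>     // is oldest, producing a roughly time-ordered output stream.
>     class RoughlyTimeSortedReader<T> {
>         private final PriorityQueue<PartitionHead<T>> heads = new PriorityQueue<>(
>                 (a, b) -> Long.compare(a.peekTimestamp(), b.peekTimestamp()));
> 
>         void addPartition(PartitionHead<T> partition) {
>             if (partition.hasNext()) {
>                 heads.add(partition);
>             }
>         }
> 
>         // Assumes at least one registered partition still has data.
>         T nextRecord() {
>             PartitionHead<T> oldest = heads.poll(); // oldest head timestamp
>             T record = oldest.poll();
>             if (oldest.hasNext()) {
>                 heads.add(oldest); // re-insert under its new head timestamp
>             }
>             return record;
>         }
>     }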
> 
> 
> 
> 
> On Mon, Feb 12, 2018 at 7:12 PM, Eron Wright <eronwri...@gmail.com> wrote:
> 
>> It is valuable to consider the behavior of a consumer in both a real-time
>> processing context, which consists mostly of tail reads, and a historical
>> processing context, where there's an abundance of backlogged data.   In the
>> historical processing context, system internals (e.g. shard selection
>> logic) have an outsized influence on the order of observation and
>> potentially the progression of the event time clock.  In a real-time
>> context, the order of observation is, by definition, mostly determined by
>> the order in which events are produced.
>> 
>> My point is, it would be good to explore the efficacy of these improvements
>> in both contexts.
>> 
>> 
>> 
>> 
>> On Mon, Feb 12, 2018 at 5:10 PM, Thomas Weise <t...@apache.org> wrote:
>> 
>>> I don't think there is a generic solution to the problem you are
>>> describing; we don't know how long it will take for resharding to take
>>> effect and for those changes to become visible to the connector. Depending
>>> on how latency-sensitive the pipeline is, a configurable watermark hold
>>> period could possibly be used to cushion the event time chaos introduced
>>> by resharding.
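>>> 
>>> Roughly something like the following, just as a sketch (the holdMillis
>>> knob and the class are made up, nothing from the actual connector):
>>> 
>>>     // Delay the emitted watermark by a configurable hold period so that
>>>     // late-arriving child-shard data after a reshard is less likely to
>>>     // land behind the watermark.
>>>     class HeldBackWatermark {
>>>         private final long holdMillis; // hypothetical configuration value
>>>         private long maxTimestampSeen = Long.MIN_VALUE;
>>> 
>>>         HeldBackWatermark(long holdMillis) {
>>>             this.holdMillis = holdMillis;
>>>         }
>>> 
>>>         void onRecord(long eventTimestamp) {
>>>             maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
>>>         }
>>> 
>>>         long currentWatermark() {
>>>             return maxTimestampSeen == Long.MIN_VALUE
>>>                     ? Long.MIN_VALUE
>>>                     : maxTimestampSeen - holdMillis;
>>>         }
>>>     }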
>>> 
>>> This isn't the primary motivation for the connector customization I'm
>>> working on, though. We face issues with restarts from older checkpoints
>>> where parent and child shards are consumed in parallel.
>>> 
>>> 
>>> --
>>> sent from mobile
>>> 
>>> 
>>> On Feb 12, 2018 4:36 PM, "Eron Wright" <eronwri...@gmail.com> wrote:
>>> 
>>> I'd like to know how you envision dealing with resharding in relation to
>>> the watermark state.   Imagine that a given shard S1 has a watermark of
>>> T1, and is then split into two shards S2 and S3.   The new shards are
>>> assigned to subtasks according to a hash function.  The current watermarks
>>> of those subtasks could be far ahead of T1, and thus the events in S2/S3
>>> will be considered late.
>>> 
>>> The problem of a chaotic event time clock is exacerbated by any source
>>> that uses dynamic partitioning.  Would a per-shard watermark generator
>>> really solve the problem that is motivating you?
>>> 
>>> Thanks,
>>> Eron
>>> 
>>> On Mon, Feb 12, 2018 at 10:35 AM, Thomas Weise <t...@apache.org> wrote:
>>> 
>>>> Based on my draft implementation, the changes that are needed in the
>>>> Flink connector are as follows:
>>>> 
>>>> I need to be able to override the following to track last record
>>>> timestamp and idle time per shard.
>>>> 
>>>>    protected final void emitRecordAndUpdateState(T record,
>>>>            long recordTimestamp, int shardStateIndex,
>>>>            SequenceNumber lastSequenceNumber) {
>>>>        synchronized (checkpointLock) {
>>>>            sourceContext.collectWithTimestamp(record, recordTimestamp);
>>>>            updateState(shardStateIndex, lastSequenceNumber);
>>>>        }
>>>>    }
>>>> 
>>>> Any objection removing final from it?
>>>> 
>>>> Also, why is sourceContext.collectWithTimestamp in the synchronized
>>>> block? My custom class will need to emit watermarks - I assume there is
>>>> no need to acquire checkpointLock for that? Otherwise I would also need
>>>> to add emitWatermark() to the base class.
>>>> 
>>>> Let me know if anything else should be considered; otherwise I will open
>>>> a JIRA and a PR.
>>>> 
>>>> Thanks,
>>>> Thomas
>>>> 
>>>> 
>>>> On Thu, Feb 8, 2018 at 3:03 PM, Thomas Weise <t...@apache.org> wrote:
>>>> 
>>>>> -->
>>>>> 
>>>>> On Thu, Feb 8, 2018 at 2:16 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>>>> wrote:
>>>>> 
>>>>>> Regarding the two hooks you would like to be available:
>>>>>> 
>>>>>> 
>>>>>>   - Provide hook to override discovery (not to hit Kinesis from every
>>>>>>   subtask)
>>>>>> 
>>>>>> Yes, I think we can easily provide a way to disable shard discovery, for
>>>>>> example by setting -1 for SHARD_DISCOVERY_INTERVAL_MILLIS.
>>>>>> The user would then, however, have to savepoint and restore in order to
>>>>>> pick up new shards after a Kinesis stream reshard (which is in practice
>>>>>> the best way to bypass the Kinesis API rate limitations).
>>>>>> +1 to provide that.
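>>>>>> 
>>>>>> From the user's side that could look roughly like this (a sketch only:
>>>>>> treating -1 as "disabled" is the proposal, not current behavior, the
>>>>>> stream name is made up, and import paths may differ between Flink
>>>>>> versions):
>>>>>> 
>>>>>>     import java.util.Properties;
>>>>>> 
>>>>>>     import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>>>     import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
>>>>>>     import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
>>>>>> 
>>>>>>     public class NoDiscoveryExample {
>>>>>>         public static void main(String[] args) {
>>>>>>             Properties config = new Properties();
>>>>>>             // (AWS credentials configuration omitted)
>>>>>>             config.setProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1");
>>>>>>             // -1 = "never run shard discovery" under the proposed semantics
>>>>>>             config.setProperty(
>>>>>>                     ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "-1");
>>>>>> 
>>>>>>             FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
>>>>>>                     "my-stream", new SimpleStringSchema(), config);
>>>>>>         }
>>>>>>     }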
>>>>>> 
>>>>> 
>>>>> I'm considering a customization of KinesisDataFetcher with an override
>>>>> of discoverNewShardsToSubscribe. We still want shards to be discovered,
>>>>> just not by hitting Kinesis from every subtask.
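>>>>> 
>>>>> The override I have in mind would look roughly like this (just a sketch:
>>>>> it assumes the method is made overridable, keeps what I believe is its
>>>>> current return type, and the shared shard-list service is a made-up
>>>>> placeholder; constructor wiring is omitted):
>>>>> 
>>>>>     // Inside a hypothetical KinesisDataFetcher subclass:
>>>>>     @Override
>>>>>     public List<StreamShardHandle> discoverNewShardsToSubscribe()
>>>>>             throws InterruptedException {
>>>>>         // Instead of every subtask calling the Kinesis API, ask a shared
>>>>>         // (hypothetical) service that performs discovery once and fans
>>>>>         // the result out to the subtasks.
>>>>>         return sharedShardListService.pollNewShards();
>>>>>     }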
>>>>> 
>>>>> 
>>>>>> 
>>>>>> 
>>>>>>   - Provide hook to support custom watermark generation (somewhere
>>>>>>   around KinesisDataFetcher.emitRecordAndUpdateState)
>>>>>> 
>>>>>> Per-partition watermark generation on the Kinesis side is slightly more
>>>>>> complex than for Kafka, due to how Kinesis’s dynamic resharding works.
>>>>>> I think we additionally need to allow a new shard to be consumed only
>>>>>> after its parent shard has been fully read; otherwise the “per-shard time
>>>>>> characteristics” can be broken by out-of-order consumption across the
>>>>>> boundary between a closed parent shard and its child.
>>>>>> There are these JIRAs [1][2] with a bit more detail on the topic.
>>>>>> Otherwise, in general I’m +1 to providing this in the Kinesis consumer as
>>>>>> well.
>>>>>> 
>>>>> 
>>>>> Here I'm thinking of customizing emitRecordAndUpdateState (the method
>>>>> would need to be made non-final), using getSubscribedShardsState plus
>>>>> additional transient state to keep track of the watermark per shard and
>>>>> emit watermarks as appropriate.
>>>>> 
>>>>> That's the idea - haven't written any code for it yet.
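>>>>> 
>>>>> Very roughly, though, the per-shard bookkeeping could look something like
>>>>> this (a sketch only, independent of the connector classes; idle-shard
>>>>> handling is left out):
>>>>> 
>>>>>     import java.util.HashMap;
>>>>>     import java.util.Map;
>>>>> 
>>>>>     // Tracks the max event timestamp seen per shard; the watermark to
>>>>>     // emit is the minimum over those per-shard timestamps, emitted only
>>>>>     // when it advances.
>>>>>     class PerShardWatermarkTracker {
>>>>>         private final Map<Integer, Long> lastTimestampPerShard = new HashMap<>();
>>>>>         private long lastEmittedWatermark = Long.MIN_VALUE;
>>>>> 
>>>>>         void onRecord(int shardStateIndex, long eventTimestamp) {
>>>>>             lastTimestampPerShard.merge(shardStateIndex, eventTimestamp, Math::max);
>>>>>         }
>>>>> 
>>>>>         /** Returns the new watermark to emit, or null if it has not advanced. */
>>>>>         Long maybeAdvanceWatermark() {
>>>>>             if (lastTimestampPerShard.isEmpty()) {
>>>>>                 return null;
>>>>>             }
>>>>>             long min = Long.MAX_VALUE;
>>>>>             for (long ts : lastTimestampPerShard.values()) {
>>>>>                 min = Math.min(min, ts);
>>>>>             }
>>>>>             if (min > lastEmittedWatermark) {
>>>>>                 lastEmittedWatermark = min;
>>>>>                 return min;
>>>>>             }
>>>>>             return null;
>>>>>         }
>>>>>     }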
>>>>> 
>>>>> Thanks,
>>>>> Thomas
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
