Hi Stephan,

Thank you for feedback!
Will take a look at your branch before public discussing.


On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:

> Hi Biao!
>
> Thanks for reviving this. I would like to join this discussion, but am
> quite occupied with the 1.9 release, so can we maybe pause this discussion
> for a week or so?
>
> In the meantime I can share some suggestion based on prior experiments:
>
> How to do watermarks / timestamp extractors in a simpler and more flexible
> way. I think that part is quite promising should be part of the new source
> interface.
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>
>
> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>
>
>
> Some experiments on how to build the source reader and its library for
> common threading/split patterns:
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>
>
> Best,
> Stephan
>
>
> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
>> Hi devs,
>>
>> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
>> believe it should be included in 1.10.
>>
>> There are so many things mentioned in document of FLIP-27. [1] I think
>> we'd better discuss them separately. However the wiki is not a good place
>> to discuss. I wrote google doc about SplitReader API which misses some
>> details in the document. [2]
>>
>> 1.
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>> 2.
>> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>>
>> CC Stephan, Aljoscha, Piotrek, Becket
>>
>>
>> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Steven,
>>> Thank you for the feedback. Please take a look at the document FLIP-27
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>
>>>  which
>>> is updated recently. A lot of details of enumerator were added in this
>>> document. I think it would help.
>>>
>>> Steven Wu <stevenz...@gmail.com> 于2019年3月28日周四 下午12:52写道:
>>>
>>>> This proposal mentioned that SplitEnumerator might run on the
>>>> JobManager or
>>>> in a single task on a TaskManager.
>>>>
>>>> if enumerator is a single task on a taskmanager, then the job DAG can
>>>> never
>>>> been embarrassingly parallel anymore. That will nullify the leverage of
>>>> fine-grained recovery for embarrassingly parallel jobs.
>>>>
>>>> It's not clear to me what's the implication of running enumerator on the
>>>> jobmanager. So I will leave that out for now.
>>>>
>>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>> > Hi Stephan & Piotrek,
>>>> >
>>>> > Thank you for feedback.
>>>> >
>>>> > It seems that there are a lot of things to do in community. I am just
>>>> > afraid that this discussion may be forgotten since there so many
>>>> proposals
>>>> > recently.
>>>> > Anyway, wish to see the split topics soon :)
>>>> >
>>>> > Piotr Nowojski <pi...@da-platform.com> 于2019年1月24日周四 下午8:21写道:
>>>> >
>>>> > > Hi Biao!
>>>> > >
>>>> > > This discussion was stalled because of preparations for the open
>>>> sourcing
>>>> > > & merging Blink. I think before creating the tickets we should
>>>> split this
>>>> > > discussion into topics/areas outlined by Stephan and create Flips
>>>> for
>>>> > that.
>>>> > >
>>>> > > I think there is no chance for this to be completed in couple of
>>>> > remaining
>>>> > > weeks/1 month before 1.8 feature freeze, however it would be good
>>>> to aim
>>>> > > with those changes for 1.9.
>>>> > >
>>>> > > Piotrek
>>>> > >
>>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
>>>> > > >
>>>> > > > Hi community,
>>>> > > > The summary of Stephan makes a lot sense to me. It is much clearer
>>>> > indeed
>>>> > > > after splitting the complex topic into small ones.
>>>> > > > I was wondering is there any detail plan for next step? If not, I
>>>> would
>>>> > > > like to push this thing forward by creating some JIRA issues.
>>>> > > > Another question is that should version 1.8 include these
>>>> features?
>>>> > > >
>>>> > > > Stephan Ewen <se...@apache.org> 于2018年12月1日周六 上午4:20写道:
>>>> > > >
>>>> > > >> Thanks everyone for the lively discussion. Let me try to
>>>> summarize
>>>> > > where I
>>>> > > >> see convergence in the discussion and open issues.
>>>> > > >> I'll try to group this by design aspect of the source. Please
>>>> let me
>>>> > > know
>>>> > > >> if I got things wrong or missed something crucial here.
>>>> > > >>
>>>> > > >> For issues 1-3, if the below reflects the state of the
>>>> discussion, I
>>>> > > would
>>>> > > >> try and update the FLIP in the next days.
>>>> > > >> For the remaining ones we need more discussion.
>>>> > > >>
>>>> > > >> I would suggest to fork each of these aspects into a separate
>>>> mail
>>>> > > thread,
>>>> > > >> or will loose sight of the individual aspects.
>>>> > > >>
>>>> > > >> *(1) Separation of Split Enumerator and Split Reader*
>>>> > > >>
>>>> > > >>  - All seem to agree this is a good thing
>>>> > > >>  - Split Enumerator could in the end live on JobManager (and
>>>> assign
>>>> > > splits
>>>> > > >> via RPC) or in a task (and assign splits via data streams)
>>>> > > >>  - this discussion is orthogonal and should come later, when the
>>>> > > interface
>>>> > > >> is agreed upon.
>>>> > > >>
>>>> > > >> *(2) Split Readers for one or more splits*
>>>> > > >>
>>>> > > >>  - Discussion seems to agree that we need to support one reader
>>>> that
>>>> > > >> possibly handles multiple splits concurrently.
>>>> > > >>  - The requirement comes from sources where one poll()-style call
>>>> > > fetches
>>>> > > >> data from different splits / partitions
>>>> > > >>    --> example sources that require that would be for example
>>>> Kafka,
>>>> > > >> Pravega, Pulsar
>>>> > > >>
>>>> > > >>  - Could have one split reader per source, or multiple split
>>>> readers
>>>> > > that
>>>> > > >> share the "poll()" function
>>>> > > >>  - To not make it too complicated, we can start with thinking
>>>> about
>>>> > one
>>>> > > >> split reader for all splits initially and see if that covers all
>>>> > > >> requirements
>>>> > > >>
>>>> > > >> *(3) Threading model of the Split Reader*
>>>> > > >>
>>>> > > >>  - Most active part of the discussion ;-)
>>>> > > >>
>>>> > > >>  - A non-blocking way for Flink's task code to interact with the
>>>> > source
>>>> > > is
>>>> > > >> needed in order to a task runtime code based on a
>>>> > > >> single-threaded/actor-style task design
>>>> > > >>    --> I personally am a big proponent of that, it will help with
>>>> > > >> well-behaved checkpoints, efficiency, and simpler yet more robust
>>>> > > runtime
>>>> > > >> code
>>>> > > >>
>>>> > > >>  - Users care about simple abstraction, so as a subclass of
>>>> > SplitReader
>>>> > > >> (non-blocking / async) we need to have a BlockingSplitReader
>>>> which
>>>> > will
>>>> > > >> form the basis of most source implementations.
>>>> BlockingSplitReader
>>>> > lets
>>>> > > >> users do blocking simple poll() calls.
>>>> > > >>  - The BlockingSplitReader would spawn a thread (or more) and the
>>>> > > >> thread(s) can make blocking calls and hand over data buffers via
>>>> a
>>>> > > blocking
>>>> > > >> queue
>>>> > > >>  - This should allow us to cover both, a fully async runtime,
>>>> and a
>>>> > > simple
>>>> > > >> blocking interface for users.
>>>> > > >>  - This is actually very similar to how the Kafka connectors
>>>> work.
>>>> > Kafka
>>>> > > >> 9+ with one thread, Kafka 8 with multiple threads
>>>> > > >>
>>>> > > >>  - On the base SplitReader (the async one), the non-blocking
>>>> method
>>>> > that
>>>> > > >> gets the next chunk of data would signal data availability via a
>>>> > > >> CompletableFuture, because that gives the best flexibility (can
>>>> await
>>>> > > >> completion or register notification handlers).
>>>> > > >>  - The source task would register a "thenHandle()" (or similar)
>>>> on the
>>>> > > >> future to put a "take next data" task into the actor-style
>>>> mailbox
>>>> > > >>
>>>> > > >> *(4) Split Enumeration and Assignment*
>>>> > > >>
>>>> > > >>  - Splits may be generated lazily, both in cases where there is a
>>>> > > limited
>>>> > > >> number of splits (but very many), or splits are discovered over
>>>> time
>>>> > > >>  - Assignment should also be lazy, to get better load balancing
>>>> > > >>  - Assignment needs support locality preferences
>>>> > > >>
>>>> > > >>  - Possible design based on discussion so far:
>>>> > > >>
>>>> > > >>    --> SplitReader has a method "addSplits(SplitT...)" to add
>>>> one or
>>>> > > more
>>>> > > >> splits. Some split readers might assume they have only one split
>>>> ever,
>>>> > > >> concurrently, others assume multiple splits. (Note: idea behind
>>>> being
>>>> > > able
>>>> > > >> to add multiple splits at the same time is to ease startup where
>>>> > > multiple
>>>> > > >> splits may be assigned instantly.)
>>>> > > >>    --> SplitReader has a context object on which it can call
>>>> indicate
>>>> > > when
>>>> > > >> splits are completed. The enumerator gets that notification and
>>>> can
>>>> > use
>>>> > > to
>>>> > > >> decide when to assign new splits. This should help both in cases
>>>> of
>>>> > > sources
>>>> > > >> that take splits lazily (file readers) and in case the source
>>>> needs to
>>>> > > >> preserve a partial order between splits (Kinesis, Pravega,
>>>> Pulsar may
>>>> > > need
>>>> > > >> that).
>>>> > > >>    --> SplitEnumerator gets notification when SplitReaders start
>>>> and
>>>> > > when
>>>> > > >> they finish splits. They can decide at that moment to push more
>>>> splits
>>>> > > to
>>>> > > >> that reader
>>>> > > >>    --> The SplitEnumerator should probably be aware of the source
>>>> > > >> parallelism, to build its initial distribution.
>>>> > > >>
>>>> > > >>  - Open question: Should the source expose something like "host
>>>> > > >> preferences", so that yarn/mesos/k8s can take this into account
>>>> when
>>>> > > >> selecting a node to start a TM on?
>>>> > > >>
>>>> > > >> *(5) Watermarks and event time alignment*
>>>> > > >>
>>>> > > >>  - Watermark generation, as well as idleness, needs to be per
>>>> split
>>>> > > (like
>>>> > > >> currently in the Kafka Source, per partition)
>>>> > > >>  - It is desirable to support optional event-time-alignment,
>>>> meaning
>>>> > > that
>>>> > > >> splits that are ahead are back-pressured or temporarily
>>>> unsubscribed
>>>> > > >>
>>>> > > >>  - I think i would be desirable to encapsulate watermark
>>>> generation
>>>> > > logic
>>>> > > >> in watermark generators, for a separation of concerns. The
>>>> watermark
>>>> > > >> generators should run per split.
>>>> > > >>  - Using watermark generators would also help with another
>>>> problem of
>>>> > > the
>>>> > > >> suggested interface, namely supporting non-periodic watermarks
>>>> > > efficiently.
>>>> > > >>
>>>> > > >>  - Need a way to "dispatch" next record to different watermark
>>>> > > generators
>>>> > > >>  - Need a way to tell SplitReader to "suspend" a split until a
>>>> certain
>>>> > > >> watermark is reached (event time backpressure)
>>>> > > >>  - This would in fact be not needed (and thus simpler) if we had
>>>> a
>>>> > > >> SplitReader per split and may be a reason to re-open that
>>>> discussion
>>>> > > >>
>>>> > > >> *(6) Watermarks across splits and in the Split Enumerator*
>>>> > > >>
>>>> > > >>  - The split enumerator may need some watermark awareness, which
>>>> > should
>>>> > > be
>>>> > > >> purely based on split metadata (like create timestamp of file
>>>> splits)
>>>> > > >>  - If there are still more splits with overlapping event time
>>>> range
>>>> > for
>>>> > > a
>>>> > > >> split reader, then that split reader should not advance the
>>>> watermark
>>>> > > >> within the split beyond the overlap boundary. Otherwise future
>>>> splits
>>>> > > will
>>>> > > >> produce late data.
>>>> > > >>
>>>> > > >>  - One way to approach this could be that the split enumerator
>>>> may
>>>> > send
>>>> > > >> watermarks to the readers, and the readers cannot emit watermarks
>>>> > beyond
>>>> > > >> that received watermark.
>>>> > > >>  - Many split enumerators would simply immediately send Long.MAX
>>>> out
>>>> > and
>>>> > > >> leave the progress purely to the split readers.
>>>> > > >>
>>>> > > >>  - For event-time alignment / split back pressure, this begs the
>>>> > > question
>>>> > > >> how we can avoid deadlocks that may arise when splits are
>>>> suspended
>>>> > for
>>>> > > >> event time back pressure,
>>>> > > >>
>>>> > > >> *(7) Batch and streaming Unification*
>>>> > > >>
>>>> > > >>  - Functionality wise, the above design should support both
>>>> > > >>  - Batch often (mostly) does not care about reading "in order"
>>>> and
>>>> > > >> generating watermarks
>>>> > > >>    --> Might use different enumerator logic that is more locality
>>>> > aware
>>>> > > >> and ignores event time order
>>>> > > >>    --> Does not generate watermarks
>>>> > > >>  - Would be great if bounded sources could be identified at
>>>> compile
>>>> > > time,
>>>> > > >> so that "env.addBoundedSource(...)" is type safe and can return a
>>>> > > >> "BoundedDataStream".
>>>> > > >>  - Possible to defer this discussion until later
>>>> > > >>
>>>> > > >> *Miscellaneous Comments*
>>>> > > >>
>>>> > > >>  - Should the source have a TypeInformation for the produced
>>>> type,
>>>> > > instead
>>>> > > >> of a serializer? We need a type information in the stream
>>>> anyways, and
>>>> > > can
>>>> > > >> derive the serializer from that. Plus, creating the serializer
>>>> should
>>>> > > >> respect the ExecutionConfig.
>>>> > > >>
>>>> > > >>  - The TypeSerializer interface is very powerful but also not
>>>> easy to
>>>> > > >> implement. Its purpose is to handle data super efficiently,
>>>> support
>>>> > > >> flexible ways of evolution, etc.
>>>> > > >>  For metadata I would suggest to look at the
>>>> SimpleVersionedSerializer
>>>> > > >> instead, which is used for example for checkpoint master hooks,
>>>> or for
>>>> > > the
>>>> > > >> streaming file sink. I think that is is a good match for cases
>>>> where
>>>> > we
>>>> > > do
>>>> > > >> not need more than ser/deser (no copy, etc.) and don't need to
>>>> push
>>>> > > >> versioning out of the serialization paths for best performance
>>>> (as in
>>>> > > the
>>>> > > >> TypeSerializer)
>>>> > > >>
>>>> > > >>
>>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
>>>> > > >> k.klou...@data-artisans.com>
>>>> > > >> wrote:
>>>> > > >>
>>>> > > >>> Hi Biao,
>>>> > > >>>
>>>> > > >>> Thanks for the answer!
>>>> > > >>>
>>>> > > >>> So given the multi-threaded readers, now we have as open
>>>> questions:
>>>> > > >>>
>>>> > > >>> 1) How do we let the checkpoints pass through our multi-threaded
>>>> > reader
>>>> > > >>> operator?
>>>> > > >>>
>>>> > > >>> 2) Do we have separate reader and source operators or not? In
>>>> the
>>>> > > >> strategy
>>>> > > >>> that has a separate source, the source operator has a
>>>> parallelism of
>>>> > 1
>>>> > > >> and
>>>> > > >>> is responsible for split recovery only.
>>>> > > >>>
>>>> > > >>> For the first one, given also the constraints (blocking, finite
>>>> > queues,
>>>> > > >>> etc), I do not have an answer yet.
>>>> > > >>>
>>>> > > >>> For the 2nd, I think that we should go with separate operators
>>>> for
>>>> > the
>>>> > > >>> source and the readers, for the following reasons:
>>>> > > >>>
>>>> > > >>> 1) This is more aligned with a potential future improvement
>>>> where the
>>>> > > >> split
>>>> > > >>> discovery becomes a responsibility of the JobManager and
>>>> readers are
>>>> > > >>> pooling more work from the JM.
>>>> > > >>>
>>>> > > >>> 2) The source is going to be the "single point of truth". It
>>>> will
>>>> > know
>>>> > > >> what
>>>> > > >>> has been processed and what not. If the source and the readers
>>>> are a
>>>> > > >> single
>>>> > > >>> operator with parallelism > 1, or in general, if the split
>>>> discovery
>>>> > is
>>>> > > >>> done by each task individually, then:
>>>> > > >>>   i) we have to have a deterministic scheme for each reader to
>>>> assign
>>>> > > >>> splits to itself (e.g. mod subtaskId). This is not necessarily
>>>> > trivial
>>>> > > >> for
>>>> > > >>> all sources.
>>>> > > >>>   ii) each reader would have to keep a copy of all its processed
>>>> > slpits
>>>> > > >>>   iii) the state has to be a union state with a non-trivial
>>>> merging
>>>> > > >> logic
>>>> > > >>> in order to support rescaling.
>>>> > > >>>
>>>> > > >>> Two additional points that you raised above:
>>>> > > >>>
>>>> > > >>> i) The point that you raised that we need to keep all splits
>>>> > (processed
>>>> > > >> and
>>>> > > >>> not-processed) I think is a bit of a strong requirement. This
>>>> would
>>>> > > imply
>>>> > > >>> that for infinite sources the state will grow indefinitely.
>>>> This is
>>>> > > >> problem
>>>> > > >>> is even more pronounced if we do not have a single source that
>>>> > assigns
>>>> > > >>> splits to readers, as each reader will have its own copy of the
>>>> > state.
>>>> > > >>>
>>>> > > >>> ii) it is true that for finite sources we need to somehow not
>>>> close
>>>> > the
>>>> > > >>> readers when the source/split discoverer finishes. The
>>>> > > >>> ContinuousFileReaderOperator has a work-around for that. It is
>>>> not
>>>> > > >> elegant,
>>>> > > >>> and checkpoints are not emitted after closing the source, but
>>>> this, I
>>>> > > >>> believe, is a bigger problem which requires more changes than
>>>> just
>>>> > > >>> refactoring the source interface.
>>>> > > >>>
>>>> > > >>> Cheers,
>>>> > > >>> Kostas
>>>> > > >>>
>>>> > > >>
>>>> > >
>>>> > >
>>>> >
>>>>
>>>

Reply via email to