Thanks for putting together this proposal!

I see that the "Per Split Event Time" and "Event Time Alignment" sections
are still TBD.

It would probably be good to flesh those out a bit before proceeding too
far, as the event time alignment will probably influence the interaction
with the split reader, specifically ReaderStatus emitNext(SourceOutput<E>
output).

We currently have only one implementation of event time alignment, in the
Kinesis consumer. The synchronization in that case takes place as the last
step before records are emitted downstream (RecordEmitter). With the
currently proposed interfaces, the equivalent can be implemented in the
reader loop, although note that in the Kinesis consumer the per-shard
threads push records.
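
As an illustration only (the status values and all the alignment state
below are placeholders, not the proposed API), the gating in the reader
loop could look roughly like this:

    // Sketch: hold back records from splits that are ahead of the
    // shared minimum watermark (event time back pressure).
    // queue, sharedMinWatermark and maxAheadMillis are made up here.
    public ReaderStatus emitNext(SourceOutput<E> output) {
        RecordAndTimestamp<E> next = queue.peek();
        if (next == null) {
            return ReaderStatus.NOTHING_AVAILABLE;
        }
        if (next.timestamp() > sharedMinWatermark.get() + maxAheadMillis) {
            // this split is too far ahead; defer emission until the
            // slower splits catch up
            return ReaderStatus.NOTHING_AVAILABLE;
        }
        queue.poll();
        output.collect(next.record(), next.timestamp());
        return ReaderStatus.MORE_AVAILABLE;
    }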

Synchronization has not been implemented for the Kafka consumer yet.

https://issues.apache.org/jira/browse/FLINK-12675

When I looked at it, I realized that the implementation will look quite
different from the Kinesis one, because it needs to take place on the pull
side, where records are taken from the Kafka client. Due to the
multiplexing, it cannot be done by blocking the split thread as it
currently works for Kinesis. Reading from individual Kafka partitions
needs to be controlled via pause/resume on the Kafka client.
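
To illustrate with the plain Kafka client (pause() and resume() are the
actual KafkaConsumer methods; the watermark bookkeeping here is made up):

    // Sketch: inside the fetch loop, suspend partitions that are ahead
    // of the global watermark and keep consuming the others.
    // watermarkOf(..), globalWatermark and maxDrift are placeholders.
    Set<TopicPartition> ahead = new HashSet<>();
    Set<TopicPartition> behind = new HashSet<>();
    for (TopicPartition tp : consumer.assignment()) {
        if (watermarkOf(tp) > globalWatermark + maxDrift) {
            ahead.add(tp);
        } else {
            behind.add(tp);
        }
    }
    consumer.pause(ahead);   // stop fetching from fast partitions
    consumer.resume(behind); // previously paused partitions resume here
    ConsumerRecords<byte[], byte[]> records =
        consumer.poll(Duration.ofMillis(100));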

To take on that responsibility, the split thread would need to be aware of
the watermarks, or at least of whether it should continue to consume a
given split, and this may require a different SourceReader or SourceOutput
interface.

Thanks,
Thomas


On Fri, Jul 26, 2019 at 1:39 AM Biao Liu <mmyy1...@gmail.com> wrote:

> Hi Stephan,
>
> Thank you for the feedback!
> I will take a look at your branch before the public discussion.
>
>
> On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
>
> > Hi Biao!
> >
> > Thanks for reviving this. I would like to join this discussion, but am
> > quite occupied with the 1.9 release, so can we maybe pause this
> discussion
> > for a week or so?
> >
> > In the meantime I can share some suggestions based on prior experiments:
> >
> > How to do watermarks / timestamp extractors in a simpler and more
> > flexible way. I think that part is quite promising and should be part of
> > the new source interface.
> >
> >
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
> >
> >
> >
> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
> >
> >
> >
> > Some experiments on how to build the source reader and its library for
> > common threading/split patterns:
> >
> >
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
> >
> >
> > Best,
> > Stephan
> >
> >
> > On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
> >
> >> Hi devs,
> >>
> >> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
> >> believe it should be included in 1.10.
> >>
> >> There are so many things mentioned in the FLIP-27 document [1] that I
> >> think we'd better discuss them separately. However, the wiki is not a
> >> good place for discussion. I wrote a Google doc about the SplitReader
> >> API, covering some details that are missing from the document. [2]
> >>
> >> 1.
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> >> 2.
> >>
> https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
> >>
> >> CC Stephan, Aljoscha, Piotrek, Becket
> >>
> >>
> >> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
> >>
> >>> Hi Steven,
> >>> Thank you for the feedback. Please take a look at the FLIP-27 document
> >>> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
> >>> which was updated recently. A lot of details about the enumerator were
> >>> added to this document. I think it would help.
> >>>
> >>> Steven Wu <stevenz...@gmail.com> 于2019年3月28日周四 下午12:52写道:
> >>>
> >>>> This proposal mentioned that SplitEnumerator might run on the
> >>>> JobManager or
> >>>> in a single task on a TaskManager.
> >>>>
> >>>> If the enumerator is a single task on a TaskManager, then the job DAG
> >>>> can never be embarrassingly parallel anymore. That would nullify the
> >>>> benefit of fine-grained recovery for embarrassingly parallel jobs.
> >>>>
> >>>> It's not clear to me what the implications of running the enumerator
> >>>> on the JobManager are, so I will leave that out for now.
> >>>>
> >>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
> >>>>
> >>>> > Hi Stephan & Piotrek,
> >>>> >
> >>>> > Thank you for the feedback.
> >>>> >
> >>>> > It seems that there are a lot of things to do in the community. I am
> >>>> > just afraid that this discussion may be forgotten, since there are so
> >>>> > many proposals recently.
> >>>> > Anyway, I wish to see the split topics soon :)
> >>>> >
> >>>> > Piotr Nowojski <pi...@da-platform.com> 于2019年1月24日周四 下午8:21写道:
> >>>> >
> >>>> > > Hi Biao!
> >>>> > >
> >>>> > > This discussion was stalled because of preparations for the open
> >>>> > > sourcing & merging of Blink. I think before creating the tickets we
> >>>> > > should split this discussion into the topics/areas outlined by
> >>>> > > Stephan and create FLIPs for those.
> >>>> > >
> >>>> > > I think there is no chance for this to be completed in the couple
> >>>> > > of remaining weeks / one month before the 1.8 feature freeze;
> >>>> > > however, it would be good to aim for 1.9 with those changes.
> >>>> > >
> >>>> > > Piotrek
> >>>> > >
> >>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
> >>>> > > >
> >>>> > > > Hi community,
> >>>> > > > The summary of Stephan makes a lot of sense to me. It is indeed
> >>>> > > > much clearer after splitting the complex topic into small ones.
> >>>> > > > I was wondering whether there is any detailed plan for the next
> >>>> > > > step? If not, I would like to push this forward by creating some
> >>>> > > > JIRA issues.
> >>>> > > > Another question: should version 1.8 include these features?
> >>>> > > >
> >>>> > > > Stephan Ewen <se...@apache.org> 于2018年12月1日周六 上午4:20写道:
> >>>> > > >
> >>>> > > >> Thanks everyone for the lively discussion. Let me try to
> >>>> summarize
> >>>> > > where I
> >>>> > > >> see convergence in the discussion and open issues.
> >>>> > > >> I'll try to group this by design aspect of the source. Please
> >>>> let me
> >>>> > > know
> >>>> > > >> if I got things wrong or missed something crucial here.
> >>>> > > >>
> >>>> > > >> For issues 1-3, if the below reflects the state of the
> >>>> discussion, I
> >>>> > > would
> >>>> > > >> try and update the FLIP in the next days.
> >>>> > > >> For the remaining ones we need more discussion.
> >>>> > > >>
> >>>> > > >> I would suggest forking each of these aspects into a separate
> >>>> > > >> mail thread, or we will lose sight of the individual aspects.
> >>>> > > >>
> >>>> > > >> *(1) Separation of Split Enumerator and Split Reader*
> >>>> > > >>
> >>>> > > >>  - All seem to agree this is a good thing
> >>>> > > >>  - Split Enumerator could in the end live on JobManager (and
> >>>> assign
> >>>> > > splits
> >>>> > > >> via RPC) or in a task (and assign splits via data streams)
> >>>> > > >>  - this discussion is orthogonal and should come later, when
> the
> >>>> > > interface
> >>>> > > >> is agreed upon.
> >>>> > > >>
> >>>> > > >> *(2) Split Readers for one or more splits*
> >>>> > > >>
> >>>> > > >>  - Discussion seems to agree that we need to support one reader
> >>>> that
> >>>> > > >> possibly handles multiple splits concurrently.
> >>>> > > >>  - The requirement comes from sources where one poll()-style
> call
> >>>> > > fetches
> >>>> > > >> data from different splits / partitions
> >>>> > > >>    --> examples of sources that require this are Kafka,
> >>>> > > >> Pravega, Pulsar
> >>>> > > >>
> >>>> > > >>  - Could have one split reader per source, or multiple split
> >>>> readers
> >>>> > > that
> >>>> > > >> share the "poll()" function
> >>>> > > >>  - To not make it too complicated, we can start with thinking
> >>>> about
> >>>> > one
> >>>> > > >> split reader for all splits initially and see if that covers
> all
> >>>> > > >> requirements
> >>>> > > >>
> >>>> > > >> *(3) Threading model of the Split Reader*
> >>>> > > >>
> >>>> > > >>  - Most active part of the discussion ;-)
> >>>> > > >>
> >>>> > > >>  - A non-blocking way for Flink's task code to interact with the
> >>>> > > >> source is needed in order to base the task runtime code on a
> >>>> > > >> single-threaded/actor-style task design
> >>>> > > >>    --> I personally am a big proponent of that; it will help
> >>>> > > >> with well-behaved checkpoints, efficiency, and simpler yet more
> >>>> > > >> robust runtime code
> >>>> > > >>
> >>>> > > >>  - Users care about a simple abstraction, so as a subclass of
> >>>> > > >> SplitReader (non-blocking / async) we need to have a
> >>>> > > >> BlockingSplitReader, which will form the basis of most source
> >>>> > > >> implementations. BlockingSplitReader lets users make simple
> >>>> > > >> blocking poll() calls.
> >>>> > > >>  - The BlockingSplitReader would spawn a thread (or more) and
> the
> >>>> > > >> thread(s) can make blocking calls and hand over data buffers
> via
> >>>> a
> >>>> > > blocking
> >>>> > > >> queue
> >>>> > > >>  - This should allow us to cover both a fully async runtime and
> >>>> > > >> a simple blocking interface for users.
> >>>> > > >>  - This is actually very similar to how the Kafka connectors
> >>>> work.
> >>>> > Kafka
> >>>> > > >> 9+ with one thread, Kafka 8 with multiple threads
> >>>> > > >>
> >>>> > > >>  - On the base SplitReader (the async one), the non-blocking
> >>>> method
> >>>> > that
> >>>> > > >> gets the next chunk of data would signal data availability via
> a
> >>>> > > >> CompletableFuture, because that gives the best flexibility (can
> >>>> await
> >>>> > > >> completion or register notification handlers).
> >>>> > > >>  - The source task would register a "thenHandle()" (or similar)
> >>>> > > >> on the future to put a "take next data" task into the actor-style
> >>>> > > >> mailbox
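> >>>> > > >>
> >>>> > > >>  A rough sketch of the shape I have in mind (all names below are
> >>>> > > >> illustrative only, not a finished API):
> >>>> > > >>
> >>>> > > >>    interface SplitReader<E> {
> >>>> > > >>        // completes when data can be polled without blocking
> >>>> > > >>        CompletableFuture<?> available();
> >>>> > > >>        // non-blocking; returns null if nothing is ready
> >>>> > > >>        E pollNext();
> >>>> > > >>    }
> >>>> > > >>
> >>>> > > >>    // in the source task, actor-style:
> >>>> > > >>    reader.available().thenRun(
> >>>> > > >>        () -> mailbox.execute(this::emitNext));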
> >>>> > > >>
> >>>> > > >> *(4) Split Enumeration and Assignment*
> >>>> > > >>
> >>>> > > >>  - Splits may be generated lazily, both in cases where there is
> >>>> > > >> a limited number of splits (but very many) and in cases where
> >>>> > > >> splits are discovered over time
> >>>> > > >>  - Assignment should also be lazy, to get better load balancing
> >>>> > > >>  - Assignment needs to support locality preferences
> >>>> > > >>
> >>>> > > >>  - Possible design based on discussion so far:
> >>>> > > >>
> >>>> > > >>    --> SplitReader has a method "addSplits(SplitT...)" to add
> >>>> > > >> one or more splits. Some split readers might assume they only
> >>>> > > >> ever have one split at a time, others assume multiple concurrent
> >>>> > > >> splits. (Note: the idea behind being able to add multiple splits
> >>>> > > >> at the same time is to ease startup, where multiple splits may
> >>>> > > >> be assigned instantly.)
> >>>> > > >>    --> SplitReader has a context object on which it can indicate
> >>>> > > >> when splits are completed. The enumerator gets that notification
> >>>> > > >> and can use it to decide when to assign new splits. This should
> >>>> > > >> help both in the case of sources that take splits lazily (file
> >>>> > > >> readers) and in the case where the source needs to preserve a
> >>>> > > >> partial order between splits (Kinesis, Pravega, Pulsar may need
> >>>> > > >> that).
> >>>> > > >>    --> SplitEnumerator gets a notification when SplitReaders
> >>>> > > >> start and when they finish splits. It can decide at that moment
> >>>> > > >> to push more splits to that reader
> >>>> > > >>    --> The SplitEnumerator should probably be aware of the
> source
> >>>> > > >> parallelism, to build its initial distribution.
> >>>> > > >>
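> >>>> > > >>  In interface form, this could look roughly like the following
> >>>> > > >> (names are illustrative only):
> >>>> > > >>
> >>>> > > >>    interface SplitReader<E, SplitT> {
> >>>> > > >>        void addSplits(SplitT... splits);
> >>>> > > >>    }
> >>>> > > >>
> >>>> > > >>    interface SplitReaderContext {
> >>>> > > >>        // notifies the enumerator, which may assign more splits
> >>>> > > >>        void notifySplitFinished(String splitId);
> >>>> > > >>    }
> >>>> > > >>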
> >>>> > > >>  - Open question: Should the source expose something like "host
> >>>> > > >> preferences", so that yarn/mesos/k8s can take this into account
> >>>> when
> >>>> > > >> selecting a node to start a TM on?
> >>>> > > >>
> >>>> > > >> *(5) Watermarks and event time alignment*
> >>>> > > >>
> >>>> > > >>  - Watermark generation, as well as idleness, needs to be per
> >>>> split
> >>>> > > (like
> >>>> > > >> currently in the Kafka Source, per partition)
> >>>> > > >>  - It is desirable to support optional event-time-alignment,
> >>>> meaning
> >>>> > > that
> >>>> > > >> splits that are ahead are back-pressured or temporarily
> >>>> unsubscribed
> >>>> > > >>
> >>>> > > >>  - I think it would be desirable to encapsulate watermark
> >>>> > > >> generation logic in watermark generators, for a separation of
> >>>> > > >> concerns. The watermark generators should run per split.
> >>>> > > >>  - Using watermark generators would also help with another
> >>>> problem of
> >>>> > > the
> >>>> > > >> suggested interface, namely supporting non-periodic watermarks
> >>>> > > efficiently.
> >>>> > > >>
> >>>> > > >>  - Need a way to "dispatch" next record to different watermark
> >>>> > > generators
> >>>> > > >>  - Need a way to tell SplitReader to "suspend" a split until a
> >>>> certain
> >>>> > > >> watermark is reached (event time backpressure)
> >>>> > > >>  - This would in fact not be needed (and things would thus be
> >>>> > > >> simpler) if we had a SplitReader per split, which may be a reason
> >>>> > > >> to re-open that discussion
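> >>>> > > >>
> >>>> > > >>  For the dispatching, the sketch I have in mind (signatures
> >>>> > > >> simplified, not a worked-out API) is simply:
> >>>> > > >>
> >>>> > > >>    // one watermark generator per split; the reader's combined
> >>>> > > >>    // watermark is the min across all non-idle splits
> >>>> > > >>    Map<String, WatermarkGenerator<E>> generators = new HashMap<>();
> >>>> > > >>
> >>>> > > >>    void dispatch(String splitId, E record, long timestamp) {
> >>>> > > >>        generators.get(splitId).onEvent(record, timestamp);
> >>>> > > >>    }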
> >>>> > > >>
> >>>> > > >> *(6) Watermarks across splits and in the Split Enumerator*
> >>>> > > >>
> >>>> > > >>  - The split enumerator may need some watermark awareness,
> which
> >>>> > should
> >>>> > > be
> >>>> > > >> purely based on split metadata (like create timestamp of file
> >>>> splits)
> >>>> > > >>  - If there are still more splits with overlapping event time
> >>>> range
> >>>> > for
> >>>> > > a
> >>>> > > >> split reader, then that split reader should not advance the
> >>>> watermark
> >>>> > > >> within the split beyond the overlap boundary. Otherwise future
> >>>> splits
> >>>> > > will
> >>>> > > >> produce late data.
> >>>> > > >>
> >>>> > > >>  - One way to approach this could be that the split enumerator
> >>>> may
> >>>> > send
> >>>> > > >> watermarks to the readers, and the readers cannot emit
> watermarks
> >>>> > beyond
> >>>> > > >> that received watermark.
> >>>> > > >>  - Many split enumerators would simply immediately send
> >>>> > > >> Long.MAX_VALUE out and leave the progress purely to the split
> >>>> > > >> readers.
> >>>> > > >>
> >>>> > > >>  - For event-time alignment / split back pressure, this begs the
> >>>> > > >> question of how we can avoid deadlocks that may arise when splits
> >>>> > > >> are suspended for event time back pressure.
> >>>> > > >>
> >>>> > > >> *(7) Batch and streaming Unification*
> >>>> > > >>
> >>>> > > >>  - Functionality-wise, the above design should support both
> >>>> > > >>  - Batch often (mostly) does not care about reading "in order"
> >>>> and
> >>>> > > >> generating watermarks
> >>>> > > >>    --> Might use different enumerator logic that is more
> locality
> >>>> > aware
> >>>> > > >> and ignores event time order
> >>>> > > >>    --> Does not generate watermarks
> >>>> > > >>  - Would be great if bounded sources could be identified at
> >>>> compile
> >>>> > > time,
> >>>> > > >> so that "env.addBoundedSource(...)" is type safe and can
> return a
> >>>> > > >> "BoundedDataStream".
> >>>> > > >>  - Possible to defer this discussion until later
> >>>> > > >>
> >>>> > > >> *Miscellaneous Comments*
> >>>> > > >>
> >>>> > > >>  - Should the source have a TypeInformation for the produced
> >>>> > > >> type, instead of a serializer? We need the type information in
> >>>> > > >> the stream anyway, and can derive the serializer from it. Plus,
> >>>> > > >> creating the serializer should respect the ExecutionConfig.
> >>>> > > >>
> >>>> > > >>  - The TypeSerializer interface is very powerful but also not
> >>>> easy to
> >>>> > > >> implement. Its purpose is to handle data super efficiently,
> >>>> support
> >>>> > > >> flexible ways of evolution, etc.
> >>>> > > >>  For metadata I would suggest looking at the
> >>>> > > >> SimpleVersionedSerializer instead, which is used for example for
> >>>> > > >> checkpoint master hooks, or for the streaming file sink. I think
> >>>> > > >> that it is a good match for cases where we do not need more than
> >>>> > > >> ser/deser (no copy, etc.) and don't need to push versioning out
> >>>> > > >> of the serialization paths for best performance (as in the
> >>>> > > >> TypeSerializer).
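> >>>> > > >>
> >>>> > > >>  For a made-up FileSplit (path + offset), the whole serializer
> >>>> > > >> would be about this much:
> >>>> > > >>
> >>>> > > >>    class FileSplitSerializer
> >>>> > > >>            implements SimpleVersionedSerializer<FileSplit> {
> >>>> > > >>
> >>>> > > >>        public int getVersion() { return 1; }
> >>>> > > >>
> >>>> > > >>        public byte[] serialize(FileSplit split) throws IOException {
> >>>> > > >>            ByteArrayOutputStream bos = new ByteArrayOutputStream();
> >>>> > > >>            DataOutputStream out = new DataOutputStream(bos);
> >>>> > > >>            out.writeUTF(split.path());
> >>>> > > >>            out.writeLong(split.offset());
> >>>> > > >>            out.flush();
> >>>> > > >>            return bos.toByteArray();
> >>>> > > >>        }
> >>>> > > >>
> >>>> > > >>        public FileSplit deserialize(int version, byte[] data)
> >>>> > > >>                throws IOException {
> >>>> > > >>            DataInputStream in =
> >>>> > > >>                new DataInputStream(new ByteArrayInputStream(data));
> >>>> > > >>            return new FileSplit(in.readUTF(), in.readLong());
> >>>> > > >>        }
> >>>> > > >>    }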
> >>>> > > >>
> >>>> > > >>
> >>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
> >>>> > > >> k.klou...@data-artisans.com>
> >>>> > > >> wrote:
> >>>> > > >>
> >>>> > > >>> Hi Biao,
> >>>> > > >>>
> >>>> > > >>> Thanks for the answer!
> >>>> > > >>>
> >>>> > > >>> So given the multi-threaded readers, we now have the following
> >>>> > > >>> open questions:
> >>>> > > >>>
> >>>> > > >>> 1) How do we let the checkpoints pass through our
> multi-threaded
> >>>> > reader
> >>>> > > >>> operator?
> >>>> > > >>>
> >>>> > > >>> 2) Do we have separate reader and source operators or not? In
> >>>> the
> >>>> > > >> strategy
> >>>> > > >>> that has a separate source, the source operator has a
> >>>> parallelism of
> >>>> > 1
> >>>> > > >> and
> >>>> > > >>> is responsible for split recovery only.
> >>>> > > >>>
> >>>> > > >>> For the first one, given also the constraints (blocking,
> finite
> >>>> > queues,
> >>>> > > >>> etc), I do not have an answer yet.
> >>>> > > >>>
> >>>> > > >>> For the 2nd, I think that we should go with separate operators
> >>>> for
> >>>> > the
> >>>> > > >>> source and the readers, for the following reasons:
> >>>> > > >>>
> >>>> > > >>> 1) This is more aligned with a potential future improvement
> >>>> > > >>> where split discovery becomes a responsibility of the JobManager
> >>>> > > >>> and readers are pulling more work from the JM.
> >>>> > > >>>
> >>>> > > >>> 2) The source is going to be the "single point of truth". It
> >>>> will
> >>>> > know
> >>>> > > >> what
> >>>> > > >>> has been processed and what not. If the source and the readers
> >>>> are a
> >>>> > > >> single
> >>>> > > >>> operator with parallelism > 1, or in general, if the split
> >>>> discovery
> >>>> > is
> >>>> > > >>> done by each task individually, then:
> >>>> > > >>>   i) we have to have a deterministic scheme for each reader to
> >>>> > > >>> assign splits to itself (e.g. mod subtaskId; see the sketch
> >>>> > > >>> below this list). This is not necessarily trivial for all
> >>>> > > >>> sources.
> >>>> > > >>>   ii) each reader would have to keep a copy of all its
> >>>> > > >>> processed splits
> >>>> > > >>>   iii) the state has to be a union state with a non-trivial
> >>>> > > >>> merging logic in order to support rescaling.
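> >>>> > > >>>
> >>>> > > >>> (For (i), the sketch mentioned above: the simplest deterministic
> >>>> > > >>> scheme would be something like
> >>>> > > >>>
> >>>> > > >>>    // claim a split iff its id hashes to this subtask
> >>>> > > >>>    boolean isMine(String splitId, int subtaskIndex, int parallelism) {
> >>>> > > >>>        return Math.abs(splitId.hashCode() % parallelism) == subtaskIndex;
> >>>> > > >>>    }
> >>>> > > >>>
> >>>> > > >>> which already shows the problem: it ignores split sizes and
> >>>> > > >>> locality entirely.)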
> >>>> > > >>>
> >>>> > > >>> Two additional points that you raised above:
> >>>> > > >>>
> >>>> > > >>> i) The point that you raised that we need to keep all splits
> >>>> > (processed
> >>>> > > >> and
> >>>> > > >>> not-processed) I think is a bit of a strong requirement. This
> >>>> > > >>> would imply that for infinite sources the state will grow
> >>>> > > >>> indefinitely. This problem is even more pronounced if we do not
> >>>> > > >>> have a single source that assigns splits to readers, as each
> >>>> > > >>> reader will have its own copy of the state.
> >>>> > > >>>
> >>>> > > >>> ii) it is true that for finite sources we need to somehow not
> >>>> close
> >>>> > the
> >>>> > > >>> readers when the source/split discoverer finishes. The
> >>>> > > >>> ContinuousFileReaderOperator has a work-around for that. It is
> >>>> not
> >>>> > > >> elegant,
> >>>> > > >>> and checkpoints are not emitted after closing the source, but
> >>>> this, I
> >>>> > > >>> believe, is a bigger problem which requires more changes than
> >>>> just
> >>>> > > >>> refactoring the source interface.
> >>>> > > >>>
> >>>> > > >>> Cheers,
> >>>> > > >>> Kostas
> >>>> > > >>>
> >>>> > > >>
> >>>> > >
> >>>> > >
> >>>> >
> >>>>
> >>>
>
