Hi Stephan,

Thank you for the feedback! I will take a look at your branch before discussing it publicly.
On Fri, Jul 26, 2019 at 12:01 AM Stephan Ewen <se...@apache.org> wrote:
> Hi Biao!
>
> Thanks for reviving this. I would like to join this discussion, but am
> quite occupied with the 1.9 release, so can we maybe pause this discussion
> for a week or so?
>
> In the meantime I can share some suggestions based on prior experiments:
>
> How to do watermarks / timestamp extractors in a simpler and more flexible
> way. I think that part is quite promising and should be part of the new
> source interface.
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/eventtime
>
> https://github.com/StephanEwen/flink/blob/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src/SourceOutput.java
>
> Some experiments on how to build the source reader and its library for
> common threading/split patterns:
>
> https://github.com/StephanEwen/flink/tree/source_interface/flink-core/src/main/java/org/apache/flink/api/common/src
>
> Best,
> Stephan
>
> On Thu, Jul 25, 2019 at 10:03 AM Biao Liu <mmyy1...@gmail.com> wrote:
>
>> Hi devs,
>>
>> Since 1.9 is nearly released, I think we could get back to FLIP-27. I
>> believe it should be included in 1.10.
>>
>> There are so many things mentioned in the document of FLIP-27. [1] I think
>> we'd better discuss them separately. However, the wiki is not a good place
>> for discussion. I wrote a Google doc about the SplitReader API that fills
>> in some details missing from the wiki document. [2]
>>
>> 1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
>> 2. https://docs.google.com/document/d/1R1s_89T4S3CZwq7Tf31DciaMCqZwrLHGZFqPASu66oE/edit?usp=sharing
>>
>> CC Stephan, Aljoscha, Piotrek, Becket
>>
>> On Thu, Mar 28, 2019 at 4:38 PM Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> Hi Steven,
>>> Thank you for the feedback.
Please take a look at the document FLIP-27
>>> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface>,
>>> which was updated recently. A lot of details about the enumerator were
>>> added to this document. I think it would help.
>>>
>>> Steven Wu <stevenz...@gmail.com> wrote on Thu, Mar 28, 2019 at 12:52 PM:
>>>
>>>> This proposal mentioned that the SplitEnumerator might run on the
>>>> JobManager or in a single task on a TaskManager.
>>>>
>>>> If the enumerator is a single task on a TaskManager, then the job DAG
>>>> can never be embarrassingly parallel anymore. That will nullify the
>>>> benefit of fine-grained recovery for embarrassingly parallel jobs.
>>>>
>>>> It's not clear to me what the implications of running the enumerator on
>>>> the JobManager are. So I will leave that out for now.
>>>>
>>>> On Mon, Jan 28, 2019 at 3:05 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>> > Hi Stephan & Piotrek,
>>>> >
>>>> > Thank you for the feedback.
>>>> >
>>>> > It seems that there are a lot of things to do in the community. I am
>>>> > just afraid that this discussion may be forgotten since there are so
>>>> > many proposals recently.
>>>> > Anyway, I wish to see the split topics soon :)
>>>> >
>>>> > Piotr Nowojski <pi...@da-platform.com> wrote on Thu, Jan 24, 2019 at 8:21 PM:
>>>> >
>>>> > > Hi Biao!
>>>> > >
>>>> > > This discussion stalled because of preparations for the open
>>>> > > sourcing & merging of Blink. I think before creating the tickets we
>>>> > > should split this discussion into the topics/areas outlined by
>>>> > > Stephan and create FLIPs for that.
>>>> > >
>>>> > > I think there is no chance for this to be completed in the couple of
>>>> > > remaining weeks / one month before the 1.8 feature freeze; however,
>>>> > > it would be good to aim for 1.9 with those changes.
>>>> > > Piotrek
>>>> > >
>>>> > > > On 20 Jan 2019, at 16:08, Biao Liu <mmyy1...@gmail.com> wrote:
>>>> > > >
>>>> > > > Hi community,
>>>> > > > The summary of Stephan makes a lot of sense to me. It is much
>>>> > > > clearer indeed after splitting the complex topic into small ones.
>>>> > > > I was wondering whether there is any detailed plan for the next
>>>> > > > step. If not, I would like to push this forward by creating some
>>>> > > > JIRA issues.
>>>> > > > Another question: should version 1.8 include these features?
>>>> > > >
>>>> > > > Stephan Ewen <se...@apache.org> wrote on Sat, Dec 1, 2018 at 4:20 AM:
>>>> > > >
>>>> > > >> Thanks everyone for the lively discussion. Let me try to
>>>> > > >> summarize where I see convergence in the discussion and the open
>>>> > > >> issues.
>>>> > > >> I'll try to group this by design aspect of the source. Please let
>>>> > > >> me know if I got things wrong or missed something crucial here.
>>>> > > >>
>>>> > > >> For issues 1-3, if the below reflects the state of the
>>>> > > >> discussion, I would try to update the FLIP in the next few days.
>>>> > > >> For the remaining ones we need more discussion.
>>>> > > >>
>>>> > > >> I would suggest forking each of these aspects into a separate
>>>> > > >> mail thread, or we will lose sight of the individual aspects.
>>>> > > >>
>>>> > > >> *(1) Separation of Split Enumerator and Split Reader*
>>>> > > >>
>>>> > > >> - All seem to agree this is a good thing
>>>> > > >> - The Split Enumerator could in the end live on the JobManager
>>>> > > >> (and assign splits via RPC) or in a task (and assign splits via
>>>> > > >> data streams)
>>>> > > >> - This discussion is orthogonal and should come later, when the
>>>> > > >> interface is agreed upon.
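The separation described in point (1) can be sketched in code. This is a minimal illustration only; all names and signatures below are assumptions for the sketch, not the actual FLIP-27 API:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Illustrative sketch of separating split discovery from split reading.
// The real FLIP-27 interfaces are richer; these shapes are hypothetical.
public class SplitSeparationSketch {

    // Discovers splits; could live on the JobManager or in a parallelism-1 task.
    interface SplitEnumerator<SplitT> {
        List<SplitT> discoverSplits();
    }

    // Consumes assigned splits; runs in the parallel source tasks.
    interface SplitReader<T, SplitT> {
        void addSplits(List<SplitT> splits);
        T pollNext(); // null when nothing is currently available
    }

    // Toy reader that treats each "split" as a single string record.
    static class QueueBackedReader implements SplitReader<String, String> {
        private final Queue<String> assigned = new ArrayDeque<>();
        @Override public void addSplits(List<String> splits) { assigned.addAll(splits); }
        @Override public String pollNext() { return assigned.poll(); }
    }

    public static String firstRecordOf(List<String> splits) {
        QueueBackedReader reader = new QueueBackedReader();
        reader.addSplits(splits);
        return reader.pollNext();
    }

    public static void main(String[] args) {
        System.out.println(firstRecordOf(List.of("split-a", "split-b")));
    }
}
```

The point of the split is visible even in this toy form: the reader only ever sees splits it was handed, so the same reader works whether assignment comes from an RPC or a data stream.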
>>>> > > >>
>>>> > > >> *(2) Split Readers for one or more splits*
>>>> > > >>
>>>> > > >> - The discussion seems to agree that we need to support one
>>>> > > >> reader that possibly handles multiple splits concurrently.
>>>> > > >> - The requirement comes from sources where one poll()-style call
>>>> > > >> fetches data from different splits / partitions
>>>> > > >> --> example sources that require that would be Kafka,
>>>> > > >> Pravega, Pulsar
>>>> > > >>
>>>> > > >> - Could have one split reader per source, or multiple split
>>>> > > >> readers that share the "poll()" function
>>>> > > >> - To not make it too complicated, we can start by thinking about
>>>> > > >> one split reader for all splits initially and see if that covers
>>>> > > >> all requirements
>>>> > > >>
>>>> > > >> *(3) Threading model of the Split Reader*
>>>> > > >>
>>>> > > >> - Most active part of the discussion ;-)
>>>> > > >>
>>>> > > >> - A non-blocking way for Flink's task code to interact with the
>>>> > > >> source is needed in order to base the task runtime code on a
>>>> > > >> single-threaded/actor-style task design
>>>> > > >> --> I personally am a big proponent of that; it will help with
>>>> > > >> well-behaved checkpoints, efficiency, and simpler yet more robust
>>>> > > >> runtime code
>>>> > > >>
>>>> > > >> - Users care about a simple abstraction, so as a subclass of
>>>> > > >> SplitReader (non-blocking / async) we need to have a
>>>> > > >> BlockingSplitReader which will form the basis of most source
>>>> > > >> implementations. The BlockingSplitReader lets users do simple
>>>> > > >> blocking poll() calls.
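The Kafka-style behaviour in point (2) — one poll() surfacing records from several splits — can be sketched as a reader that buffers per split and interleaves them on poll. Everything here (class name, `poll(maxRecords)`, round-robin order) is an illustrative assumption, not a proposed API:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Sketch of (2): a single reader serving several splits, where one poll()
// may surface records from any of them. Names are hypothetical.
public class MultiSplitPoll {
    private final Map<String, Queue<String>> perSplitBuffers = new LinkedHashMap<>();

    public void addSplit(String splitId, List<String> records) {
        perSplitBuffers.put(splitId, new ArrayDeque<>(records));
    }

    // Returns (splitId, record) pairs, round-robin across splits, so the
    // caller always knows which split a record came from.
    public List<Map.Entry<String, String>> poll(int maxRecords) {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        boolean progress = true;
        while (progress && out.size() < maxRecords) {
            progress = false;
            for (Map.Entry<String, Queue<String>> e : perSplitBuffers.entrySet()) {
                if (out.size() >= maxRecords) break;
                String record = e.getValue().poll();
                if (record != null) {
                    out.add(new SimpleEntry<>(e.getKey(), record));
                    progress = true;
                }
            }
        }
        return out;
    }

    public static int demo() {
        MultiSplitPoll reader = new MultiSplitPoll();
        reader.addSplit("partition-0", List.of("a", "b"));
        reader.addSplit("partition-1", List.of("x"));
        return reader.poll(10).size();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Tagging each record with its split is what later makes per-split watermarking (point 5) possible.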
>>>> > > >> - The BlockingSplitReader would spawn a thread (or more) and the
>>>> > > >> thread(s) can make blocking calls and hand over data buffers via
>>>> > > >> a blocking queue
>>>> > > >> - This should allow us to cover both a fully async runtime and a
>>>> > > >> simple blocking interface for users.
>>>> > > >> - This is actually very similar to how the Kafka connectors
>>>> > > >> work. Kafka 9+ with one thread, Kafka 8 with multiple threads
>>>> > > >>
>>>> > > >> - On the base SplitReader (the async one), the non-blocking
>>>> > > >> method that gets the next chunk of data would signal data
>>>> > > >> availability via a CompletableFuture, because that gives the best
>>>> > > >> flexibility (one can await completion or register notification
>>>> > > >> handlers).
>>>> > > >> - The source task would register a "thenHandle()" (or similar)
>>>> > > >> on the future to put a "take next data" task into the actor-style
>>>> > > >> mailbox
>>>> > > >>
>>>> > > >> *(4) Split Enumeration and Assignment*
>>>> > > >>
>>>> > > >> - Splits may be generated lazily, both in cases where there is a
>>>> > > >> limited number of splits (but very many), and where splits are
>>>> > > >> discovered over time
>>>> > > >> - Assignment should also be lazy, to get better load balancing
>>>> > > >> - Assignment needs to support locality preferences
>>>> > > >>
>>>> > > >> - Possible design based on the discussion so far:
>>>> > > >>
>>>> > > >> --> SplitReader has a method "addSplits(SplitT...)" to add one
>>>> > > >> or more splits. Some split readers might assume they only ever
>>>> > > >> have one split, concurrently, others assume multiple splits.
>>>> > > >> (Note: the idea behind being able to add multiple splits at the
>>>> > > >> same time is to ease startup where multiple splits may be
>>>> > > >> assigned instantly.)
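The threading model from point (3) — a fetcher thread making blocking calls, a hand-over queue, and a CompletableFuture signalling availability — can be condensed into one small sketch. The names (`handover`, `runOnce`) are illustrative; a real runtime would register a handler on the future instead of awaiting it:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

// Sketch of the threading model in (3): the fetcher thread blocks against
// the external system and hands records over via a bounded queue;
// availability is signalled to the task via a CompletableFuture.
public class ThreadingModelSketch {

    public static String runOnce() {
        BlockingQueue<String> handover = new ArrayBlockingQueue<>(4);
        CompletableFuture<Void> available = new CompletableFuture<>();

        // Fetcher thread: makes the blocking call, then signals availability.
        Thread fetcher = new Thread(() -> {
            try {
                handover.put("record-0");   // stands in for a blocking fetch
                available.complete(null);   // wake up the task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        // Task side: never blocks inside the source. Here we await the
        // future for simplicity; the actual design would use thenRun(...)
        // to enqueue a "take next data" action into the mailbox.
        try {
            available.get();
            String record = handover.poll();
            fetcher.join();
            return record;
        } catch (Exception e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

The bounded queue is what turns the fetcher's speed into backpressure: when the task stops draining, put() blocks and the blocking fetches pause with it.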
>>>> > > >> --> SplitReader has a context object on which it can indicate
>>>> > > >> when splits are completed. The enumerator gets that notification
>>>> > > >> and can use it to decide when to assign new splits. This should
>>>> > > >> help both in cases of sources that take splits lazily (file
>>>> > > >> readers) and in case the source needs to preserve a partial order
>>>> > > >> between splits (Kinesis, Pravega, Pulsar may need that).
>>>> > > >> --> SplitEnumerator gets a notification when SplitReaders start
>>>> > > >> and when they finish splits. It can decide at that moment to push
>>>> > > >> more splits to that reader
>>>> > > >> --> The SplitEnumerator should probably be aware of the source
>>>> > > >> parallelism, to build its initial distribution.
>>>> > > >>
>>>> > > >> - Open question: Should the source expose something like "host
>>>> > > >> preferences", so that yarn/mesos/k8s can take this into account
>>>> > > >> when selecting a node to start a TM on?
>>>> > > >>
>>>> > > >> *(5) Watermarks and event time alignment*
>>>> > > >>
>>>> > > >> - Watermark generation, as well as idleness, needs to be per
>>>> > > >> split (as currently in the Kafka Source, per partition)
>>>> > > >> - It is desirable to support optional event-time alignment,
>>>> > > >> meaning that splits that are ahead are back-pressured or
>>>> > > >> temporarily unsubscribed
>>>> > > >>
>>>> > > >> - I think it would be desirable to encapsulate watermark
>>>> > > >> generation logic in watermark generators, for a separation of
>>>> > > >> concerns. The watermark generators should run per split.
>>>> > > >> - Using watermark generators would also help with another
>>>> > > >> problem of the suggested interface, namely supporting
>>>> > > >> non-periodic watermarks efficiently.
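Per-split watermark generation as described in point (5) boils down to tracking progress per split and taking the minimum across splits, so a lagging split holds the overall watermark back. A minimal sketch, with hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of (5): one watermark tracker per split; the reader's watermark
// is the minimum across its splits. Class and method names are illustrative.
public class PerSplitWatermarks {
    private final Map<String, Long> maxTimestampPerSplit = new HashMap<>();

    // Feed each record to the generator of the split it came from.
    public void onRecord(String splitId, long eventTimestamp) {
        maxTimestampPerSplit.merge(splitId, eventTimestamp, Math::max);
    }

    // The combined watermark: the slowest split dominates.
    public long currentWatermark() {
        return maxTimestampPerSplit.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    public static long demo() {
        PerSplitWatermarks generator = new PerSplitWatermarks();
        generator.onRecord("partition-0", 100L);
        generator.onRecord("partition-1", 50L);
        generator.onRecord("partition-0", 200L);
        return generator.currentWatermark(); // partition-1 lags behind
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This min-across-splits shape is also why event-time alignment needs a "suspend split" primitive: without it, partition-0 above keeps racing ahead while partition-1 pins the watermark.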
>>>> > > >>
>>>> > > >> - Need a way to "dispatch" the next record to different
>>>> > > >> watermark generators
>>>> > > >> - Need a way to tell the SplitReader to "suspend" a split until a
>>>> > > >> certain watermark is reached (event time backpressure)
>>>> > > >> - This would in fact not be needed (and thus be simpler) if we
>>>> > > >> had a SplitReader per split, and may be a reason to re-open that
>>>> > > >> discussion
>>>> > > >>
>>>> > > >> *(6) Watermarks across splits and in the Split Enumerator*
>>>> > > >>
>>>> > > >> - The split enumerator may need some watermark awareness, which
>>>> > > >> should be purely based on split metadata (like the create
>>>> > > >> timestamp of file splits)
>>>> > > >> - If there are still more splits with an overlapping event time
>>>> > > >> range for a split reader, then that split reader should not
>>>> > > >> advance the watermark within the split beyond the overlap
>>>> > > >> boundary. Otherwise future splits will produce late data.
>>>> > > >>
>>>> > > >> - One way to approach this could be that the split enumerator
>>>> > > >> may send watermarks to the readers, and the readers cannot emit
>>>> > > >> watermarks beyond that received watermark.
>>>> > > >> - Many split enumerators would simply immediately send Long.MAX
>>>> > > >> out and leave the progress purely to the split readers.
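The enumerator-bound idea in point (6) reduces to a simple cap: a reader may never emit past the watermark it last received from the enumerator, and an enumerator that sends Long.MAX_VALUE opts out entirely. A sketch under those assumptions (names are illustrative):

```java
// Sketch of (6): the reader caps its locally generated watermark at the
// bound received from the enumerator. Long.MAX_VALUE means "reader-driven".
public class WatermarkBound {

    public static long emittableWatermark(long readerWatermark, long enumeratorBound) {
        // Never advance past what the enumerator has allowed: future splits
        // with overlapping event-time ranges would otherwise produce late data.
        return Math.min(readerWatermark, enumeratorBound);
    }

    public static void main(String[] args) {
        System.out.println(emittableWatermark(150L, 120L));           // capped by the enumerator
        System.out.println(emittableWatermark(150L, Long.MAX_VALUE)); // purely reader-driven
    }
}
```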
>>>> > > >>
>>>> > > >> - For event-time alignment / split back pressure, this begs the
>>>> > > >> question of how we can avoid deadlocks that may arise when splits
>>>> > > >> are suspended for event time back pressure.
>>>> > > >>
>>>> > > >> *(7) Batch and streaming Unification*
>>>> > > >>
>>>> > > >> - Functionality-wise, the above design should support both
>>>> > > >> - Batch often (mostly) does not care about reading "in order"
>>>> > > >> and generating watermarks
>>>> > > >> --> Might use different enumerator logic that is more locality
>>>> > > >> aware and ignores event time order
>>>> > > >> --> Does not generate watermarks
>>>> > > >> - Would be great if bounded sources could be identified at
>>>> > > >> compile time, so that "env.addBoundedSource(...)" is type safe
>>>> > > >> and can return a "BoundedDataStream".
>>>> > > >> - Possible to defer this discussion until later
>>>> > > >>
>>>> > > >> *Miscellaneous Comments*
>>>> > > >>
>>>> > > >> - Should the source have a TypeInformation for the produced
>>>> > > >> type, instead of a serializer? We need type information in the
>>>> > > >> stream anyway, and can derive the serializer from that. Plus,
>>>> > > >> creating the serializer should respect the ExecutionConfig.
>>>> > > >>
>>>> > > >> - The TypeSerializer interface is very powerful but also not
>>>> > > >> easy to implement. Its purpose is to handle data super
>>>> > > >> efficiently, support flexible ways of evolution, etc.
>>>> > > >> For metadata I would suggest looking at the
>>>> > > >> SimpleVersionedSerializer instead, which is used for example for
>>>> > > >> checkpoint master hooks, or for the streaming file sink. I think
>>>> > > >> that it is a good match for cases where we do not need more than
>>>> > > >> ser/deser (no copy, etc.)
and don't need to push
>>>> > > >> versioning out of the serialization paths for best performance
>>>> > > >> (as in the TypeSerializer)
>>>> > > >>
>>>> > > >> On Tue, Nov 27, 2018 at 11:45 AM Kostas Kloudas <
>>>> > > >> k.klou...@data-artisans.com> wrote:
>>>> > > >>
>>>> > > >>> Hi Biao,
>>>> > > >>>
>>>> > > >>> Thanks for the answer!
>>>> > > >>>
>>>> > > >>> So given the multi-threaded readers, we now have the following
>>>> > > >>> open questions:
>>>> > > >>>
>>>> > > >>> 1) How do we let the checkpoints pass through our multi-threaded
>>>> > > >>> reader operator?
>>>> > > >>>
>>>> > > >>> 2) Do we have separate reader and source operators or not? In
>>>> > > >>> the strategy that has a separate source, the source operator has
>>>> > > >>> a parallelism of 1 and is responsible for split recovery only.
>>>> > > >>>
>>>> > > >>> For the first one, given also the constraints (blocking, finite
>>>> > > >>> queues, etc.), I do not have an answer yet.
>>>> > > >>>
>>>> > > >>> For the 2nd, I think that we should go with separate operators
>>>> > > >>> for the source and the readers, for the following reasons:
>>>> > > >>>
>>>> > > >>> 1) This is more aligned with a potential future improvement
>>>> > > >>> where the split discovery becomes a responsibility of the
>>>> > > >>> JobManager and readers are pulling more work from the JM.
>>>> > > >>>
>>>> > > >>> 2) The source is going to be the "single point of truth". It
>>>> > > >>> will know what has been processed and what not. If the source
>>>> > > >>> and the readers are a single operator with parallelism > 1, or
>>>> > > >>> in general, if the split discovery is done by each task
>>>> > > >>> individually, then:
>>>> > > >>> i) we have to have a deterministic scheme for each reader to
>>>> > > >>> assign splits to itself (e.g. mod subtaskId).
This is not necessarily
>>>> > > >>> trivial for all sources.
>>>> > > >>> ii) each reader would have to keep a copy of all its processed
>>>> > > >>> splits
>>>> > > >>> iii) the state has to be a union state with non-trivial merging
>>>> > > >>> logic in order to support rescaling.
>>>> > > >>>
>>>> > > >>> Two additional points that you raised above:
>>>> > > >>>
>>>> > > >>> i) The point you raised that we need to keep all splits
>>>> > > >>> (processed and not-processed) is, I think, a bit of a strong
>>>> > > >>> requirement. It would imply that for infinite sources the state
>>>> > > >>> will grow indefinitely. This problem is even more pronounced if
>>>> > > >>> we do not have a single source that assigns splits to readers,
>>>> > > >>> as each reader will have its own copy of the state.
>>>> > > >>>
>>>> > > >>> ii) It is true that for finite sources we need to somehow not
>>>> > > >>> close the readers when the source/split discoverer finishes. The
>>>> > > >>> ContinuousFileReaderOperator has a work-around for that. It is
>>>> > > >>> not elegant, and checkpoints are not emitted after closing the
>>>> > > >>> source, but this, I believe, is a bigger problem which requires
>>>> > > >>> more changes than just refactoring the source interface.
>>>> > > >>>
>>>> > > >>> Cheers,
>>>> > > >>> Kostas
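The "mod subtaskId" scheme from point (i) above — each parallel reader deterministically claiming its own subset of splits without a central enumerator — can be sketched as follows. The class and method names are illustrative only:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of deterministic self-assignment: a split belongs to the subtask
// whose index matches the split's hash modulo the parallelism. No two
// subtasks claim the same split, and together they cover all splits.
public class ModSplitAssignment {

    public static List<String> splitsFor(List<String> allSplits, int subtaskId, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (String split : allSplits) {
            // floorMod keeps the result non-negative even for negative hashes
            if (Math.floorMod(split.hashCode(), parallelism) == subtaskId) {
                mine.add(split);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        List<String> all = List.of("split-0", "split-1", "split-2", "split-3");
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println(subtask + " -> " + splitsFor(all, subtask, 2));
        }
    }
}
```

The scheme is coordination-free, which is its appeal, but it also illustrates Kostas's objection: it only works when splits have stable identities up front, and rescaling changes every split's owner, which is exactly why the union-state merging in (iii) gets non-trivial.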