Hi Aljoscha,

This is an excellent question! And the answer is: we don't need any new concept like an "SDF executor"; we can rely on the per-key state and timers machinery that already exists in all runners, because it's necessary to implement windowing/triggering properly.
Note that this is already somewhat addressed in the previously posted State and Timers proposal https://s.apache.org/beam-state , under "per-key workflows".

Think of it this way, using the Kafka example: we'll expand it into a transform:

(1) ParDo { topic -> (unique key, topic, partition, [0, inf)) for partition in topic.listPartitions() }
(2) GroupByKey
(3) ParDo { key, topic, partition, R -> Kafka reader code in the proposal/slides }

- R is the OffsetRange restriction, which in this case will always be of the form [startOffset, inf).
- There'll be just 1 value per key, but we use the GBK just to get access to the per-key state/timers machinery. This may be runner-specific; maybe some runners don't need a GBK to do that.

Now suppose the topic has two partitions, P1 and P2, and they get assigned unique keys K1, K2. Then the input to (3) will be a collection of: (K1, topic, P1, [0, inf)), (K2, topic, P2, [0, inf)).

Suppose we have just 1 worker with just 1 thread. Now, how will this thread be able to produce elements from both P1 and P2? Here's how. The thread will process (K1, topic, P1, [0, inf)) and checkpoint after a certain time or after a certain number of elements are output (just like with the current UnboundedSource reading code), producing a residual restriction R1' (basically a new start offset). It will put R1' into the per-key state and set a timer T1 to resume. Then it will process (K2, topic, P2, [0, inf)), doing the same: producing a residual restriction R2' and setting a timer T2 to resume. Then timer T1 will fire in the context of the key K1. The thread will call processElement again, this time supplying R1' as the restriction; the process repeats, and after a while it checkpoints and stores R1'' into the state of K1. Then timer T2 will fire in the context of K2, run processElement for a while, set a new timer, and store R2'' into the state of K2. Etc.
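The checkpoint/resume loop above can be sketched as a toy, single-threaded simulation in plain Java. Note this is not the Beam API: the state map, the timer queue, and the processElement stand-in below are invented for illustration; a real runner persists the residual restriction in per-key state and fires real timers.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of one thread serving two infinite per-key restrictions.
// "Setting a timer" is modeled by re-enqueueing the key; the per-key
// state holds the residual restriction (the next offset to read).
public class CooperativeScheduling {
  static final int CHECKPOINT_AFTER = 3; // checkpoint after N elements

  // Stand-in for processElement: "reads" offsets starting at `start`,
  // outputs CHECKPOINT_AFTER elements, then returns the start offset of
  // the residual restriction [offset, inf).
  static long processElement(String key, long start) {
    long offset = start;
    for (int i = 0; i < CHECKPOINT_AFTER; i++) {
      // context.output(...) would happen here for (key, offset)
      offset++;
    }
    return offset;
  }

  public static void main(String[] args) {
    Map<String, Long> stateByKey = new HashMap<>(); // per-key state
    Queue<String> timers = new ArrayDeque<>();      // pending "resume" timers

    // Input to step (3): one element per partition, restriction [0, inf).
    stateByKey.put("K1", 0L);
    stateByKey.put("K2", 0L);
    timers.add("K1");
    timers.add("K2");

    // Fire timers for a few rounds; each firing runs processElement
    // until the "runner" asks it to checkpoint.
    for (int round = 0; round < 4; round++) {
      String key = timers.poll();
      long residual = processElement(key, stateByKey.get(key));
      stateByKey.put(key, residual); // store residual restriction
      timers.add(key);               // set a timer to resume this key
    }
    System.out.println("K1 resumes at " + stateByKey.get("K1")); // K1 resumes at 6
    System.out.println("K2 resumes at " + stateByKey.get("K2")); // K2 resumes at 6
  }
}
```

Both keys make progress on one thread because every run voluntarily checkpoints; "partition goes away" would correspond to not re-enqueueing the key and dropping its state entry.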
If partition 1 goes away, the processElement call will return "do not resume", so a timer will not be set; instead, the state associated with K1 will be GC'd.

So basically it's almost like cooperative thread scheduling: things run for a while until the runner tells them to checkpoint; then they set a timer to resume themselves, the runner fires the timers, and the process repeats. And, again, this requires only things that runners can already do - state and timers - but no new concept of an SDF executor (and consequently no need to choose/tune how many you need).

Makes sense?

On Sat, Aug 20, 2016 at 9:46 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi,
> I have another question that I think wasn't addressed in the meeting. At
> least it wasn't mentioned in the notes.
>
> In the context of replacing sources by a combination of two SDFs, how do you
> determine how many "SDF executor" instances you need downstream? For the
> sake of argument assume that both SDFs are executed with parallelism 1 (or
> one per worker). Now, if you have a file source that reads from a static
> set of files the first SDF would emit the filenames while the second SDF
> would receive the filenames and emit their contents. This works well and
> the downstream SDF can process one filename after the other. Now, think of
> something like a Kafka source. The first SDF would emit the partitions (say
> 4 partitions, in this example) and the second SDF would be responsible for
> reading from a topic and emitting elements. Reading from one topic never
> finishes so you can't process the topics in series. I think you would need
> to have 4 downstream "SDF executor" instances. The question now is: how do
> you determine whether you are in the first or the second situation?
>
> Probably I'm just overlooking something and this is already dealt with
> somewhere...
:-)
>
> Cheers,
> Aljoscha
>
> On Fri, 19 Aug 2016 at 21:02 Ismaël Mejía <ieme...@gmail.com> wrote:
>
> > Hello,
> >
> > Thanks for the notes both Dan and Eugene, and for taking the time to do the
> > presentation and answer our questions.
> >
> > I mentioned the ongoing work on dynamic scaling on Flink because I suppose
> > that it will address dynamic rebalancing eventually (there are multiple
> > changes going on for dynamic scaling).
> >
> > https://docs.google.com/document/d/1G1OS1z3xEBOrYD4wSu-LuBCyPUWyFd9l3T9WyssQ63w/edit#heading=h.2suhu1fjxmp4
> > https://lists.apache.org/list.html?d...@flink.apache.org:lte=1M:FLIP-8
> >
> > Anyway I am far from an expert on Flink, but probably the Flink guys can
> > give their opinion about this and refer to a more precise document than the
> > ones I mentioned.
> >
> > Thanks again,
> > Ismaël
> >
> > On Fri, Aug 19, 2016 at 8:52 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > Great summary Eugene and Dan.
> > >
> > > And thanks again for the details, explanation, and discussion.
> > >
> > > Regards
> > > JB
> > >
> > > On 08/19/2016 08:16 PM, Eugene Kirpichov wrote:
> > >
> > >> Thanks for attending, everybody!
> > >>
> > >> Here are meeting notes (thanks Dan!).
> > >>
> > >> Q: Will SplittableDoFn enable better repartitioning of the input/output
> > >> data?
> > >> A: Not really; repartitioning is orthogonal to SDF.
> > >>
> > >> Current Source API suffers from lack of composition and scalability
> > >> because we treat sources too much as metadata, not enough as data.
> > >>
> > >> Q (slide with transform expansion): who does the "magic"?
> > >> A: The runner. Checkpointing and dynamically splitting restrictions will
> > >> require collaboration with the runner.
> > >>
> > >> Q: How does the runner interact with the DoFn to control the restrictions?
> > >> Is it related to the centralized job tracker etc.?
> > >> A: RestrictionTracker is a simple helper object that exists purely on the
> > >> worker while executing a single partition, and interacts with the worker
> > >> harness part of the runner. Not to be confused with the centralized job
> > >> tracker (master) - completely unrelated. Worker harness, of course,
> > >> interacts with the master in some relevant ways (e.g. Dataflow master can
> > >> tell "you're a straggler, you should split").
> > >>
> > >> Q: Is this a new DoFn subclass, or how will this integrate with the
> > >> existing code?
> > >> A: It's a feature of reflection-based DoFn (https://s.apache.org/a-new-do-fn)
> > >> - just another optional parameter of type RestrictionTracker to
> > >> processElement(), which is dynamically bound via reflection, so fully
> > >> backward/forward compatible, and looks to users like a regular DoFn.
> > >>
> > >> Q: Why is fractionClaimed a double?
> > >> A: It's a number in 0..1. Job-wide magic (e.g. autoscaling, dynamic
> > >> rebalancing) requires a uniform way to represent progress through
> > >> different sources.
> > >>
> > >> Q: Spark runner is microbatch-based, so this seems to map well onto
> > >> checkpoint/resume, right?
> > >> A: Yes; actually the Dataflow runner is, at a worker level, also
> > >> microbatch-based. The way SDF interacts with a runner will be very similar
> > >> to how a Bounded/UnboundedSource interacts with a runner.
> > >>
> > >> Q: Using SDF, what would be the "packaging" of the IO?
> > >> A: Same as currently: package IOs as PTransforms, and their implementation
> > >> under the hood can be anything: Source, simple ParDo's, SDF, etc. E.g.
> > >> Datastore was recently refactored from BoundedSource to ParDo (ended up
> > >> simpler and more scalable), transparently to users.
> > >>
> > >> Q: What's the timeline; what to do with the IOs currently in development?
> > >> A: Timeline is O(months). Keep doing what you're doing, working on top of
> > >> Source APIs when necessary and simple ParDo's otherwise.
> > >>
> > >> Q: What's the impact for the runner writers?
> > >> A: Tentatively expected that most of the code for running an SDF will be
> > >> common to runners, with some amount of per-runner glue code, just like
> > >> GBK/windowing/triggering. Impact on the Dataflow runner is larger since it
> > >> supports dynamic rebalancing in batch mode and this is the hardest part,
> > >> but for other runners it shouldn't be too hard.
> > >>
> > >> JB: Talend has people who can help with this: e.g. help integrate into the
> > >> Spark runner, refactor IOs, etc. Amit is also willing to chat about
> > >> supporting SDF in the Spark runner.
> > >>
> > >> Ismael: There's a Flink proposal about dynamic rebalancing. Ismael will
> > >> send a link.
> > >>
> > >> On Fri, Aug 19, 2016 at 7:21 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > >> wrote:
> > >>
> > >>> Hi Eugene,
> > >>>
> > >>> thanks for the reminder.
> > >>>
> > >>> Just to prepare some topics for the call, please find some points:
> > >>>
> > >>> 1. Using SDF, what would be the "packaging" of the IO ? It sounds to me
> > >>> that we can keep the IO packaging style (using with* setters for the IO
> > >>> configuration) and replace PTransform, Source, Reader, ... directly with
> > >>> SDF. Correct ?
> > >>>
> > >>> 2. What's your plan in terms of release to include SDF ? We have several
> > >>> IOs in preparation and I wonder if it's worth starting to use the new
> > >>> SDF API or not.
> > >>>
> > >>> 3. What's the impact for the runner writers ? The runners will have to
> > >>> support SDF, and that could be tricky depending on the execution engine.
> > >>> In the worst case where the runner can't fully support SDF, does it mean
> > >>> that most of our IOs will be useless ?
> > >>>
> > >>> Just my dumb topics ;)
> > >>>
> > >>> Thanks,
> > >>> See you at 8am !
> > >>> Regards
> > >>> JB
> > >>>
> > >>> On 08/19/2016 02:29 AM, Eugene Kirpichov wrote:
> > >>>
> > >>>> Hello everybody,
> > >>>>
> > >>>> Just a reminder:
> > >>>>
> > >>>> The meeting is happening tomorrow - Friday Aug 19th, 8am-9am PST; to join
> > >>>> the call go to https://hangouts.google.com/hangouts/_/google.com/splittabledofn .
> > >>>> I intend to go over the proposed design and then have a free-form
> > >>>> discussion.
> > >>>>
> > >>>> Please have a skim through the proposal doc:
> > >>>> https://s.apache.org/splittable-do-fn
> > >>>> I also made some slides that are basically a trimmed-down version of the
> > >>>> doc to use as a guide when conducting the meeting:
> > >>>> https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing
> > >>>>
> > >>>> I will post notes from the meeting on this thread afterwards.
> > >>>>
> > >>>> Thanks, looking forward.
> > >>>>
> > >>>> On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid>
> > >>>> wrote:
> > >>>>
> > >>>>> This is pretty cool! I'll be there too. (unless the hangout gets too full
> > >>>>> -- if so, I'll drop out in favor of others who aren't lucky enough to get
> > >>>>> to talk to Eugene all the time.)
> > >>>>>
> > >>>>> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> +1 I'll join
> > >>>>>>
> > >>>>>> On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> + 1, me2
> > >>>>>>>
> > >>>>>>> On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:
> > >>>>>>>
> > >>>>>>>> +1 as in I'll join ;-)
> > >>>>>>>>
> > >>>>>>>> On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Sounds good, thanks!
> > >>>>>>>>> Then Friday Aug 19th it is, 8am-9am PST,
> > >>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > >>>>>>>>>
> > >>>>>>>>> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Hi
> > >>>>>>>>>>
> > >>>>>>>>>> Unfortunately I will be in Ireland on August 15th. What about Friday
> > >>>>>>>>>> 19th ?
> > >>>>>>>>>>
> > >>>>>>>>>> Regards
> > >>>>>>>>>> JB
> > >>>>>>>>>>
> > >>>>>>>>>> On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > >>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hi JB,
> > >>>>>>>>>>>
> > >>>>>>>>>>> Sounds great, does the suggested time over videoconference work for
> > >>>>>>>>>>> you?
> > >>>>>>>>>>>
> > >>>>>>>>>>> On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > >>>>>>>>>>> wrote:
> > >>>>>>>>>>>
> > >>>>>>>>>>>> Hi Eugene
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> May we talk together next week ? I like the proposal. I would just
> > >>>>>>>>>>>> need some details for my understanding.
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> Thanks
> > >>>>>>>>>>>> Regards
> > >>>>>>>>>>>> JB
> > >>>>>>>>>>>>
> > >>>>>>>>>>>> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > >>>>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> > >>>>>>>>>>>>
> > >>>>>>>>>>>>> Hi JB,
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> What are your thoughts on this?
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> I'm also thinking of having a virtual meeting to explain more about
> > >>>>>>>>>>>>> this proposal if necessary, since I understand it is a lot to digest.
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > >>>>>>>>>>>>> (link:
> > >>>>>>>>>>>>> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > >>>>>>>>>>>>> - I confirmed that it can be joined without being logged into a
> > >>>>>>>>>>>>> Google account)
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>> Who'd be interested in attending, and does this time/date work for
> > >>>>>>>>>>>>> people?
> > >>>>>>>>>>>>> On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > >>>>>>>>>>>>> <kirpic...@google.com> wrote:
> > >>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Hi JB, thanks for reading and for your comments!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> It sounds like you are concerned about continued support for existing
> > >>>>>>>>>>>>>> IO's people have developed, and about backward compatibility?
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> We do not need to remove the Source API, and all existing Source-based
> > >>>>>>>>>>>>>> connectors will continue to work [though the document proposes at some
> > >>>>>>>>>>>>>> point to make Read.from(Source) translate to a wrapper SDF under the
> > >>>>>>>>>>>>>> hood, to exercise the feature more and to make sure that it is strictly
> > >>>>>>>>>>>>>> more powerful - but this is an optional implementation detail].
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Perhaps the document phrases this too strongly - "replacing the Source
> > >>>>>>>>>>>>>> API": a better phrasing would be "introducing a new API so powerful and
> > >>>>>>>>>>>>>> easy-to-use that hopefully people will choose it over the Source API all
> > >>>>>>>>>>>>>> the time, even though they don't have to" :) And we can discuss whether
> > >>>>>>>>>>>>>> or not to actually deprecate/remove the Source API at some point down
> > >>>>>>>>>>>>>> the road, once it becomes clear whether this is the case or not.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> To give more context: this proposal came out of discussions within the
> > >>>>>>>>>>>>>> SDK team over the past ~1.5 years, before the Beam project existed, on
> > >>>>>>>>>>>>>> how to make major improvements to the Source API; perhaps it will
> > >>>>>>>>>>>>>> clarify things if I give a history of the ideas discussed:
> > >>>>>>>>>>>>>> - The first idea was to introduce a Read.from(PCollection<Source>)
> > >>>>>>>>>>>>>> transform while keeping the Source API intact - this, given appropriate
> > >>>>>>>>>>>>>> implementation, would solve most of the scalability and composability
> > >>>>>>>>>>>>>> issues of IO's. Then most connectors would look like:
> > >>>>>>>>>>>>>> ParDo<A, Source<B>> + Read.from().
> > >>>>>>>>>>>>>> - Then we figured that the Source class is an unnecessary abstraction,
> > >>>>>>>>>>>>>> as it simply holds data. What if we only had a Reader<S, B> class where
> > >>>>>>>>>>>>>> S is the source type and B the output type? Then connectors would be
> > >>>>>>>>>>>>>> something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> > >>>>>>>>>>>>>> - Then somebody remarked that some of the features of Source are useful
> > >>>>>>>>>>>>>> to ParDo's as well: e.g. ability to report progress when processing a
> > >>>>>>>>>>>>>> very heavy element, or ability to produce very large output in parallel.
> > >>>>>>>>>>>>>> - The two previous bullets were already hinting that the Read.using()
> > >>>>>>>>>>>>>> primitive might not be so special: it just takes S and produces B:
> > >>>>>>>>>>>>>> isn't that what a ParDo does, plus some source magic, minus the
> > >>>>>>>>>>>>>> convenience of c.output() vs. the start/advance() state machine?
> > >>>>>>>>>>>>>> - At this point it became clear that we should explore unifying sources
> > >>>>>>>>>>>>>> and ParDo's, in particular: can we bring the magic of sources to
> > >>>>>>>>>>>>>> ParDo's but without the limitations and coding inconveniences? And this
> > >>>>>>>>>>>>>> is how SplittableDoFn was born: bringing source magic to a DoFn by
> > >>>>>>>>>>>>>> providing a RangeTracker.
> > >>>>>>>>>>>>>> - Once the idea of "splittable DoFn's" was born, it became clear that
> > >>>>>>>>>>>>>> it is strictly more general than sources; at least, in the respect that
> > >>>>>>>>>>>>>> sources have to produce output, while DoFn's don't: an SDF may very
> > >>>>>>>>>>>>>> well produce no output at all, and simply perform a side effect in a
> > >>>>>>>>>>>>>> parallel/resumable way.
> > >>>>>>>>>>>>>> - Then there were countless hours of discussions on unifying the
> > >>>>>>>>>>>>>> bounded/unbounded cases, on the particulars of RangeTracker APIs
> > >>>>>>>>>>>>>> reconciling parallelization and checkpointing, what the relation
> > >>>>>>>>>>>>>> between SDF and DF should be, etc. They culminated in the current
> > >>>>>>>>>>>>>> proposal. The proposal comes at a time when a couple of key ingredients
> > >>>>>>>>>>>>>> are (almost) ready: NewDoFn to make SDF look like a regular DoFn, and
> > >>>>>>>>>>>>>> the State/Timers proposal to enable unbounded work per element.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> To put it shortly:
> > >>>>>>>>>>>>>> - Yes, we will support existing Source connectors, and will support
> > >>>>>>>>>>>>>> writing new ones, possibly forever. There is no interference with
> > >>>>>>>>>>>>>> current users of Source.
> > >>>>>>>>>>>>>> - The new API is an attempt to improve the Source API, taken to its
> > >>>>>>>>>>>>>> logical limit, where it turns out that users' goals can be accomplished
> > >>>>>>>>>>>>>> easier and more generically entirely within ParDo's.
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> Let me know what you think, and thanks again!
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > >>>>>>>>>>>>>> wrote:
> > >>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Hi Eugene,
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Just a question: why is it in DoFn and not an improvement of Source?
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> If I understand correctly, it means that we will have to refactor all
> > >>>>>>>>>>>>>>> existing IO: basically, what you propose is to remove all Source to
> > >>>>>>>>>>>>>>> replace with NewDoFn.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I'm concerned with this approach, especially in terms of timing:
> > >>>>>>>>>>>>>>> clearly, the IO is the area where we have to move forward in Beam, as
> > >>>>>>>>>>>>>>> it will allow new users to start in their projects.
> > >>>>>>>>>>>>>>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> > >>>>>>>>>>>>>>> ... and some people started to learn the IO API (Bounded/Unbounded
> > >>>>>>>>>>>>>>> source, etc).
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> I think it would make more sense to enhance the IO API (Source)
> > >>>>>>>>>>>>>>> instead of introducing a NewDoFn.
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> What are your thoughts for IO writer like me ? ;)
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> Regards
> > >>>>>>>>>>>>>>> JB
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Hello Beam community,
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > >>>>>>>>>>>>>>>> "Splittable DoFn" - a major generalization of DoFn, which allows
> > >>>>>>>>>>>>>>>> processing of a single element to be non-monolithic, i.e.
> > >>>>>>>>>>>>>>>> checkpointable and parallelizable, as well as doing an unbounded
> > >>>>>>>>>>>>>>>> amount of work per element.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This allows effectively replacing the current Bounded/UnboundedSource
> > >>>>>>>>>>>>>>>> APIs with DoFn's that are much easier to code, more scalable and
> > >>>>>>>>>>>>>>>> composable with the rest of the Beam programming model, and enables
> > >>>>>>>>>>>>>>>> many use cases that were previously difficult or impossible, as well
> > >>>>>>>>>>>>>>>> as some non-obvious new use cases.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> This proposal has been mentioned before in JIRA [BEAM-65] and some
> > >>>>>>>>>>>>>>>> Beam meetings, and now the whole thing is written up in a document:
> > >>>>>>>>>>>>>>>> https://s.apache.org/splittable-do-fn
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Here are some things that become possible with Splittable DoFn:
> > >>>>>>>>>>>>>>>> - Efficiently read a filepattern matching millions of files
> > >>>>>>>>>>>>>>>> - Read a collection of files that are produced by an earlier step in
> > >>>>>>>>>>>>>>>> the pipeline (e.g. easily implement a connector to a storage system
> > >>>>>>>>>>>>>>>> that can export itself to files)
> > >>>>>>>>>>>>>>>> - Implement a Kafka reader by composing a "list partitions" DoFn with
> > >>>>>>>>>>>>>>>> a DoFn that simply polls a consumer and outputs new records in a
> > >>>>>>>>>>>>>>>> while() loop
> > >>>>>>>>>>>>>>>> - Implement a log tailer by composing a DoFn that incrementally
> > >>>>>>>>>>>>>>>> returns new files in a directory and a DoFn that tails a file
> > >>>>>>>>>>>>>>>> - Implement a parallel "count friends in common" algorithm (matrix
> > >>>>>>>>>>>>>>>> squaring) with good work balancing
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Here is the meaningful part of a hypothetical Kafka reader written
> > >>>>>>>>>>>>>>>> against this API:
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> ProcessContinuation processElement(
> > >>>>>>>>>>>>>>>>     ProcessContext context, OffsetRangeTracker tracker) {
> > >>>>>>>>>>>>>>>>   try (KafkaConsumer<String, String> consumer =
> > >>>>>>>>>>>>>>>>       Kafka.subscribe(context.element().topic,
> > >>>>>>>>>>>>>>>>           context.element().partition)) {
> > >>>>>>>>>>>>>>>>     consumer.seek(tracker.start());
> > >>>>>>>>>>>>>>>>     while (true) {
> > >>>>>>>>>>>>>>>>       ConsumerRecords<String, String> records = consumer.poll(100ms);
> > >>>>>>>>>>>>>>>>       if (records == null) return done();
> > >>>>>>>>>>>>>>>>       for (ConsumerRecord<String, String> record : records) {
> > >>>>>>>>>>>>>>>>         if (!tracker.tryClaim(record.offset())) {
> > >>>>>>>>>>>>>>>>           return resume().withFutureOutputWatermark(record.timestamp());
> > >>>>>>>>>>>>>>>>         }
> > >>>>>>>>>>>>>>>>         context.output(record);
> > >>>>>>>>>>>>>>>>       }
> > >>>>>>>>>>>>>>>>     }
> > >>>>>>>>>>>>>>>>   }
> > >>>>>>>>>>>>>>>> }
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The document describes in detail the motivations behind this feature,
> > >>>>>>>>>>>>>>>> the basic idea and API, open questions, and outlines an incremental
> > >>>>>>>>>>>>>>>> delivery plan.
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > >>>>>>>>>>>>>>>> and is loosely related to "State and Timers for DoFn" [beam-state].
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Please take a look and comment!
> > >>>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>>> Thanks.
> > >>>>>>>>>>>>>>>> [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > >>>>>>>>>>>>>>>> [new-do-fn] https://s.apache.org/a-new-do-fn
> > >>>>>>>>>>>>>>>> [beam-state] https://s.apache.org/beam-state
> > >>>>>>>>>>>>>>>
> > >>>>>>>>>>>>>>> --
> > >>>>>>>>>>>>>>> Jean-Baptiste Onofré
> > >>>>>>>>>>>>>>> jbono...@apache.org
> > >>>>>>>>>>>>>>> http://blog.nanthrax.net
> > >>>>>>>>>>>>>>> Talend - http://www.talend.com
> > >>>>>>
> > >>>>>> --
> > >>>>>> Thanks,
> > >>>>>> Andrew
> > >>>>>>
> > >>>>>> Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > >>>>>> <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > >>>>>> twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>