Hello everybody,

Just a reminder:
The meeting is happening tomorrow, Friday Aug 19th, 8am-9am Pacific time (PDT). To join the call, go to https://hangouts.google.com/hangouts/_/google.com/splittabledofn

I intend to go over the proposed design and then have a free-form discussion. Please skim through the proposal doc: https://s.apache.org/splittable-do-fn

I also made some slides that are basically a trimmed-down version of the doc, to use as a guide when conducting the meeting: https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid> wrote:

This is pretty cool! I'll be there too (unless the hangout gets too full; if so, I'll drop out in favor of others who aren't lucky enough to get to talk to Eugene all the time).

On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com> wrote:

+1 I'll join

On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com> wrote:

+1, me too

On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com> wrote:

+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid> wrote:

Sounds good, thanks!
Then Friday Aug 19th it is, 8am-9am Pacific time (PDT),
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn

On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi,

Unfortunately I will be in Ireland on August 15th. What about Friday 19th?
Regards
JB

On Aug 11, 2016, at 23:22, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:

Hi JB,

Sounds great. Does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Eugene,

May we talk together next week? I like the proposal. I would just need some details for my understanding.

Thanks
Regards
JB

On Aug 11, 2016, at 19:46, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:

Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts? (link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn; I confirmed that it can be joined without being logged into a Google account)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com> wrote:

Hi JB, thanks for reading and for your comments!
It sounds like you are concerned about continued support for existing IOs that people have developed, and about backward compatibility?

We do not need to remove the Source API, and all existing Source-based connectors will continue to work. (The document does propose, at some point, making Read.from(Source) translate to a wrapper SDF under the hood, to exercise the feature more and to make sure that it is strictly more powerful, but this is an optional implementation detail.)

Perhaps the document phrases this too strongly ("replacing the Source API"): a better phrasing would be "introducing a new API so powerful and easy to use that hopefully people will choose it over the Source API all the time, even though they don't have to" :) And we can discuss whether or not to actually deprecate/remove the Source API at some point down the road, once it becomes clear whether this is the case or not.
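To make the "wrapper SDF" idea a little more concrete, here is a minimal, self-contained Java sketch under a simplifying assumption: `SimpleReader`, `ListReader` and `ReaderWrappingFn` are hypothetical names invented for illustration, not Beam classes. It only shows how a reader driven by a start()/advance() state machine can be exposed as a function that emits each record through an output callback, which is the shape a DoFn has. A real wrapper SDF would also have to handle splitting, progress reporting and checkpointing, all of which this sketch omits.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical reader contract with a start()/advance() state machine.
// Loosely modeled on the Source reader style; NOT the actual Beam interface.
interface SimpleReader<T> {
  boolean start();    // moves to the first record; false if there is none
  boolean advance();  // moves to the next record; false if exhausted
  T getCurrent();     // returns the record at the current position
}

// Toy reader over an in-memory list, standing in for a real connector.
final class ListReader<T> implements SimpleReader<T> {
  private final List<T> items;
  private int index = -1;

  ListReader(List<T> items) {
    this.items = items;
  }

  @Override
  public boolean start() {
    index = 0;
    return index < items.size();
  }

  @Override
  public boolean advance() {
    index++;
    return index < items.size();
  }

  @Override
  public T getCurrent() {
    return items.get(index);
  }
}

// Hypothetical wrapper: drives the reader's state machine and turns it into
// plain output calls, i.e. the shape of a DoFn's per-element processing.
// Splitting, progress reporting and checkpointing are deliberately omitted.
final class ReaderWrappingFn<T> {
  void processElement(SimpleReader<T> reader, Consumer<T> output) {
    for (boolean more = reader.start(); more; more = reader.advance()) {
      output.accept(reader.getCurrent());
    }
  }
}
```

Calling `processElement` with a `ListReader` over `a`, `b`, `c` passes those three values to the output callback in order; an empty list produces no output at all.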
To give more context: this proposal came out of discussions within the SDK team over the past ~1.5 years, before the Beam project existed, on how to make major improvements to the Source API. Perhaps it will clarify things if I give a history of the ideas discussed:

- The first idea was to introduce a Read.from(PCollection<Source>) transform while keeping the Source API intact. Given an appropriate implementation, this would solve most of the scalability and composability issues of IOs. Then most connectors would look like: ParDo<A, Source<B>> + Read.from().
- Then we figured that the Source class is an unnecessary abstraction, as it simply holds data. What if we only had a Reader<S, B> class, where S is the source type and B the output type? Then connectors would be something like: ParDo<A, S> + a hypothetical Read.using(Reader<S, B>).
- Then somebody remarked that some of the features of Source are useful to ParDos as well: e.g. the ability to report progress when processing a very heavy element, or the ability to produce very large output in parallel.
- The two previous bullets were already hinting that the Read.using() primitive might not be so special: it just takes S and produces B. Isn't that what a ParDo does, plus some source magic, minus the convenience of c.output() vs. the start()/advance() state machine?
- At this point it became clear that we should explore unifying sources and ParDos; in particular, can we bring the magic of sources to ParDos, but without the limitations and coding inconveniences? And this is how SplittableDoFn was born: bringing source magic to a DoFn by providing a RangeTracker.
- Once the idea of "splittable DoFns" was born, it became clear that it is strictly more general than sources, at least in the respect that sources have to produce output, while DoFns don't: an SDF may very well produce no output at all, and simply perform a side effect in a parallel/resumable way.
- Then there were countless hours of discussions on unifying the bounded/unbounded cases, on the particulars of RangeTracker APIs reconciling parallelization and checkpointing, on what the relation between SDF and DF should be, etc. They culminated in the current proposal. The proposal comes at a time when a couple of key ingredients are (almost) ready: NewDoFn, to make SDF look like a regular DoFn, and the State/Timers proposal, to enable unbounded work per element.

To put it shortly:
- Yes, we will support existing Source connectors, and will support writing new ones, possibly forever. There is no interference with current users of Source.
- The new API is an attempt to improve the Source API, taken to its logical limit, where it turns out that users' goals can be accomplished more easily and more generically entirely within ParDos.

Let me know what you think, and thanks again!

On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

Hi Eugene,

Just a question: why is it in DoFn and not an improvement of Source?

If I understand correctly, it means that we will have to refactor all existing IOs: basically, what you propose is to remove all Sources and replace them with NewDoFn.

I'm concerned about this approach, especially in terms of timing: clearly, IO is the area where we have to move forward in Beam, as it will allow new users to start their projects. So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC, ... and some people started to learn the IO API (Bounded/Unbounded source, etc.).

I think it would make more sense to enhance the IO API (Source) instead of introducing a NewDoFn.

What are your thoughts for IO writers like me?
;)

Regards
JB

On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:

Hello Beam community,

We (myself, Daniel Mills and Robert Bradshaw) would like to propose "Splittable DoFn": a major generalization of DoFn, which allows processing of a single element to be non-monolithic, i.e. checkpointable and parallelizable, as well as doing an unbounded amount of work per element.

This allows effectively replacing the current Bounded/UnboundedSource APIs with DoFns that are much easier to code, more scalable and composable with the rest of the Beam programming model, and it enables many use cases that were previously difficult or impossible, as well as some non-obvious new use cases.

This proposal has been mentioned before in JIRA [BEAM-65] and some Beam meetings, and now the whole thing is written up in a document:

https://s.apache.org/splittable-do-fn

Here are some things that become possible with Splittable DoFn:
- Efficiently read a filepattern matching millions of files
- Read a collection of files that are produced by an earlier step in the pipeline (e.g.
  easily implement a connector to a storage system that can export itself to files)
- Implement a Kafka reader by composing a "list partitions" DoFn with a DoFn that simply polls a consumer and outputs new records in a while() loop
- Implement a log tailer by composing a DoFn that incrementally returns new files in a directory and a DoFn that tails a file
- Implement a parallel "count friends in common" algorithm (matrix squaring) with good work balancing

Here is the meaningful part of a hypothetical Kafka reader written against this API:

    ProcessContinuation processElement(
        ProcessContext context, OffsetRangeTracker tracker) {
      try (KafkaConsumer<String, String> consumer =
               Kafka.subscribe(context.element().topic,
                               context.element().partition)) {
        consumer.seek(tracker.start());
        while (true) {
          ConsumerRecords<String, String> records = consumer.poll(100);  // timeout in ms
          if (records == null) return done();
          for (ConsumerRecord<String, String> record : records) {
            if (!tracker.tryClaim(record.offset())) {
              return resume().withFutureOutputWatermark(record.timestamp());
            }
            context.output(record);
          }
        }
      }
    }

The document describes in
detail the motivations behind this feature, the basic idea and API, and open questions, and outlines an incremental delivery plan.

The proposed API builds on the reflection-based new DoFn [new-do-fn] and is loosely related to "State and Timers for DoFn" [beam-state].

Please take a look and comment!

Thanks.

[BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
[new-do-fn] https://s.apache.org/a-new-do-fn
[beam-state] https://s.apache.org/beam-state

--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

--
Thanks,
Andrew

Subscribe to my book: Streaming Data <http://manning.com/psaltis>
<https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
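The hypothetical Kafka reader in the proposal relies on a single tracker operation, tryClaim(), to decide whether it may still output a record, and on the tracker's range being shrinkable so the rest of the work can be resumed later. Below is a minimal, self-contained Java sketch of that idea. `ToyOffsetRangeTracker` and `ClaimLoopDemo` are hypothetical names, and the rule that a checkpoint hands back the unclaimed tail of the range as a residual is an assumption made for illustration; this is not Beam's actual tracker implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy tracker over the half-open offset range [start, end). Hypothetical: it
// only illustrates the claim/checkpoint idea, not Beam's actual tracker.
final class ToyOffsetRangeTracker {
  private final long start;
  private long end;
  private long lastClaimed; // start - 1 while nothing has been claimed

  ToyOffsetRangeTracker(long start, long end) {
    if (start > end) {
      throw new IllegalArgumentException("start must not exceed end");
    }
    this.start = start;
    this.end = end;
    this.lastClaimed = start - 1;
  }

  long start() {
    return start;
  }

  synchronized long end() {
    return end;
  }

  // Claims an offset before its record is output. Returns false once the
  // offset falls outside the (possibly shrunk) range; the caller must then stop.
  synchronized boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= end) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  // Shrinks this range to what has been claimed so far and returns the
  // unclaimed remainder [lastClaimed + 1, end) as a residual range that could
  // be resumed later, possibly on another worker.
  synchronized long[] checkpoint() {
    long residualStart = lastClaimed + 1;
    long[] residual = {residualStart, end};
    end = residualStart;
    return residual;
  }
}

final class ClaimLoopDemo {
  // Same structure as the loop in the hypothetical Kafka reader: a record is
  // emitted only after its offset has been claimed successfully.
  static List<Long> emitClaimed(ToyOffsetRangeTracker tracker, List<Long> offsets) {
    List<Long> emitted = new ArrayList<>();
    for (long offset : offsets) {
      if (!tracker.tryClaim(offset)) {
        break; // offset lies outside this tracker's current range
      }
      emitted.add(offset);
    }
    return emitted;
  }
}
```

For example, a tracker over [0, 10) that has claimed offsets 0 to 2 returns the residual {3, 10} from checkpoint(), after which tryClaim(3) on the original tracker returns false; a new tracker over the residual range can then claim offset 3. Making the tracker's methods synchronized reflects the assumption that a runner may request a checkpoint from another thread while processing is in progress.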