Hi JB,

Sounds great, does the suggested time over videoconference work for you?
On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Eugene,
>
> May we talk together next week? I like the proposal. I would just need
> some details for my understanding.
>
> Thanks
> Regards
> JB
>
> On Aug 11, 2016, at 19:46, Eugene Kirpichov
> <kirpic...@google.com.INVALID> wrote:
> > Hi JB,
> >
> > What are your thoughts on this?
> >
> > I'm also thinking of having a virtual meeting to explain more about
> > this proposal if necessary, since I understand it is a lot to digest.
> >
> > How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > (link:
> > https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > - I confirmed that it can be joined without being logged into a Google
> > account)
> >
> > Who'd be interested in attending, and does this time/date work for
> > people?
> >
> > On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
> > wrote:
> >
> >> Hi JB, thanks for reading and for your comments!
> >>
> >> It sounds like you are concerned about continued support for existing
> >> IO's people have developed, and about backward compatibility?
> >>
> >> We do not need to remove the Source API, and all existing Source-based
> >> connectors will continue to work [though the document proposes at some
> >> point to make Read.from(Source) translate to a wrapper SDF under the
> >> hood, to exercise the feature more and to make sure that it is
> >> strictly more powerful - but this is an optional implementation
> >> detail].
> >>
> >> Perhaps the document phrases this too strongly - "replacing the Source
> >> API": a better phrasing would be "introducing a new API so powerful
> >> and easy-to-use that hopefully people will choose it over the Source
> >> API all the time, even though they don't have to" :) And we can
> >> discuss whether or not to actually deprecate/remove the Source API at
> >> some point down the road, once it becomes clear whether this is the
> >> case or not.
> >>
> >> To give more context: this proposal came out of discussions within the
> >> SDK team over the past ~1.5 years, before the Beam project existed, on
> >> how to make major improvements to the Source API; perhaps it will
> >> clarify things if I give a history of the ideas discussed:
> >> - The first idea was to introduce a Read.from(PCollection<Source>)
> >> transform while keeping the Source API intact - this, given
> >> appropriate implementation, would solve most of the scalability and
> >> composability issues of IO's. Then most connectors would look like:
> >> ParDo<A, Source<B>> + Read.from().
> >> - Then we figured that the Source class is an unnecessary abstraction,
> >> as it simply holds data. What if we only had a Reader<S, B> class,
> >> where S is the source type and B the output type? Then connectors
> >> would be something like: ParDo<A, S> + a hypothetical
> >> Read.using(Reader<S, B>).
> >> - Then somebody remarked that some of the features of Source are
> >> useful to ParDo's as well: e.g. the ability to report progress when
> >> processing a very heavy element, or the ability to produce very large
> >> output in parallel.
> >> - The two previous bullets were already hinting that the Read.using()
> >> primitive might not be so special: it just takes S and produces B;
> >> isn't that what a ParDo does, plus some source magic, minus the
> >> convenience of c.output() vs. the start()/advance() state machine?
> >> - At this point it became clear that we should explore unifying
> >> sources and ParDo's, in particular: can we bring the magic of sources
> >> to ParDo's, but without the limitations and coding inconveniences?
> >> And this is how SplittableDoFn was born: bringing source magic to a
> >> DoFn by providing a RangeTracker.
> >> - Once the idea of "splittable DoFn's" was born, it became clear that
> >> it is strictly more general than sources; at least, in the respect
> >> that sources have to produce output, while DoFn's don't: an SDF may
> >> very well produce no output at all, and simply perform a side effect
> >> in a parallel/resumable way.
> >> - Then there were countless hours of discussions on unifying the
> >> bounded/unbounded cases, on the particulars of RangeTracker APIs
> >> reconciling parallelization and checkpointing, on what the relation
> >> between SDF and DoFn should be, etc. They culminated in the current
> >> proposal. The proposal comes at a time when a couple of key
> >> ingredients are (almost) ready: NewDoFn to make SDF look like a
> >> regular DoFn, and the State/Timers proposal to enable unbounded work
> >> per element.
> >>
> >> To put it shortly:
> >> - Yes, we will support existing Source connectors, and will support
> >> writing new ones, possibly forever. There is no interference with
> >> current users of Source.
> >> - The new API is an attempt to improve the Source API, taken to its
> >> logical limit, where it turns out that users' goals can be
> >> accomplished more easily and more generically entirely within
> >> ParDo's.
> >>
> >> Let me know what you think, and thanks again!
> >>
> >> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> >> wrote:
> >>
> >>> Hi Eugene,
> >>>
> >>> Just a question: why is it in DoFn and not an improvement of Source?
> >>>
> >>> If I understand correctly, it means that we will have to refactor
> >>> all existing IOs: basically, what you propose is to remove all
> >>> Sources and replace them with NewDoFn.
> >>>
> >>> I'm concerned with this approach, especially in terms of timing:
> >>> clearly, IO is the area where we have to move forward in Beam, as it
> >>> will allow new users to start on their projects.
> >>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB,
> >>> JDBC, ... and some people started to learn the IO API
> >>> (Bounded/Unbounded source, etc.).
> >>>
> >>> I think it would make more sense to enhance the IO API (Source)
> >>> instead of introducing a NewDoFn.
> >>>
> >>> What are your thoughts for IO writers like me? ;)
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> >>> > Hello Beam community,
> >>> >
> >>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> >>> > "Splittable DoFn" - a major generalization of DoFn, which allows
> >>> > processing of a single element to be non-monolithic, i.e.
> >>> > checkpointable and parallelizable, as well as doing an unbounded
> >>> > amount of work per element.
> >>> >
> >>> > This allows effectively replacing the current
> >>> > Bounded/UnboundedSource APIs with DoFn's that are much easier to
> >>> > code, more scalable and composable with the rest of the Beam
> >>> > programming model, and enables many use cases that were previously
> >>> > difficult or impossible, as well as some non-obvious new use cases.
> >>> >
> >>> > This proposal has been mentioned before in JIRA [BEAM-65] and some
> >>> > Beam meetings, and now the whole thing is written up in a document:
> >>> >
> >>> > https://s.apache.org/splittable-do-fn
> >>> >
> >>> > Here are some things that become possible with Splittable DoFn:
> >>> > - Efficiently read a filepattern matching millions of files
> >>> > - Read a collection of files that are produced by an earlier step
> >>> > in the pipeline (e.g. easily implement a connector to a storage
> >>> > system that can export itself to files)
> >>> > - Implement a Kafka reader by composing a "list partitions" DoFn
> >>> > with a DoFn that simply polls a consumer and outputs new records
> >>> > in a while() loop
> >>> > - Implement a log tailer by composing a DoFn that incrementally
> >>> > returns new files in a directory and a DoFn that tails a file
> >>> > - Implement a parallel "count friends in common" algorithm (matrix
> >>> > squaring) with good work balancing
> >>> >
> >>> > Here is the meaningful part of a hypothetical Kafka reader written
> >>> > against this API:
> >>> >
> >>> > ProcessContinuation processElement(
> >>> >     ProcessContext context, OffsetRangeTracker tracker) {
> >>> >   try (KafkaConsumer<String, String> consumer =
> >>> >       Kafka.subscribe(context.element().topic,
> >>> >                       context.element().partition)) {
> >>> >     consumer.seek(tracker.start());
> >>> >     while (true) {
> >>> >       ConsumerRecords<String, String> records = consumer.poll(100ms);
> >>> >       if (records == null) return done();
> >>> >       for (ConsumerRecord<String, String> record : records) {
> >>> >         if (!tracker.tryClaim(record.offset())) {
> >>> >           return resume().withFutureOutputWatermark(record.timestamp());
> >>> >         }
> >>> >         context.output(record);
> >>> >       }
> >>> >     }
> >>> >   }
> >>> > }
> >>> >
> >>> > The document describes in detail the motivations behind this
> >>> > feature, the basic idea and API, open questions, and outlines an
> >>> > incremental delivery plan.
> >>> >
> >>> > The proposed API builds on the reflection-based new DoFn
> >>> > [new-do-fn] and is loosely related to "State and Timers for DoFn"
> >>> > [beam-state].
> >>> >
> >>> > Please take a look and comment!
> >>> >
> >>> > Thanks.
> >>> >
> >>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> >>> > [new-do-fn] https://s.apache.org/a-new-do-fn
> >>> > [beam-state] https://s.apache.org/beam-state
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
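[Editor's note: the key mechanism behind the hypothetical Kafka snippet in the announcement is the tracker's tryClaim/checkpoint protocol. The sketch below is a simplified, self-contained model of that protocol written for this summary; it is not Beam's actual OffsetRangeTracker class, and the method names and split semantics are assumptions for illustration.]

```java
// Simplified model of an offset-range tracker (illustrative only, not
// Beam's actual class). A splittable DoFn claims offsets one by one via
// tryClaim(); checkpoint() splits off the unclaimed remainder of the
// range so a runner can resume or rebalance it elsewhere.
public class OffsetRangeTracker {
  private final long start;
  private long end;            // exclusive; shrinks when a checkpoint splits the range
  private long lastClaimed = -1;

  public OffsetRangeTracker(long start, long end) {
    this.start = start;
    this.end = end;
  }

  public long start() {
    return start;
  }

  // Attempt to claim an offset for processing. Returns false when the
  // offset falls outside the (possibly shrunk) range or does not advance,
  // which tells the caller to stop and return a "resume" continuation.
  public boolean tryClaim(long offset) {
    if (offset >= end || offset <= lastClaimed) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  // Split the range at the current position: this tracker keeps
  // [start, lastClaimed + 1), and the returned tracker owns the
  // unprocessed remainder [lastClaimed + 1, end).
  public OffsetRangeTracker checkpoint() {
    long splitPosition = lastClaimed + 1;
    OffsetRangeTracker residual = new OffsetRangeTracker(splitPosition, end);
    this.end = splitPosition;
    return residual;
  }
}
```

Under this model, the `if (!tracker.tryClaim(record.offset())) return resume()...` line in the Kafka snippet reads naturally: when a runner wants to checkpoint, it splits the range, the next tryClaim fails, and the DoFn exits cleanly, to be resumed later on the residual range.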