Hi JB,

What are your thoughts on this?
I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link: https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
- I confirmed that it can be joined without being logged into a Google
account)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com> wrote:

> Hi JB, thanks for reading and for your comments!
>
> It sounds like you are concerned about continued support for existing IO's
> people have developed, and about backward compatibility?
>
> We do not need to remove the Source API, and all existing Source-based
> connectors will continue to work [though the document proposes at some
> point to make Read.from(Source) translate to a wrapper SDF under the
> hood, to exercise the feature more and to make sure that it is strictly
> more powerful - but this is an optional implementation detail].
>
> Perhaps the document phrases this too strongly - "replacing the Source
> API": a better phrasing would be "introducing a new API so powerful and
> easy-to-use that hopefully people will choose it over the Source API all
> the time, even though they don't have to" :) And we can discuss whether or
> not to actually deprecate/remove the Source API at some point down the
> road, once it becomes clear whether this is the case or not.
>
> To give more context: this proposal came out of discussions within the SDK
> team over the past ~1.5 years, before the Beam project existed, on how to
> make major improvements to the Source API; perhaps it will clarify things
> if I give a history of the ideas discussed:
> - The first idea was to introduce a Read.from(PCollection<Source>)
> transform while keeping the Source API intact - this, given an appropriate
> implementation, would solve most of the scalability and composability
> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
> + Read.from().
> - Then we figured that the Source class is an unnecessary abstraction, as
> it simply holds data. What if we only had a Reader<S, B> class where S is
> the source type and B the output type? Then connectors would be something
> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> - Then somebody remarked that some of the features of Source are useful to
> ParDo's as well: e.g. the ability to report progress when processing a very
> heavy element, or the ability to produce very large output in parallel.
> - The two previous bullets were already hinting that the Read.using()
> primitive might not be so special: it just takes S and produces B: isn't
> that what a ParDo does, plus some source magic, minus the convenience of
> c.output() vs. the start/advance() state machine?
> - At this point it became clear that we should explore unifying sources
> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> but without the limitations and coding inconveniences? And this is how
> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> RangeTracker.
> - Once the idea of "splittable DoFn's" was born, it became clear that it
> is strictly more general than sources; at least, in the respect that
> sources have to produce output, while DoFn's don't: an SDF may very well
> produce no output at all, and simply perform a side effect in a
> parallel/resumable way.
> - Then there were countless hours of discussions on unifying the
> bounded/unbounded cases, on the particulars of RangeTracker APIs
> reconciling parallelization and checkpointing, what the relation between
> SDF and DF should be, etc. They culminated in the current proposal. The
> proposal comes at a time when a couple of key ingredients are (almost)
> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> proposal to enable unbounded work per element.
>
> To put it briefly:
> - Yes, we will support existing Source connectors, and will support
> writing new ones, possibly forever. There is no interference with current
> users of Source.
> - The new API is an attempt to improve the Source API, taken to its
> logical limit where it turns out that users' goals can be accomplished
> more easily and more generically entirely within ParDo's.
>
> Let me know what you think, and thanks again!
>
> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Eugene,
>>
>> Just a question: why is it in DoFn and not an improvement of Source?
>>
>> If I understand correctly, it means that we will have to refactor all
>> existing IOs: basically, what you propose is to remove all Sources and
>> replace them with NewDoFn.
>>
>> I'm concerned about this approach, especially in terms of timing: clearly,
>> IO is the area where we have to move forward in Beam, as it will
>> allow new users to get started in their projects.
>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
>> ... and some people started to learn the IO API (Bounded/Unbounded
>> source, etc.).
>>
>> I think it would make more sense to enhance the IO API (Source) instead
>> of introducing a NewDoFn.
>>
>> What are your thoughts for IO writers like me?
>> ;)
>>
>> Regards
>> JB
>>
>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>> > Hello Beam community,
>> >
>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
>> > "Splittable DoFn" - a major generalization of DoFn, which allows
>> > processing of a single element to be non-monolithic, i.e.
>> > checkpointable and parallelizable, as well as doing an unbounded
>> > amount of work per element.
>> >
>> > This allows effectively replacing the current Bounded/UnboundedSource
>> > APIs with DoFn's that are much easier to code, more scalable and
>> > composable with the rest of the Beam programming model, and enables
>> > many use cases that were previously difficult or impossible, as well
>> > as some non-obvious new use cases.
>> >
>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
>> > meetings, and now the whole thing is written up in a document:
>> >
>> > https://s.apache.org/splittable-do-fn
>> >
>> > Here are some things that become possible with Splittable DoFn:
>> > - Efficiently read a filepattern matching millions of files
>> > - Read a collection of files that are produced by an earlier step in the
>> > pipeline (e.g. easily implement a connector to a storage system that can
>> > export itself to files)
>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
>> > DoFn that simply polls a consumer and outputs new records in a while()
>> > loop
>> > - Implement a log tailer by composing a DoFn that incrementally returns
>> > new files in a directory and a DoFn that tails a file
>> > - Implement a parallel "count friends in common" algorithm (matrix
>> > squaring) with good work balancing
>> >
>> > Here is the meaningful part of a hypothetical Kafka reader written
>> > against this API:
>> >
>> > ProcessContinuation processElement(
>> >     ProcessContext context, OffsetRangeTracker tracker) {
>> >   try (KafkaConsumer<String, String> consumer =
>> >       Kafka.subscribe(context.element().topic,
>> >                       context.element().partition)) {
>> >     consumer.seek(tracker.start());
>> >     while (true) {
>> >       ConsumerRecords<String, String> records = consumer.poll(100ms);
>> >       if (records == null) return done();
>> >       for (ConsumerRecord<String, String> record : records) {
>> >         if (!tracker.tryClaim(record.offset())) {
>> >           return resume().withFutureOutputWatermark(record.timestamp());
>> >         }
>> >         context.output(record);
>> >       }
>> >     }
>> >   }
>> > }
>> >
>> > The document describes in detail the motivations behind this feature,
>> > the basic idea and API, open questions, and outlines an incremental
>> > delivery plan.
>> >
>> > The proposed API builds on the reflection-based new DoFn [new-do-fn]
>> > and is loosely related to "State and Timers for DoFn" [beam-state].
>> >
>> > Please take a look and comment!
>> >
>> > Thanks.
>> >
>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>> > [new-do-fn] https://s.apache.org/a-new-do-fn
>> > [beam-state] https://s.apache.org/beam-state
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
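
For anyone skimming the thread, here is a toy, self-contained sketch of the
tryClaim() pattern used in the hypothetical Kafka reader quoted above. It is
only an illustration under assumptions: ToyOffsetRangeTracker, checkpoint()
and process() are made-up names for this message, not the API from the design
document or from Beam. The idea it tries to show is that the processing loop
must claim each offset before emitting it, so a runner can shrink the range at
any time and hand the remainder to someone else.

```java
import java.util.ArrayList;
import java.util.List;

public class SplittableSketch {
    /** Hypothetical, simplified tracker for a half-open offset range [start, end). */
    static final class ToyOffsetRangeTracker {
        private final long start;
        private long end;
        private long lastClaimed;

        ToyOffsetRangeTracker(long start, long end) {
            this.start = start;
            this.end = end;
            this.lastClaimed = start - 1; // nothing claimed yet
        }

        long start() {
            return start;
        }

        /** Returns true if the offset is still inside the range and is now owned by the caller. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= end) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }

        /** Runner side: cut the range right after the last claimed offset; returns where the residual starts. */
        synchronized long checkpoint() {
            long residualStart = lastClaimed + 1;
            end = residualStart;
            return residualStart;
        }
    }

    /** Emits offsets in order until the tracker refuses a claim. */
    static List<Long> process(ToyOffsetRangeTracker tracker, long[] offsets) {
        List<Long> output = new ArrayList<>();
        for (long offset : offsets) {
            if (!tracker.tryClaim(offset)) {
                break; // the rest belongs to a residual restriction
            }
            output.add(offset);
        }
        return output;
    }

    public static void main(String[] args) {
        ToyOffsetRangeTracker tracker = new ToyOffsetRangeTracker(0, 3);
        System.out.println(process(tracker, new long[] {0, 1, 2, 3, 4})); // prints [0, 1, 2]
    }
}
```

In this toy version, calling checkpoint() after offsets 0 and 1 have been
claimed returns 2, and any later tryClaim(2) fails, which is roughly the
"resume later from here" behaviour the quoted example relies on.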