Hi JB,

What are your thoughts on this?

I'm also thinking of having a virtual meeting to explain more about this
proposal if necessary, since I understand it is a lot to digest.

How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
(link:
https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn -
I confirmed that it can be joined without being logged into a Google
account)

Who'd be interested in attending, and does this time/date work for people?

On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Hi JB, thanks for reading and for your comments!
>
> It sounds like you are concerned about continued support for existing IO's
> people have developed, and about backward compatibility?
>
> We do not need to remove the Source API, and all existing Source-based
> connectors will continue to work [though the document proposes, at some
> point, making Read.from(Source) translate to a wrapper SDF under the
> hood, to exercise the feature more and to make sure that it is strictly
> more powerful - but this is an optional implementation detail].
>
> Perhaps the document phrases this too strongly - "replacing the Source
> API": a better phrasing would be "introducing a new API so powerful and
> easy-to-use that hopefully people will choose it over the Source API all
> the time, even though they don't have to" :) And we can discuss whether or
> not to actually deprecate/remove the Source API at some point down the
> road, once it becomes clear whether this is the case or not.
>
> To give more context: this proposal came out of discussions within the SDK
> team over the past ~1.5 years, before the Beam project existed, on how to
> make major improvements to the Source API; perhaps it will clarify things
> if I give a history of the ideas discussed:
> - The first idea was to introduce a Read.from(PCollection<Source>)
> transform while keeping the Source API intact - this, given appropriate
> implementation, would solve most of the scalability and composability
> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
> + Read.from().
> - Then we figured that the Source class is an unnecessary abstraction, as
> it simply holds data. What if we only had a Reader<S, B> class where S is
> the source type and B the output type? Then connectors would be something
> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> - Then somebody remarked that some of the features of Source are useful to
> ParDo's as well: e.g. ability to report progress when processing a very
> heavy element, or ability to produce very large output in parallel.
> - The two previous bullets were already hinting that the Read.using()
> primitive might not be so special: it just takes S and produces B: isn't
> that what a ParDo does, plus some source magic, minus the convenience of
> c.output() vs. the start/advance() state machine?
> - At this point it became clear that we should explore unifying sources
> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> but without the limitations and coding inconveniences? And this is how
> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> RangeTracker.
> - Once the idea of "splittable DoFn's" was born, it became clear that it
> is strictly more general than sources; at least, in the respect that
> sources have to produce output, while DoFn's don't: an SDF may very well
> produce no output at all, and simply perform a side effect in a
> parallel/resumable way.
> - Then there were countless hours of discussions on unifying the
> bounded/unbounded cases, on the particulars of RangeTracker APIs
> reconciling parallelization and checkpointing, what the relation between
> SDF and DoFn should be, etc. They culminated in the current proposal. The
> proposal comes at a time when a couple of key ingredients are (almost)
> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> proposal to enable unbounded work per element.
>
> In short:
> - Yes, we will support existing Source connectors, and will support
> writing new ones, possibly forever. There is no interference with current
> users of Source.
> - The new API is an attempt to improve the Source API, taken to its
> logical limit where it turns out that users' goals can be accomplished
> more easily and more generically entirely within ParDo's.
>
> Let me know what you think, and thanks again!
>
> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Hi Eugene,
>>
>> Just a question: why is it in DoFn and not an improvement of Source?
>>
>> If I understand correctly, it means that we will have to refactor all
>> existing IOs: basically, what you propose is to remove all Sources and
>> replace them with NewDoFn.
>>
>> I'm concerned with this approach, especially in terms of timing: clearly,
>> IO is the area where we have to move forward in Beam, as it will
>> allow new users to get started on their projects.
>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
>> ... and some people started to learn the IO API (Bounded/Unbounded
>> source, etc).
>>
>> I think it would make more sense to enhance the IO API (Source) instead
>> of introducing a NewDoFn.
>>
>> What are your thoughts for IO writers like me? ;)
>>
>> Regards
>> JB
>>
>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>> > Hello Beam community,
>> >
>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
>> > "Splittable DoFn" - a major generalization of DoFn, which allows
>> > processing of a single element to be non-monolithic, i.e.
>> > checkpointable and parallelizable, as well as doing an unbounded
>> > amount of work per element.
>> >
>> > This allows effectively replacing the current Bounded/UnboundedSource
>> > APIs with DoFn's that are much easier to code, more scalable and
>> > composable with the rest of the Beam programming model, and enables
>> > many use cases that were previously difficult or impossible, as well
>> > as some non-obvious new use cases.
>> >
>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
>> > meetings, and now the whole thing is written up in a document:
>> >
>> >         https://s.apache.org/splittable-do-fn
>> >
>> > Here are some things that become possible with Splittable DoFn:
>> > - Efficiently read a filepattern matching millions of files
>> > - Read a collection of files that are produced by an earlier step in the
>> > pipeline (e.g. easily implement a connector to a storage system that can
>> > export itself to files)
>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
>> > DoFn that simply polls a consumer and outputs new records in a while()
>> > loop
>> > - Implement a log tailer by composing a DoFn that incrementally returns
>> > new files in a directory and a DoFn that tails a file
>> > - Implement a parallel "count friends in common" algorithm (matrix
>> > squaring) with good work balancing
>> >
>> > Here is the meaningful part of a hypothetical Kafka reader written
>> > against this API:
>> >
>> >     ProcessContinuation processElement(
>> >             ProcessContext context, OffsetRangeTracker tracker) {
>> >       try (KafkaConsumer<String, String> consumer =
>> >                 Kafka.subscribe(context.element().topic,
>> >                                 context.element().partition)) {
>> >         consumer.seek(tracker.start());
>> >         while (true) {
>> >           ConsumerRecords<String, String> records =
>> >               consumer.poll(100ms);
>> >           if (records == null) return done();
>> >           for (ConsumerRecord<String, String> record : records) {
>> >             if (!tracker.tryClaim(record.offset())) {
>> >               return resume()
>> >                   .withFutureOutputWatermark(record.timestamp());
>> >             }
>> >             context.output(record);
>> >           }
>> >         }
>> >       }
>> >     }
>> >
>> > The document describes in detail the motivations behind this feature,
>> > the basic idea and API, open questions, and outlines an incremental
>> > delivery plan.
>> >
>> > The proposed API builds on the reflection-based new DoFn [new-do-fn]
>> > and is loosely related to "State and Timers for DoFn" [beam-state].
>> >
>> > Please take a look and comment!
>> >
>> > Thanks.
>> >
>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>> > [new-do-fn] https://s.apache.org/a-new-do-fn
>> > [beam-state] https://s.apache.org/beam-state
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
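
To make the tryClaim() idea from the quoted Kafka example a bit more
concrete, here is a minimal, self-contained sketch of how claiming offsets
and checkpointing could fit together. It is only an illustration under
stated assumptions, not the API from the proposal document: the names
SketchOffsetTracker, SketchReader, checkpoint() and the checkpointAfter
parameter are all hypothetical, and the "runner" is simulated by a simple
counter inside the processing loop.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the OffsetRangeTracker in the quoted example. */
final class SketchOffsetTracker {
    private final long start;
    private long end;          // exclusive; shrinks when a checkpoint is taken
    private long lastClaimed;  // start - 1 until the first successful claim

    SketchOffsetTracker(long start, long end) {
        this.start = start;
        this.end = end;
        this.lastClaimed = start - 1;
    }

    long start() {
        return start;
    }

    /** Claims an offset; returns false once the offset is outside the current range. */
    synchronized boolean tryClaim(long offset) {
        if (offset >= end) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /** Cuts the range after the last claimed offset and returns the unclaimed rest as {start, end}. */
    synchronized long[] checkpoint() {
        long[] residual = {lastClaimed + 1, end};
        end = lastClaimed + 1;
        return residual;
    }
}

final class SketchReader {
    /**
     * Emits every offset it manages to claim. After checkpointAfter outputs, a
     * checkpoint is taken (this simulates a runner's request; pass -1 for "never").
     * Returns the residual range to resume from, or null if no checkpoint was taken.
     */
    static long[] process(SketchOffsetTracker tracker, int checkpointAfter, List<Long> output) {
        long[] residual = null;
        int emitted = 0;
        for (long offset = tracker.start(); tracker.tryClaim(offset); offset++) {
            output.add(offset);
            emitted++;
            if (emitted == checkpointAfter) {
                // The range shrinks here, so the next tryClaim() fails and the loop ends.
                residual = tracker.checkpoint();
            }
        }
        return residual;
    }
}
```

In this sketch, processing the range [0, 10) with a checkpoint after four
outputs emits offsets 0 through 3 and hands back the residual range
[4, 10). Processing that residual with a fresh tracker emits the remaining
offsets, so nothing is lost or duplicated across the split. The processing
loop itself never needs to know why a claim failed, which is the property
that lets one code path serve both the "done" and the "resume later" case.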
