Hi Eugene

Could we talk next week? I like the proposal; I just need a few more
details to understand it fully.

Thanks
Regards
JB



On Aug 11, 2016, at 19:46, Eugene Kirpichov 
<kirpic...@google.com.INVALID> wrote:
>Hi JB,
>
>What are your thoughts on this?
>
>I'm also thinking of having a virtual meeting to explain more about this
>proposal if necessary, since I understand it is a lot to digest.
>
>How about Monday, Aug 15, 8am-9am Pacific time, over Hangouts?
>(link:
>https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
>- I confirmed that it can be joined without being logged into a Google
>account)
>
>Who'd be interested in attending, and does this time/date work for people?
>
>On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
>wrote:
>
>> Hi JB, thanks for reading and for your comments!
>>
>> It sounds like you are concerned about continued support for existing
>> IO's people have developed, and about backward compatibility?
>>
>> We do not need to remove the Source API, and all existing Source-based
>> connectors will continue to work [though the document proposes at some
>> point to make Read.from(Source) translate to a wrapper SDF under the
>> hood, to exercise the feature more and to make sure that it is strictly
>> more powerful - but this is an optional implementation detail].
>>
>> Perhaps the document phrases this too strongly - "replacing the Source
>> API": a better phrasing would be "introducing a new API so powerful and
>> easy-to-use that hopefully people will choose it over the Source API all
>> the time, even though they don't have to" :) And we can discuss whether
>> or not to actually deprecate/remove the Source API at some point down the
>> road, once it becomes clear whether this is the case or not.
>>
>> To give more context: this proposal came out of discussions within the
>> SDK team over the past ~1.5 years, before the Beam project existed, on
>> how to make major improvements to the Source API; perhaps it will clarify
>> things if I give a history of the ideas discussed:
>> - The first idea was to introduce a Read.from(PCollection<Source>)
>> transform while keeping the Source API intact - this, given appropriate
>> implementation, would solve most of the scalability and composability
>> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
>> + Read.from().
>> - Then we figured that the Source class is an unnecessary abstraction,
>> as it simply holds data. What if we only had a Reader<S, B> class where S
>> is the source type and B the output type? Then connectors would be
>> something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
>> - Then somebody remarked that some of the features of Source are useful
>> to ParDo's as well: e.g. ability to report progress when processing a
>> very heavy element, or ability to produce very large output in parallel.
>> - The two previous bullets were already hinting that the Read.using()
>> primitive might not be so special: it just takes S and produces B: isn't
>> that what a ParDo does, plus some source magic, minus the convenience of
>> c.output() vs. the start/advance() state machine?
>> - At this point it became clear that we should explore unifying sources
>> and ParDo's, in particular: can we bring the magic of sources to ParDo's
>> but without the limitations and coding inconveniences? And this is how
>> SplittableDoFn was born: bringing source magic to a DoFn by providing a
>> RangeTracker.
>> - Once the idea of "splittable DoFn's" was born, it became clear that it
>> is strictly more general than sources; at least, in the respect that
>> sources have to produce output, while DoFn's don't: an SDF may very well
>> produce no output at all, and simply perform a side effect in a
>> parallel/resumable way.
>> - Then there were countless hours of discussions on unifying the
>> bounded/unbounded cases, on the particulars of RangeTracker APIs
>> reconciling parallelization and checkpointing, what the relation between
>> SDF and DF should be, etc. They culminated in the current proposal. The
>> proposal comes at a time when a couple of key ingredients are (almost)
>> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
>> proposal to enable unbounded work per element.
>>
>> In short:
>> - Yes, we will support existing Source connectors, and will support
>> writing new ones, possibly forever. There is no interference with current
>> users of Source.
>> - The new API is an attempt to improve the Source API, taken to its
>> logical limit where it turns out that users' goals can be accomplished
>> more easily and more generically entirely within ParDo's.
>>
>> Let me know what you think, and thanks again!
>>
>> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
>> wrote:
>>
>>> Hi Eugene,
>>>
>>> Just a question: why is it in DoFn and not an improvement of Source?
>>>
>>> If I understand correctly, it means that we will have to refactor all
>>> existing IOs: basically, what you propose is to remove all Sources and
>>> replace them with NewDoFn.
>>>
>>> I'm concerned about this approach, especially in terms of timing:
>>> clearly, IO is the area where we have to move forward in Beam, as it
>>> will allow new users to get started on their projects.
>>> So, we started to bring in new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
>>> ... and some people started to learn the IO API (Bounded/Unbounded
>>> source, etc).
>>>
>>> I think it would make more sense to enhance the IO API (Source) instead
>>> of introducing a NewDoFn.
>>>
>>> What are your thoughts for IO writers like me? ;)
>>>
>>> Regards
>>> JB
>>>
>>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
>>> > Hello Beam community,
>>> >
>>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
>>> > "Splittable DoFn" - a major generalization of DoFn, which allows
>>> > processing of a single element to be non-monolithic, i.e.
>>> > checkpointable and parallelizable, as well as doing an unbounded amount
>>> > of work per element.
>>> >
>>> > This allows effectively replacing the current Bounded/UnboundedSource
>>> > APIs with DoFn's that are much easier to code, more scalable and
>>> > composable with the rest of the Beam programming model, and enables
>>> > many use cases that were previously difficult or impossible, as well as
>>> > some non-obvious new use cases.
>>> >
>>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
>>> > meetings, and now the whole thing is written up in a document:
>>> >
>>> >         https://s.apache.org/splittable-do-fn
>>> >
>>> > Here are some things that become possible with Splittable DoFn:
>>> > - Efficiently read a filepattern matching millions of files
>>> > - Read a collection of files that are produced by an earlier step in
>>> > the pipeline (e.g. easily implement a connector to a storage system
>>> > that can export itself to files)
>>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
>>> > DoFn that simply polls a consumer and outputs new records in a while()
>>> > loop
>>> > - Implement a log tailer by composing a DoFn that incrementally returns
>>> > new files in a directory and a DoFn that tails a file
>>> > - Implement a parallel "count friends in common" algorithm (matrix
>>> > squaring) with good work balancing
>>> >
>>> > Here is the meaningful part of a hypothetical Kafka reader written
>>> > against this API:
>>> >
>>> >     ProcessContinuation processElement(
>>> >             ProcessContext context, OffsetRangeTracker tracker) {
>>> >       try (KafkaConsumer<String, String> consumer =
>>> >                 Kafka.subscribe(context.element().topic,
>>> >                                 context.element().partition)) {
>>> >         consumer.seek(tracker.start());
>>> >         while (true) {
>>> >           ConsumerRecords<String, String> records =
>>> >               consumer.poll(100ms);
>>> >           if (records == null) return done();
>>> >           for (ConsumerRecord<String, String> record : records) {
>>> >             if (!tracker.tryClaim(record.offset())) {
>>> >               return resume()
>>> >                   .withFutureOutputWatermark(record.timestamp());
>>> >             }
>>> >             context.output(record);
>>> >           }
>>> >         }
>>> >       }
>>> >     }
>>> >
>>> > The document describes in detail the motivations behind this feature,
>>> > the basic idea and API, open questions, and outlines an incremental
>>> > delivery plan.
>>> >
>>> > The proposed API builds on the reflection-based new DoFn [new-do-fn]
>>> > and is loosely related to "State and Timers for DoFn" [beam-state].
>>> >
>>> > Please take a look and comment!
>>> >
>>> > Thanks.
>>> >
>>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
>>> > [new-do-fn] https://s.apache.org/a-new-do-fn
>>> > [beam-state] https://s.apache.org/beam-state
>>> >
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> jbono...@apache.org
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
