Hi JB,

Sounds great! Does the suggested time over videoconference work for you?

On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Eugene
>
> May we talk together next week? I like the proposal. I would just need
> some details for my understanding.
>
> Thanks
> Regards
> JB
>
>
>
> On Aug 11, 2016, at 19:46, Eugene Kirpichov
> <kirpic...@google.com.INVALID> wrote:
> >Hi JB,
> >
> >What are your thoughts on this?
> >
> >I'm also thinking of having a virtual meeting to explain more about this
> >proposal if necessary, since I understand it is a lot to digest.
> >
> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> >(link:
> >https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> >- I confirmed that it can be joined without being logged into a Google
> >account)
> >
> >Who'd be interested in attending, and does this time/date work for
> >people?
> >
> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
> >wrote:
> >
> >> Hi JB, thanks for reading and for your comments!
> >>
> >> It sounds like you are concerned about continued support for existing
> >> IO's people have developed, and about backward compatibility?
> >>
> >> We do not need to remove the Source API, and all existing Source-based
> >> connectors will continue to work [though the document proposes at some
> >> point to make Read.from(Source) translate to a wrapper SDF under the
> >> hood, to exercise the feature more and to make sure that it is strictly
> >> more powerful - but this is an optional implementation detail].
> >>
> >> Perhaps the document phrases this too strongly - "replacing the Source
> >> API": a better phrasing would be "introducing a new API so powerful and
> >> easy-to-use that hopefully people will choose it over the Source API all
> >> the time, even though they don't have to" :) And we can discuss whether
> >> or not to actually deprecate/remove the Source API at some point down
> >> the road, once it becomes clear whether this is the case or not.
> >>
> >> To give more context: this proposal came out of discussions within the
> >> SDK team over the past ~1.5 years, before the Beam project existed, on
> >> how to make major improvements to the Source API; perhaps it will
> >> clarify things if I give a history of the ideas discussed:
> >> - The first idea was to introduce a Read.from(PCollection<Source>)
> >> transform while keeping the Source API intact - this, given appropriate
> >> implementation, would solve most of the scalability and composability
> >> issues of IO's. Then most connectors would look like: ParDo<A,
> >> Source<B>> + Read.from() (see the rough sketch after this list).
> >> - Then we figured that the Source class is an unnecessary abstraction,
> >> as it simply holds data. What if we only had a Reader<S, B> class where
> >> S is the source type and B the output type? Then connectors would be
> >> something like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> >> - Then somebody remarked that some of the features of Source are useful
> >> to ParDo's as well: e.g. ability to report progress when processing a
> >> very heavy element, or ability to produce very large output in parallel.
> >> - The two previous bullets were already hinting that the Read.using()
> >> primitive might not be so special: it just takes S and produces B: isn't
> >> that what a ParDo does, plus some source magic, minus the convenience of
> >> c.output() vs. the start/advance() state machine?
> >> - At this point it became clear that we should explore unifying sources
> >> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> >> but without the limitations and coding inconveniences? And this is how
> >> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> >> RangeTracker.
> >> - Once the idea of "splittable DoFn's" was born, it became clear that it
> >> is strictly more general than sources; at least, in the respect that
> >> sources have to produce output, while DoFn's don't: an SDF may very well
> >> produce no output at all, and simply perform a side effect in a
> >> parallel/resumable way.
> >> - Then there were countless hours of discussions on unifying the
> >> bounded/unbounded cases, on the particulars of RangeTracker APIs
> >> reconciling parallelization and checkpointing, what the relation between
> >> SDF and DoFn should be, etc. They culminated in the current proposal.
> >> The proposal comes at a time when a couple of key ingredients are
> >> (almost) ready: NewDoFn to make SDF look like a regular DoFn, and the
> >> State/Timers proposal to enable unbounded work per element.
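> >>
> >> To make the first idea above concrete, here is a rough, purely
> >> hypothetical sketch of the "ParDo<A, Source<B>> + Read.from()" shape (the
> >> names MyRecord, expandIntoPerFileSources and ReadFromSources are
> >> illustrative only, not existing Beam API):
> >>
> >>     // Assuming an existing Pipeline p and a record type MyRecord.
> >>     PCollection<String> filepatterns =
> >>         p.apply(Create.of("gs://bucket/logs/*.avro"));
> >>     filepatterns
> >>         .apply(ParDo.of(new DoFn<String, Source<MyRecord>>() {
> >>           @ProcessElement
> >>           public void process(ProcessContext c) {
> >>             // Hypothetical helper: expand one filepattern into one
> >>             // Source per file.
> >>             for (Source<MyRecord> source :
> >>                 expandIntoPerFileSources(c.element())) {
> >>               c.output(source);
> >>             }
> >>           }
> >>         }))
> >>         // Hypothetical primitive corresponding to
> >>         // Read.from(PCollection<Source>).
> >>         .apply(new ReadFromSources<MyRecord>());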
> >>
> >> To put it shortly:
> >> - Yes, we will support existing Source connectors, and will support
> >> writing new ones, possibly forever. There is no interference with
> >> current users of Source.
> >> - The new API is an attempt to improve the Source API, taken to its
> >> logical limit where it turns out that users' goals can be accomplished
> >> more easily and more generically entirely within ParDo's.
> >>
> >> Let me know what you think, and thanks again!
> >>
> >> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> >> wrote:
> >>
> >>> Hi Eugene,
> >>>
> >>> Just a question: why is it in DoFn and not an improvement of Source?
> >>>
> >>> If I understand correctly, it means that we will have to refactor all
> >>> existing IOs: basically, what you propose is to remove all Sources and
> >>> replace them with NewDoFn.
> >>>
> >>> I'm concerned about this approach, especially in terms of timing:
> >>> clearly, IO is the area where we have to move forward in Beam, as it
> >>> will allow new users to get started on their projects.
> >>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> >>> ... and some people started to learn the IO API (Bounded/Unbounded
> >>> source, etc).
> >>>
> >>> I think it would make more sense to enhance the IO API (Source)
> >>> instead of introducing a NewDoFn.
> >>>
> >>> What are your thoughts for IO writers like me? ;)
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> >>> > Hello Beam community,
> >>> >
> >>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> >>> > "Splittable DoFn" - a major generalization of DoFn, which allows
> >>> > processing of a single element to be non-monolithic, i.e.
> >>> > checkpointable and parallelizable, as well as doing an unbounded
> >>> > amount of work per element.
> >>> >
> >>> > This allows effectively replacing the current Bounded/UnboundedSource
> >>> > APIs with DoFn's that are much easier to code, more scalable and
> >>> > composable with the rest of the Beam programming model, and enables
> >>> > many use cases that were previously difficult or impossible, as well
> >>> > as some non-obvious new use cases.
> >>> >
> >>> > This proposal has been mentioned before in JIRA [BEAM-65] and some
> >>> > Beam meetings, and now the whole thing is written up in a document:
> >>> >
> >>> >         https://s.apache.org/splittable-do-fn
> >>> >
> >>> > Here are some things that become possible with Splittable DoFn:
> >>> > - Efficiently read a filepattern matching millions of files
> >>> > - Read a collection of files that are produced by an earlier step in
> >>> > the pipeline (e.g. easily implement a connector to a storage system
> >>> > that can export itself to files)
> >>> > - Implement a Kafka reader by composing a "list partitions" DoFn with
> >>> > a DoFn that simply polls a consumer and outputs new records in a
> >>> > while() loop
> >>> > - Implement a log tailer by composing a DoFn that incrementally
> >>> > returns new files in a directory and a DoFn that tails a file
> >>> > - Implement a parallel "count friends in common" algorithm (matrix
> >>> > squaring) with good work balancing
> >>> >
> >>> > Here is the meaningful part of a hypothetical Kafka reader written
> >>> > against this API:
> >>> >
> >>> >     ProcessContinuation processElement(
> >>> >             ProcessContext context, OffsetRangeTracker tracker) {
> >>> >       try (KafkaConsumer<String, String> consumer =
> >>> >                 Kafka.subscribe(context.element().topic,
> >>> >                                 context.element().partition)) {
> >>> >         consumer.seek(tracker.start());
> >>> >         while (true) {
> >>> >           ConsumerRecords<String, String> records =
> >>> >               consumer.poll(100 /* ms */);
> >>> >           if (records == null) return done();
> >>> >           for (ConsumerRecord<String, String> record : records) {
> >>> >             if (!tracker.tryClaim(record.offset())) {
> >>> >               return resume().withFutureOutputWatermark(record.timestamp());
> >>> >             }
> >>> >             context.output(record);
> >>> >           }
> >>> >         }
> >>> >       }
> >>> >     }
> >>> >
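> >>> > As a rough illustration of how this could compose in a pipeline (a
> >>> > sketch only; ListKafkaPartitionsFn and ReadKafkaPartitionFn are
> >>> > illustrative names, not part of the proposal):
> >>> >
> >>> >     // Assuming an existing Pipeline p.
> >>> >     PCollection<String> topics = p.apply(Create.of("my-topic"));
> >>> >     topics
> >>> >         // Ordinary DoFn: list the partitions of each topic.
> >>> >         .apply(ParDo.of(new ListKafkaPartitionsFn()))
> >>> >         // Splittable DoFn like the one above: poll a partition and
> >>> >         // output new records, checkpointing via OffsetRangeTracker.
> >>> >         .apply(ParDo.of(new ReadKafkaPartitionFn()));
> >>> >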
> >>> > The document describes in detail the motivations behind this feature,
> >>> > the basic idea and API, open questions, and outlines an incremental
> >>> > delivery plan.
> >>> >
> >>> > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> >>> > and is loosely related to "State and Timers for DoFn" [beam-state].
> >>> >
> >>> > Please take a look and comment!
> >>> >
> >>> > Thanks.
> >>> >
> >>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> >>> > [new-do-fn] https://s.apache.org/a-new-do-fn
> >>> > [beam-state] https://s.apache.org/beam-state
> >>> >
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
>
