+1 as in I'll join ;-)

On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid>
wrote:

> Sounds good, thanks!
> Then Friday Aug 19th it is, 8am-9am PST,
> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
>
> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > Hi
> >
> > Unfortunately I will be in Ireland on August 15th. What about Friday 19th?
> >
> > Regards
> > JB
> >
> >
> >
> > On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > <kirpic...@google.com.INVALID> wrote:
> > >Hi JB,
> > >
> > >Sounds great, does the suggested time over videoconference work for you?
> > >
> > >On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > >wrote:
> > >
> > >> Hi Eugene
> > >>
> > >> May we talk together next week? I like the proposal. I would just need
> > >> some details for my understanding.
> > >>
> > >> Thanks
> > >> Regards
> > >> JB
> > >>
> > >>
> > >>
> > >> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > >> <kirpic...@google.com.INVALID> wrote:
> > >> >Hi JB,
> > >> >
> > >> >What are your thoughts on this?
> > >> >
> > >> >I'm also thinking of having a virtual meeting to explain more about this
> > >> >proposal if necessary, since I understand it is a lot to digest.
> > >> >
> > >> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > >> >(link:
> > >> >https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > >> >- I confirmed that it can be joined without being logged into a Google
> > >> >account)
> > >> >
> > >> >Who'd be interested in attending, and does this time/date work for
> > >> >people?
> > >> >
> > >> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov
> > ><kirpic...@google.com>
> > >> >wrote:
> > >> >
> > >> >> Hi JB, thanks for reading and for your comments!
> > >> >>
> > >> >> It sounds like you are concerned about continued support for existing
> > >> >> IO's people have developed, and about backward compatibility?
> > >> >>
> > >> >> We do not need to remove the Source API, and all existing Source-based
> > >> >> connectors will continue to work [though the document proposes at some
> > >> >> point to make Read.from(Source) translate to a wrapper SDF under the
> > >> >> hood, to exercise the feature more and to make sure that it is strictly
> > >> >> more powerful - but this is an optional implementation detail].
> > >> >>
> > >> >> Perhaps the document phrases this too strongly - "replacing the Source
> > >> >> API": a better phrasing would be "introducing a new API so powerful and
> > >> >> easy-to-use that hopefully people will choose it over the Source API all
> > >> >> the time, even though they don't have to" :) And we can discuss whether or
> > >> >> not to actually deprecate/remove the Source API at some point down the
> > >> >> road, once it becomes clear whether this is the case or not.
> > >> >>
> > >> >> To give more context: this proposal came out of discussions within the SDK
> > >> >> team over the past ~1.5 years, before the Beam project existed, on how to
> > >> >> make major improvements to the Source API; perhaps it will clarify things
> > >> >> if I give a history of the ideas discussed:
> > >> >> - The first idea was to introduce a Read.from(PCollection<Source>)
> > >> >> transform while keeping the Source API intact - this, given appropriate
> > >> >> implementation, would solve most of the scalability and composability
> > >> >> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
> > >> >> + Read.from().
> > >> >> - Then we figured that the Source class is an unnecessary abstraction, as
> > >> >> it simply holds data. What if we only had a Reader<S, B> class where S is
> > >> >> the source type and B the output type? Then connectors would be something
> > >> >> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> > >> >> - Then somebody remarked that some of the features of Source are useful to
> > >> >> ParDo's as well: e.g. ability to report progress when processing a very
> > >> >> heavy element, or ability to produce very large output in parallel.
> > >> >> - The two previous bullets were already hinting that the Read.using()
> > >> >> primitive might not be so special: it just takes S and produces B: isn't
> > >> >> that what a ParDo does, plus some source magic, minus the convenience of
> > >> >> c.output() vs. the start/advance() state machine?
> > >> >> - At this point it became clear that we should explore unifying sources
> > >> >> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> > >> >> but without the limitations and coding inconveniences? And this is how
> > >> >> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> > >> >> RangeTracker.
> > >> >> - Once the idea of "splittable DoFn's" was born, it became clear that it
> > >> >> is strictly more general than sources; at least, in the respect that
> > >> >> sources have to produce output, while DoFn's don't: an SDF may very well
> > >> >> produce no output at all, and simply perform a side effect in a
> > >> >> parallel/resumable way.
> > >> >> - Then there were countless hours of discussions on unifying the
> > >> >> bounded/unbounded cases, on the particulars of RangeTracker APIs
> > >> >> reconciling parallelization and checkpointing, what the relation between
> > >> >> SDF and DF should be, etc. They culminated in the current proposal. The
> > >> >> proposal comes at a time when a couple of key ingredients are (almost)
> > >> >> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> > >> >> proposal to enable unbounded work per element.
> > >> >>
> > >> >> To put it shortly:
> > >> >> - Yes, we will support existing Source connectors, and will support
> > >> >> writing new ones, possibly forever. There is no interference with current
> > >> >> users of Source.
> > >> >> - The new API is an attempt to improve the Source API, taken to its
> > >> >> logical limit where it turns out that users' goals can be accomplished
> > >> >> more easily and more generically entirely within ParDo's.
> > >> >>
> > >> >> Let me know what you think, and thanks again!
> > >> >>
> > >> >> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré
> > ><j...@nanthrax.net>
> > >> >> wrote:
> > >> >>
> > >> >>> Hi Eugene,
> > >> >>>
> > >> >>> Just a question: why is it in DoFn and not an improvement of Source?
> > >> >>>
> > >> >>> If I understand correctly, it means that we will have to refactor all
> > >> >>> existing IO: basically, what you propose is to remove all Source and
> > >> >>> replace it with NewDoFn.
> > >> >>>
> > >> >>> I'm concerned about this approach, especially in terms of timing:
> > >> >>> clearly, IO is the area where we have to move forward in Beam, as it
> > >> >>> will allow new users to get started in their projects.
> > >> >>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> > >> >>> ... and some people started to learn the IO API (Bounded/Unbounded
> > >> >>> source, etc).
> > >> >>>
> > >> >>> I think it would make more sense to enhance the IO API (Source) instead
> > >> >>> of introducing a NewDoFn.
> > >> >>>
> > >> >>> What are your thoughts for IO writers like me? ;)
> > >> >>>
> > >> >>> Regards
> > >> >>> JB
> > >> >>>
> > >> >>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > >> >>> > Hello Beam community,
> > >> >>> >
> > >> >>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > >> >>> > "Splittable DoFn" - a major generalization of DoFn, which allows
> > >> >>> > processing of a single element to be non-monolithic, i.e. checkpointable
> > >> >>> > and parallelizable, as well as doing an unbounded amount of work per
> > >> >>> > element.
> > >> >>> >
> > >> >>> > This allows effectively replacing the current Bounded/UnboundedSource
> > >> >>> > APIs with DoFn's that are much easier to code, more scalable and
> > >> >>> > composable with the rest of the Beam programming model, and enables many
> > >> >>> > use cases that were previously difficult or impossible, as well as some
> > >> >>> > non-obvious new use cases.
> > >> >>> >
> > >> >>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > >> >>> > meetings, and now the whole thing is written up in a document:
> > >> >>> >
> > >> >>> >         https://s.apache.org/splittable-do-fn
> > >> >>> >
> > >> >>> > Here are some things that become possible with Splittable DoFn:
> > >> >>> > - Efficiently read a filepattern matching millions of files
> > >> >>> > - Read a collection of files that are produced by an earlier step in the
> > >> >>> > pipeline (e.g. easily implement a connector to a storage system that can
> > >> >>> > export itself to files)
> > >> >>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> > >> >>> > DoFn that simply polls a consumer and outputs new records in a while()
> > >> >>> > loop
> > >> >>> > - Implement a log tailer by composing a DoFn that incrementally returns
> > >> >>> > new files in a directory and a DoFn that tails a file
> > >> >>> > - Implement a parallel "count friends in common" algorithm (matrix
> > >> >>> > squaring) with good work balancing
> > >> >>> >
> > >> >>> > Here is the meaningful part of a hypothetical Kafka reader written
> > >> >>> > against this API:
> > >> >>> >
> > >> >>> >     ProcessContinuation processElement(
> > >> >>> >             ProcessContext context, OffsetRangeTracker tracker) {
> > >> >>> >       try (KafkaConsumer<String, String> consumer =
> > >> >>> >                 Kafka.subscribe(context.element().topic,
> > >> >>> >                                 context.element().partition)) {
> > >> >>> >         consumer.seek(tracker.start());
> > >> >>> >         while (true) {
> > >> >>> >           ConsumerRecords<String, String> records = consumer.poll(100ms);
> > >> >>> >           if (records == null) return done();
> > >> >>> >           for (ConsumerRecord<String, String> record : records) {
> > >> >>> >             if (!tracker.tryClaim(record.offset())) {
> > >> >>> >               return resume().withFutureOutputWatermark(record.timestamp());
> > >> >>> >             }
> > >> >>> >             context.output(record);
> > >> >>> >           }
> > >> >>> >         }
> > >> >>> >       }
> > >> >>> >     }
> > >> >>> >
> > >> >>> > The document describes in detail the motivations behind this feature,
> > >> >>> > the basic idea and API, open questions, and outlines an incremental
> > >> >>> > delivery plan.
> > >> >>> >
> > >> >>> > The proposed API builds on the reflection-based new DoFn [new-do-fn]
> > >> >>> > and is loosely related to "State and Timers for DoFn" [beam-state].
> > >> >>> >
> > >> >>> > Please take a look and comment!
> > >> >>> >
> > >> >>> > Thanks.
> > >> >>> >
> > >> >>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > >> >>> > [new-do-fn] https://s.apache.org/a-new-do-fn
> > >> >>> > [beam-state] https://s.apache.org/beam-state
> > >> >>> >
> > >> >>>
> > >> >>> --
> > >> >>> Jean-Baptiste Onofré
> > >> >>> jbono...@apache.org
> > >> >>> http://blog.nanthrax.net
> > >> >>> Talend - http://www.talend.com
> > >> >>>
> > >> >>
> > >>
> >
>
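
The Kafka reader quoted in this thread hinges on tracker.tryClaim(): the DoFn may only output a record after claiming its offset, which is what lets a runner shrink the range behind the DoFn's back and hand the rest to someone else. Below is a minimal plain-Java sketch of that contract, not the proposed Beam API. SketchOffsetTracker, splitAt() and process() are hypothetical names made up for illustration, and a boolean return stands in for resume()/done().

```java
import java.util.ArrayList;
import java.util.List;

// Entry point comes first so a single-file launch picks it up.
final class SplittableSketch {
    /**
     * Emits every record whose offset the tracker lets us claim.
     * Returns true when a claim is refused (stand-in for resume()) and
     * false when the records run out (stand-in for done()).
     */
    static boolean process(List<String> records, SketchOffsetTracker tracker, List<String> out) {
        for (long offset = tracker.start(); offset < records.size(); offset++) {
            if (!tracker.tryClaim(offset)) {
                return true; // range was shrunk; stop before emitting this record
            }
            out.add(records.get((int) offset));
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add("r" + i);
        }
        SketchOffsetTracker primary = new SketchOffsetTracker(0, 10);
        long[] residual = primary.splitAt(6); // pretend the runner split the work
        List<String> out = new ArrayList<>();
        boolean resumePrimary = process(records, primary, out);
        boolean resumeResidual =
            process(records, new SketchOffsetTracker(residual[0], residual[1]), out);
        System.out.println(resumePrimary + " " + resumeResidual + " " + out);
    }
}

/** Hypothetical stand-in for the OffsetRangeTracker in the quoted example. */
final class SketchOffsetTracker {
    private final long start;
    private long end;          // exclusive; shrinks when splitAt() succeeds
    private long lastClaimed;  // highest offset claimed so far

    SketchOffsetTracker(long start, long end) {
        this.start = start;
        this.end = end;
        this.lastClaimed = start - 1;
    }

    long start() {
        return start;
    }

    /** Claims an offset; refuses once the offset is outside the current range. */
    synchronized boolean tryClaim(long offset) {
        if (offset >= end) {
            return false;
        }
        lastClaimed = offset;
        return true;
    }

    /**
     * Keeps [start, splitAt) in this tracker and returns the residual
     * {splitAt, oldEnd}, or null if the split point was already claimed
     * or lies outside the range.
     */
    synchronized long[] splitAt(long splitAt) {
        if (splitAt <= lastClaimed || splitAt >= end) {
            return null;
        }
        long[] residual = {splitAt, end};
        end = splitAt;
        return residual;
    }
}
```

In this sketch the primary range and the residual range together emit each record exactly once, and a split at an offset that has already been claimed is rejected, which is the property that makes checkpointing and parallelization safe to combine.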
