Hello everybody,

Just a reminder:

The meeting is happening tomorrow, Friday Aug 19th, 8am-9am PST. To join
the call, go to
https://hangouts.google.com/hangouts/_/google.com/splittabledofn
I intend to go over the proposed design and then have a free-form
discussion.

Please have a skim through the proposal doc:
https://s.apache.org/splittable-do-fn
I also made some slides, basically a trimmed-down version of the doc, to
use as a guide during the meeting:
https://docs.google.com/presentation/d/1DfCp9VRLH-5Ap5nbz047Fiv3NfjrywJVSdef5dRY4aE/edit?usp=sharing

I will post notes from the meeting on this thread afterwards.

Thanks, looking forward.

On Fri, Aug 12, 2016 at 5:35 PM Dan Halperin <dhalp...@google.com.invalid>
wrote:

> This is pretty cool! I'll be there too. (unless the hangout gets too full
> -- if so, I'll drop out in favor of others who aren't lucky enough to get
> to talk to Eugene all the time.)
>
> On Fri, Aug 12, 2016 at 4:03 PM, Andrew Psaltis <psaltis.and...@gmail.com>
> wrote:
>
> > +1 I'll join
> >
> > On Friday, August 12, 2016, Aparup Banerjee (apbanerj) <apban...@cisco.com>
> > wrote:
> >
> > > + 1, me2
> > >
> > >
> > >
> > >
> > > > On 8/12/16, 9:27 AM, "Amit Sela" <amitsel...@gmail.com>
> > > wrote:
> > >
> > > >+1 as in I'll join ;-)
> > > >
> > > > >On Fri, Aug 12, 2016, 19:14 Eugene Kirpichov <kirpic...@google.com.invalid>
> > > > >wrote:
> > > >
> > > >> Sounds good, thanks!
> > > >> Then Friday Aug 19th it is, 8am-9am PST,
> > > >> https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > > >>
> > > >> On Thu, Aug 11, 2016 at 11:12 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> > > >> wrote:
> > > >>
> > > >> > Hi
> > > >> >
> > > >> > Unfortunately I will be in Ireland on August 15th. What about
> > > >> > Friday 19th?
> > > >> >
> > > >> > Regards
> > > >> > JB
> > > >> >
> > > >> >
> > > >> >
> > > >> > On Aug 11, 2016, 23:22, at 23:22, Eugene Kirpichov
> > > >> > <kirpic...@google.com.INVALID> wrote:
> > > >> > >Hi JB,
> > > >> > >
> > > >> > >Sounds great, does the suggested time over videoconference work for
> > > >> > >you?
> > > >> > >
> > > >> > >On Thu, Aug 11, 2016 at 11:59 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > > >> > >wrote:
> > > >> > >
> > > >> > >> Hi Eugene
> > > >> > >>
> > > >> > >> May we talk together next week? I like the proposal. I would just need
> > > >> > >> some details for my understanding.
> > > >> > >>
> > > >> > >> Thanks
> > > >> > >> Regards
> > > >> > >> JB
> > > >> > >>
> > > >> > >>
> > > >> > >>
> > > >> > >> On Aug 11, 2016, 19:46, at 19:46, Eugene Kirpichov
> > > >> > >> <kirpic...@google.com.INVALID> wrote:
> > > >> > >> >Hi JB,
> > > >> > >> >
> > > >> > >> >What are your thoughts on this?
> > > >> > >> >
> > > >> > >> >I'm also thinking of having a virtual meeting to explain more about
> > > >> > >> >this proposal if necessary, since I understand it is a lot to digest.
> > > >> > >> >
> > > >> > >> >How about: Monday Aug 15, 8am-9am Pacific time, over Hangouts?
> > > >> > >> >(link:
> > > >> > >> >https://staging.talkgadget.google.com/hangouts/_/google.com/splittabledofn
> > > >> > >> >- I confirmed that it can be joined without being logged into a
> > > >> > >> >Google account)
> > > >> > >> >
> > > >> > >> >Who'd be interested in attending, and does this time/date work for
> > > >> > >> >people?
> > > >> > >> >
> > > >> > >> >On Fri, Aug 5, 2016 at 10:48 AM Eugene Kirpichov <kirpic...@google.com>
> > > >> > >> >wrote:
> > > >> > >> >
> > > >> > >> >> Hi JB, thanks for reading and for your comments!
> > > >> > >> >>
> > > >> > >> >> It sounds like you are concerned about continued support for existing
> > > >> > >> >> IO's people have developed, and about backward compatibility?
> > > >> > >> >>
> > > >> > >> >> We do not need to remove the Source API, and all existing Source-based
> > > >> > >> >> connectors will continue to work [though the document proposes at some
> > > >> > >> >> point to make Read.from(Source) translate to a wrapper SDF under the
> > > >> > >> >> hood, to exercise the feature more and to make sure that it is strictly
> > > >> > >> >> more powerful - but this is an optional implementation detail].
> > > >> > >> >>
> > > >> > >> >> Perhaps the document phrases this too strongly - "replacing the Source
> > > >> > >> >> API": a better phrasing would be "introducing a new API so powerful and
> > > >> > >> >> easy-to-use that hopefully people will choose it over the Source API all
> > > >> > >> >> the time, even though they don't have to" :) And we can discuss whether or
> > > >> > >> >> not to actually deprecate/remove the Source API at some point down the
> > > >> > >> >> road, once it becomes clear whether this is the case or not.
> > > >> > >> >>
> > > >> > >> >> To give more context: this proposal came out of discussions within the SDK
> > > >> > >> >> team over the past ~1.5 years, before the Beam project existed, on how to
> > > >> > >> >> make major improvements to the Source API; perhaps it will clarify things
> > > >> > >> >> if I give a history of the ideas discussed:
> > > >> > >> >> - The first idea was to introduce a Read.from(PCollection<Source>)
> > > >> > >> >> transform while keeping the Source API intact - this, given appropriate
> > > >> > >> >> implementation, would solve most of the scalability and composability
> > > >> > >> >> issues of IO's. Then most connectors would look like: ParDo<A, Source<B>>
> > > >> > >> >> + Read.from().
> > > >> > >> >> - Then we figured that the Source class is an unnecessary abstraction, as
> > > >> > >> >> it simply holds data. What if we only had a Reader<S, B> class where S is
> > > >> > >> >> the source type and B the output type? Then connectors would be something
> > > >> > >> >> like: ParDo<A, S> + hypothetical Read.using(Reader<S, B>).
> > > >> > >> >> - Then somebody remarked that some of the features of Source are useful to
> > > >> > >> >> ParDo's as well: e.g. ability to report progress when processing a very
> > > >> > >> >> heavy element, or ability to produce very large output in parallel.
> > > >> > >> >> - The two previous bullets were already hinting that the Read.using()
> > > >> > >> >> primitive might not be so special: it just takes S and produces B: isn't
> > > >> > >> >> that what a ParDo does, plus some source magic, minus the convenience of
> > > >> > >> >> c.output() vs. the start/advance() state machine?
> > > >> > >> >> - At this point it became clear that we should explore unifying sources
> > > >> > >> >> and ParDo's, in particular: can we bring the magic of sources to ParDo's
> > > >> > >> >> but without the limitations and coding inconveniences? And this is how
> > > >> > >> >> SplittableDoFn was born: bringing source magic to a DoFn by providing a
> > > >> > >> >> RangeTracker.
> > > >> > >> >> - Once the idea of "splittable DoFn's" was born, it became clear that it
> > > >> > >> >> is strictly more general than sources; at least, in the respect that
> > > >> > >> >> sources have to produce output, while DoFn's don't: an SDF may very well
> > > >> > >> >> produce no output at all, and simply perform a side effect in a
> > > >> > >> >> parallel/resumable way.
> > > >> > >> >> - Then there were countless hours of discussions on unifying the
> > > >> > >> >> bounded/unbounded cases, on the particulars of RangeTracker APIs
> > > >> > >> >> reconciling parallelization and checkpointing, what the relation between
> > > >> > >> >> SDF and DF should be, etc. They culminated in the current proposal. The
> > > >> > >> >> proposal comes at a time when a couple of key ingredients are (almost)
> > > >> > >> >> ready: NewDoFn to make SDF look like a regular DoFn, and the State/Timers
> > > >> > >> >> proposal to enable unbounded work per element.
> > > >> > >> >>
> > > >> > >> >> In short:
> > > >> > >> >> - Yes, we will support existing Source connectors, and will support
> > > >> > >> >> writing new ones, possibly forever. There is no interference with current
> > > >> > >> >> users of Source.
> > > >> > >> >> - The new API is an attempt to improve the Source API, taken to its
> > > >> > >> >> logical limit where it turns out that users' goals can be accomplished
> > > >> > >> >> more easily and more generically entirely within ParDo's.
> > > >> > >> >>
> > > >> > >> >> Let me know what you think, and thanks again!
> > > >> > >> >>
> > > >> > >> >> On Fri, Aug 5, 2016 at 2:39 AM Jean-Baptiste Onofré <j...@nanthrax.net>
> > > >> > >> >> wrote:
> > > >> > >> >>
> > > >> > >> >>> Hi Eugene,
> > > >> > >> >>>
> > > >> > >> >>> Just a question: why is it in DoFn and not an improvement of Source?
> > > >> > >> >>>
> > > >> > >> >>> If I understand correctly, it means that we will have to refactor all
> > > >> > >> >>> existing IO: basically, what you propose is to remove all Sources and
> > > >> > >> >>> replace them with NewDoFn.
> > > >> > >> >>>
> > > >> > >> >>> I'm concerned about this approach, especially in terms of timing: clearly,
> > > >> > >> >>> IO is the area where we have to move forward in Beam, as it will
> > > >> > >> >>> allow new users to get started on their projects.
> > > >> > >> >>> So, we started to bring new IOs: Kafka, JMS, Cassandra, MongoDB, JDBC,
> > > >> > >> >>> ... and some people started to learn the IO API (Bounded/Unbounded
> > > >> > >> >>> source, etc).
> > > >> > >> >>>
> > > >> > >> >>> I think it would make more sense to enhance the IO API (Source) instead
> > > >> > >> >>> of introducing a NewDoFn.
> > > >> > >> >>>
> > > >> > >> >>> What are your thoughts for IO writers like me? ;)
> > > >> > >> >>>
> > > >> > >> >>> Regards
> > > >> > >> >>> JB
> > > >> > >> >>>
> > > >> > >> >>> On 08/04/2016 07:45 PM, Eugene Kirpichov wrote:
> > > >> > >> >>> > Hello Beam community,
> > > >> > >> >>> >
> > > >> > >> >>> > We (myself, Daniel Mills and Robert Bradshaw) would like to propose
> > > >> > >> >>> > "Splittable DoFn" - a major generalization of DoFn, which allows processing
> > > >> > >> >>> > of a single element to be non-monolithic, i.e. checkpointable and
> > > >> > >> >>> > parallelizable, as well as doing an unbounded amount of work per element.
> > > >> > >> >>> >
> > > >> > >> >>> > This allows effectively replacing the current Bounded/UnboundedSource APIs
> > > >> > >> >>> > with DoFn's that are much easier to code, more scalable and composable with
> > > >> > >> >>> > the rest of the Beam programming model, and enables many use cases that
> > > >> > >> >>> > were previously difficult or impossible, as well as some non-obvious new
> > > >> > >> >>> > use cases.
> > > >> > >> >>> >
> > > >> > >> >>> > This proposal has been mentioned before in JIRA [BEAM-65] and some Beam
> > > >> > >> >>> > meetings, and now the whole thing is written up in a document:
> > > >> > >> >>> >
> > > >> > >> >>> >         https://s.apache.org/splittable-do-fn
> > > >> > >> >>> >
> > > >> > >> >>> > Here are some things that become possible with Splittable DoFn:
> > > >> > >> >>> > - Efficiently read a filepattern matching millions of files
> > > >> > >> >>> > - Read a collection of files that are produced by an earlier step in the
> > > >> > >> >>> > pipeline (e.g. easily implement a connector to a storage system that can
> > > >> > >> >>> > export itself to files)
> > > >> > >> >>> > - Implement a Kafka reader by composing a "list partitions" DoFn with a
> > > >> > >> >>> > DoFn that simply polls a consumer and outputs new records in a while()
> > > >> > >> >>> > loop
> > > >> > >> >>> > - Implement a log tailer by composing a DoFn that incrementally returns new
> > > >> > >> >>> > files in a directory and a DoFn that tails a file
> > > >> > >> >>> > - Implement a parallel "count friends in common" algorithm (matrix
> > > >> > >> >>> > squaring) with good work balancing
> > > >> > >> >>> >
> > > >> > >> >>> > Here is the meaningful part of a hypothetical Kafka reader written against
> > > >> > >> >>> > this API:
> > > >> > >> >>> >
> > > >> > >> >>> >     ProcessContinuation processElement(
> > > >> > >> >>> >             ProcessContext context, OffsetRangeTracker tracker) {
> > > >> > >> >>> >       try (KafkaConsumer<String, String> consumer =
> > > >> > >> >>> >                 Kafka.subscribe(context.element().topic,
> > > >> > >> >>> >                                 context.element().partition)) {
> > > >> > >> >>> >         consumer.seek(tracker.start());
> > > >> > >> >>> >         while (true) {
> > > >> > >> >>> >           ConsumerRecords<String, String> records = consumer.poll(100ms);
> > > >> > >> >>> >           if (records == null) return done();
> > > >> > >> >>> >           for (ConsumerRecord<String, String> record : records) {
> > > >> > >> >>> >             if (!tracker.tryClaim(record.offset())) {
> > > >> > >> >>> >               return resume().withFutureOutputWatermark(record.timestamp());
> > > >> > >> >>> >             }
> > > >> > >> >>> >             context.output(record);
> > > >> > >> >>> >           }
> > > >> > >> >>> >         }
> > > >> > >> >>> >       }
> > > >> > >> >>> >     }
> > > >> > >> >>> >
> > > >> > >> >>> > The document describes in detail the motivations behind this feature, the
> > > >> > >> >>> > basic idea and API, open questions, and outlines an incremental delivery
> > > >> > >> >>> > plan.
> > > >> > >> >>> >
> > > >> > >> >>> > The proposed API builds on the reflection-based new DoFn [new-do-fn] and is
> > > >> > >> >>> > loosely related to "State and Timers for DoFn" [beam-state].
> > > >> > >> >>> >
> > > >> > >> >>> > Please take a look and comment!
> > > >> > >> >>> >
> > > >> > >> >>> > Thanks.
> > > >> > >> >>> >
> > > >> > >> >>> > [BEAM-65] https://issues.apache.org/jira/browse/BEAM-65
> > > >> > >> >>> > [new-do-fn] https://s.apache.org/a-new-do-fn
> > > >> > >> >>> > [beam-state] https://s.apache.org/beam-state
> > > >> > >> >>> >
> > > >> > >> >>>
> > > >> > >> >>> --
> > > >> > >> >>> Jean-Baptiste Onofré
> > > >> > >> >>> jbono...@apache.org
> > > >> > >> >>> http://blog.nanthrax.net
> > > >> > >> >>> Talend - http://www.talend.com
> > > >> > >> >>>
> > > >> > >> >>
> > > >> > >>
> > > >> >
> > > >>
> > >
> >
> >
> > --
> > Thanks,
> > Andrew
> >
> > Subscribe to my book: Streaming Data <http://manning.com/psaltis>
> > <https://www.linkedin.com/pub/andrew-psaltis/1/17b/306>
> > twitter: @itmdata <http://twitter.com/intent/user?screen_name=itmdata>
> >
>
