Thomas - yes, 9:30 works, shall we do that?

JB - excellent! You can start experimenting already, using direct runner!

On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Eugene,
>
> Thanks for the meeting notes !
>
> I will be in the next call and Ismaël also provided to me some updates.
>
> I will sync with Amit on the Spark runner and start to experiment and
> test SDF on the JMS IO.
>
> Thanks !
> Regards
> JB
>
> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> > Meeting notes from today's call with Amit, Aviem and Ismaël:
> >
> > Spark has 2 types of stateful operators: a cheap one intended for
> > updating elements (works with state but not with timers) and an
> > expensive one. I.e. there's no efficient direct counterpart to Beam's
> > keyed state model. In its implementation of the Beam State & Timers API,
> > the Spark runner will use the cheaper one for state and the expensive
> > one for timers. So, for SDF, which in the runner-agnostic
> > SplittableParDo expansion needs both state and timers, we'll need the
> > expensive one - but this should be fine since with SDF the bottleneck
> > should be in the ProcessElement call itself, not in splitting/scheduling
> > it.
> >
> > For the Spark batch runner, implementing SDF might still be simpler: the
> > runner will simply not request any checkpointing. The hard parts of
> > SDF/batch are the dynamic rebalancing and size estimation APIs - they will
> > be refined this quarter, but it's OK to initially not have them.
> >
> > The Spark runner might use a different expansion of SDF not involving
> > KeyedWorkItems (i.e. not overriding the GBKIntoKeyedWorkItems transform),
> > though still striving to reuse as much code as possible from the standard
> > expansion implemented in SplittableParDo, at least ProcessFn.
> >
> > Testing questions:
> > - The Spark runner already properly implements termination on
> > watermarks reaching infinity.
> > - Q: How to test that the runner actually splits? A: The code that splits
> > is runner-agnostic, so a runner would have to deliberately sabotage it in
> > order to break it - unlikely. Also, for semantics we have runner-agnostic
> > ROS tests; but at some point we will need performance tests too.
> >
> > Next steps:
> > - Amit will look at the standard SplittableParDo expansion and its
> > implementation in the Flink and Direct runners, and will write up a doc
> > about how to do this in Spark.
> > - Another videotalk in 2 weeks to check on progress/issues.
> >
> > Thanks all!
> >
> > On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <kirpic...@google.com>
> > wrote:
> >
> >> Yes, Monday morning works! How about also 8am PST, same Hangout link -
> >> does that work for you?
> >>
> >> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <thomas.we...@gmail.com>
> >> wrote:
> >>
> >> Eugene,
> >>
> >> I cannot make it for the call today. Would Monday morning work for you
> to
> >> discuss the Apex changes?
> >>
> >> Thanks
> >>
> >> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
> >> kirpic...@google.com.invalid> wrote:
> >>
> >>> Hi! Please feel free to join this call, but I think we'd be mostly
> >>> discussing how to do it in the Spark runner in particular; so we'll
> >>> probably need another call for Apex anyway.
> >>>
> >>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <t...@apache.org> wrote:
> >>>
> >>>> Hi Eugene,
> >>>>
> >>>> This would work for me also. Please let me know if you want to keep
> the
> >>>> Apex related discussion separate or want me to join this call.
> >>>>
> >>>> Thanks,
> >>>> Thomas
> >>>>
> >>>>
> >>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> >>>> kirpic...@google.com.invalid> wrote:
> >>>>
> >>>>> Sure, Friday morning sounds good. How about 9am Friday PST, videocall
> >>>>> at this link:
> >>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
> >>>>>
> >>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <amitsel...@gmail.com>
> >>> wrote:
> >>>>>
> >>>>>> PST mornings are better, because they are evening/nights for me.
> >>> Friday
> >>>>>> would work-out best for me.
> >>>>>>
> >>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> >>>>>> <kirpic...@google.com.invalid> wrote:
> >>>>>>
> >>>>>>> Awesome!!!
> >>>>>>>
> >>>>>>> Amit - remind me your time zone? JB, do you want to join?
> >>>>>>> I'm free this week all afternoons (say after 2pm) in Pacific
> >> Time,
> >>>> and
> >>>>>>> mornings of Wed & Fri. We'll probably need half an hour to an
> >> hour.
> >>>>>>>
> >>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> >>>> aljos...@apache.org>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> I whipped up a quick version for Flink that seems to work:
> >>>>>>>> https://github.com/apache/beam/pull/2235
> >>>>>>>>
> >>>>>>>> There are still two failing tests, as described in the PR.
> >>>>>>>>
> >>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> >>>>>>>>> +1 for a video call. I think it should be pretty straight
> >>> forward
> >>>>> for
> >>>>>>> the
> >>>>>>>>> Spark runner after the work on read from UnboundedSource and
> >>>> after
> >>>>>>>>> GroupAlsoByWindow, but from my experience such a call could
> >>> move
> >>>> us
> >>>>>>>>> forward
> >>>>>>>>> fast enough.
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> >>>> kirpic...@google.com
> >>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> Let us continue working on this. I am back from various
> >>> travels
> >>>>> and
> >>>>>>> am
> >>>>>>>>>> eager to help.
> >>>>>>>>>>
> >>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to
> >> hash
> >>>>> this
> >>>>>>> out
> >>>>>>>> for
> >>>>>>>>>> the Spark runner?
> >>>>>>>>>>
> >>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is the
> >>>> need
> >>>>>> for
> >>>>>>>> them
> >>>>>>>>>> obviated by using the (existing) runner-facing state/timer
> >>>> APIs?
> >>>>>>>> Should we
> >>>>>>>>>> have a videocall too?
> >>>>>>>>>>
> >>>>>>>>>> Thomas - what do you think about getting this into Apex
> >>> runner?
> >>>>>>>>>>
> >>>>>>>>>> (I think videocalls will allow us to make rapid progress, but it's
> >>>>>>>>>> probably a better idea to keep them separate since they'll involve
> >>>>>>>>>> a lot of runner-specific details)
> >>>>>>>>>>
> >>>>>>>>>> PS - The completion of this in Dataflow streaming runner is
> >>>>>> currently
> >>>>>>>>>> waiting only on having a small service-side change
> >>> implemented
> >>>>> and
> >>>>>>>> rolled
> >>>>>>>>>> out for termination of streaming jobs.
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> >>>> k...@google.com>
> >>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> I recommend proceeding with the runner-facing state & timer
> >>>> APIs;
> >>>>>>> they
> >>>>>>>> are
> >>>>>>>>>> lower-level and more appropriate for this. All runners
> >>> provide
> >>>>> them
> >>>>>>> or
> >>>>>>>> use
> >>>>>>>>>> runners/core implementations, as they are needed for
> >>>> triggering.
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> >>>>>>>> kirpic...@google.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks Aljoscha!
> >>>>>>>>>>
> >>>>>>>>>> Minor note: I'm not familiar with what level of support for timers
> >>>>>>>>>> Flink currently has - however, SDF in the Direct and Dataflow runners
> >>>>>>>>>> currently does not use the user-facing state/timer APIs - rather, it
> >>>>>>>>>> uses the runner-facing APIs (StateInternals and TimerInternals) -
> >>>>>>>>>> perhaps Flink already implements these. We may want to change this,
> >>>>>>>>>> but for now it's good enough (besides, SDF uses watermark holds, which
> >>>>>>>>>> are not supported by the user-facing state API yet).
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> >>>>>>>>>> aljos...@data-artisans.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the motivation, Eugene! :-)
> >>>>>>>>>>
> >>>>>>>>>> I've wanted to do this for a while now but was waiting for
> >>> the
> >>>>>> Flink
> >>>>>>>> 1.2
> >>>>>>>>>> release (which happened this week)! There's some
> >> prerequisite
> >>>>> work
> >>>>>> to
> >>>>>>>> be
> >>>>>>>>>> done on the Flink runner: we'll move to the new timer
> >>>> interfaces
> >>>>>>>> introduced
> >>>>>>>>>> in Flink 1.2 and implement support for both the user facing
> >>>> state
> >>>>>> and
> >>>>>>>> timer
> >>>>>>>>>> APIs. This should make implementation of SDF easier.
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> >>>>>>> kirpic...@google.com
> >>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks! Looking forward to this work.
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <
> >>>>>> j...@nanthrax.net
> >>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Thanks for the update Eugene.
> >>>>>>>>>>
> >>>>>>>>>> I will work on the spark runner with Amit.
> >>>>>>>>>>
> >>>>>>>>>> Regards
> >>>>>>>>>> JB
> >>>>>>>>>>
> >>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> >>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
> >>>>>>>>>>> Hello,
> >>>>>>>>>>>
> >>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> >>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and
> >>>>>>>>>>> very excited about that. There's only 1 PR
> >>>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining, plus enabling some
> >>>>>>>>>>> tests.
> >>>>>>>>>>>
> >>>>>>>>>>> * (batch runner is much harder because it's not yet quite clear to me
> >>>>>>>>>>> how to properly implement liquid sharding
> >>>>>>>>>>> <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> >>>>>>>>>>> with SDF - and the current API is not ready for that yet)
> >>>>>>>>>>>
> >>>>>>>>>>> After implementing all the runner-agnostic parts of Splittable DoFn, I
> >>>>>>>>>>> found them quite easy to integrate into Dataflow streaming runner, and I
> >>>>>>>>>>> think this means it should be easy to integrate into other runners too.
> >>>>>>>>>>>
> >>>>>>>>>>> ====== Why it'd be cool ======
> >>>>>>>>>>> The general benefits of SDF are well-described in the design doc (linked
> >>>>>>>>>>> above).
> >>>>>>>>>>> As for right now - if we integrated SDF with all runners, it'd already
> >>>>>>>>>>> enable us to start greatly simplifying the code of existing streaming
> >>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors
> >>>>>>>>>>> (e.g. a really nice one to implement would be "directory watcher", that
> >>>>>>>>>>> continuously returns new files in a directory).
> >>>>>>>>>>>
> >>>>>>>>>>> As a teaser, here's the complete implementation of an "unbounded counter"
> >>>>>>>>>>> I used for my test of Dataflow runner integration:
> >>>>>>>>>>>
> >>>>>>>>>>>  class CountFn extends DoFn<String, String> {
> >>>>>>>>>>>    @ProcessElement
> >>>>>>>>>>>    public ProcessContinuation process(
> >>>>>>>>>>>        ProcessContext c, OffsetRangeTracker tracker) {
> >>>>>>>>>>>      // Offsets are longs; emit each claimed offset as a String element.
> >>>>>>>>>>>      for (long i = tracker.currentRestriction().getFrom();
> >>>>>>>>>>>          tracker.tryClaim(i); ++i) {
> >>>>>>>>>>>        c.output(String.valueOf(i));
> >>>>>>>>>>>      }
> >>>>>>>>>>>      return resume();
> >>>>>>>>>>>    }
> >>>>>>>>>>>
> >>>>>>>>>>>    @GetInitialRestriction
> >>>>>>>>>>>    public OffsetRange getInitialRange(String element) {
> >>>>>>>>>>>      return new OffsetRange(0, Integer.MAX_VALUE);
> >>>>>>>>>>>    }
> >>>>>>>>>>>
> >>>>>>>>>>>    @NewTracker
> >>>>>>>>>>>    public OffsetRangeTracker newTracker(OffsetRange range) {
> >>>>>>>>>>>      return new OffsetRangeTracker(range);
> >>>>>>>>>>>    }
> >>>>>>>>>>>  }
> >>>>>>>>>>>
> >>>>>>>>>>> ====== What I'm asking ======
> >>>>>>>>>>> So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
> >>>>>>>>>>> runners from people who are intimately familiar with them -
> >>>>>>>>>>> specifically, I was hoping best-case I could nerd-snipe some of you into
> >>>>>>>>>>> taking over the integration of SDF with your favorite runner ;)
> >>>>>>>>>>>
> >>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
> >>>>>>>>>>> <aljos...@data-artisans.com> +Maximilian Michels
> >>>>>>>>>>> <m...@data-artisans.com>
> >>>>>>>>>>> +ieme...@gmail.com <ieme...@gmail.com> +Amit Sela
> >>>>>>>>>>> <amitsel...@gmail.com> +Thomas
> >>>>>>>>>>> Weise unless I forgot somebody.
> >>>>>>>>>>>
> >>>>>>>>>>> Average-case, I was looking for runner-specific guidance on how to do it
> >>>>>>>>>>> myself.
> >>>>>>>>>>>
> >>>>>>>>>>> ====== If you want to help ======
> >>>>>>>>>>> If somebody decides to take this over in my absence (I'll be mostly
> >>>>>>>>>>> gone for ~the next month), the best people to ask for implementation
> >>>>>>>>>>> advice are +Kenn Knowles <k...@google.com> and +Daniel Mills
> >>>>>>>>>>> <mil...@google.com>.
> >>>>>>>>>>>
> >>>>>>>>>>> For reference, here's how SDF is implemented in the direct runner:
> >>>>>>>>>>> - Direct runner overrides
> >>>>>>>>>>> <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> >>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
> >>>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> >>>>>>>>>>> (common transform expansion)
> >>>>>>>>>>> - SplittableParDo uses two runner-specific primitive transforms:
> >>>>>>>>>>> "GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
> >>>>>>>>>>> overrides the first one like this
> >>>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> >>>>>>>>>>> and directly implements evaluation of the second one like this
> >>>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> >>>>>>>>>>> using runner hooks introduced in this PR
> >>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the core of the hooks is
> >>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to be prepared at
> >>>>>>>>>>> runtime with some hooks (state, timers, and runner access to
> >>>>>>>>>>> RestrictionTracker) before you invoke it. I added a convenience
> >>>>>>>>>>> implementation of the hook mimicking behavior of UnboundedSource.
> >>>>>>>>>>> - The relevant runner-agnostic tests are in SplittableDoFnTest
> >>>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> >>>>>>>>>>>
> >>>>>>>>>>> That's all it takes, really - the runner has to implement these two
> >>>>>>>>>>> transforms. When I looked at Spark and Flink runners, it was not quite
> >>>>>>>>>>> clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
> >>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at all - but it seems
> >>>>>>>>>>> definitely possible.
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks!
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> >>>>>>>>>>
> >>>>>>>>>> i...@data-artisans.com
> >>>>>>>>>> +49-(0)30-55599146
> >>>>>>>>>>
> >>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> >>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
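
For anyone experimenting with the direct runner: the CountFn teaser quoted above relies on a claim loop, where the function may only emit an offset after the tracker lets it claim that offset, and the runner can checkpoint at any time by splitting off the unclaimed remainder. Below is a minimal, self-contained sketch of that idea in plain Java. It does not use Beam at all; Range, Tracker, runOnce and the output budget are hypothetical names invented for illustration and are not the Beam OffsetRange / OffsetRangeTracker API.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {
    /** Hypothetical stand-in for a half-open offset range [from, to). */
    static final class Range {
        final long from;
        final long to;
        Range(long from, long to) { this.from = from; this.to = to; }
    }

    /** Hypothetical tracker; NOT Beam's OffsetRangeTracker. */
    static final class Tracker {
        private Range range;
        private long lastClaimed;

        Tracker(Range range) {
            this.range = range;
            this.lastClaimed = range.from - 1; // nothing claimed yet
        }

        /** Claims an offset if it is still inside the current range. */
        boolean tryClaim(long offset) {
            if (offset >= range.to) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }

        /** Shrinks the current range to what was claimed and returns the rest. */
        Range checkpoint() {
            Range residual = new Range(lastClaimed + 1, range.to);
            range = new Range(range.from, lastClaimed + 1);
            return residual;
        }
    }

    /**
     * One simulated process call. The "runner" checkpoints after `budget`
     * outputs; the next tryClaim then fails and the loop exits on its own.
     * Returns the residual range to resume from, or null if none was split off.
     */
    static Range runOnce(Range restriction, int budget, List<Long> out) {
        Tracker tracker = new Tracker(restriction);
        Range residual = null;
        int emitted = 0;
        for (long i = restriction.from; tracker.tryClaim(i); ++i) {
            out.add(i);
            if (++emitted == budget) {
                residual = tracker.checkpoint();
            }
        }
        return residual;
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        Range next = new Range(0, 10);
        int calls = 0;
        // Resume from each residual until the whole range has been processed.
        while (next != null && next.from < next.to) {
            next = runOnce(next, 4, out);
            calls++;
        }
        System.out.println(calls + " calls, output " + out);
    }
}
```

The point of the sketch is that the splitting logic lives entirely in the tracker, so the loop body never needs to know whether or when a checkpoint happened. That matches the observation in the meeting notes that the splitting code is runner-agnostic.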
