PST mornings are better, because they are evenings/nights for me. Friday
would work out best for me.

On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
<[email protected]> wrote:

> Awesome!!!
>
> Amit - remind me your time zone? JB, do you want to join?
> I'm free this week all afternoons (say after 2pm) in Pacific Time, and
> mornings of Wed & Fri. We'll probably need half an hour to an hour.
>
> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]>
> wrote:
>
> > I whipped up a quick version for Flink that seems to work:
> > https://github.com/apache/beam/pull/2235
> >
> > There are still two failing tests, as described in the PR.
> >
> > On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > +1 for a video call. I think it should be pretty straightforward for
> > > the Spark runner after the work on read from UnboundedSource and after
> > > GroupAlsoByWindow, but from my experience such a call could move us
> > > forward fast enough.
> > >
> > > On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Let us continue working on this. I am back from various travels and
> > > > am eager to help.
> > > >
> > > > Amit, JB - would you like to perhaps have a videocall to hash this
> > > > out for the Spark runner?
> > > >
> > > > Aljoscha - are the necessary Flink changes done, or is the need for
> > > > them obviated by using the (existing) runner-facing state/timer APIs?
> > > > Should we have a videocall too?
> > > >
> > > > Thomas - what do you think about getting this into Apex runner?
> > > >
> > > > (I think videocalls will allow us to make rapid progress, but it's
> > > > probably a better idea to keep them separate since they'll involve a
> > > > lot of runner-specific details)
> > > >
> > > > PS - The completion of this in the Dataflow streaming runner is
> > > > currently waiting only on having a small service-side change
> > > > implemented and rolled out for termination of streaming jobs.
> > > >
> > > > On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]>
> > > > wrote:
> > > >
> > > > I recommend proceeding with the runner-facing state & timer APIs;
> > > > they are lower-level and more appropriate for this. All runners
> > > > provide them or use runners/core implementations, as they are needed
> > > > for triggering.
> > > >
> > > > On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]>
> > > > wrote:
> > > >
> > > > Thanks Aljoscha!
> > > >
> > > > Minor note: I'm not familiar with what level of support for timers
> > > > Flink currently has - however, SDF in the Direct and Dataflow runners
> > > > currently does not use the user-facing state/timer APIs - rather, it
> > > > uses the runner-facing APIs (StateInternals and TimerInternals) -
> > > > perhaps Flink already implements these. We may want to change this,
> > > > but for now it's good enough (besides, SDF uses watermark holds, which
> > > > are not supported by the user-facing state API yet).
> > > >
> > > > On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > > > [email protected]> wrote:
> > > >
> > > > Thanks for the motivation, Eugene! :-)
> > > >
> > > > I've wanted to do this for a while now but was waiting for the Flink
> > > > 1.2 release (which happened this week)! There's some prerequisite work
> > > > to be done on the Flink runner: we'll move to the new timer interfaces
> > > > introduced in Flink 1.2 and implement support for both the user-facing
> > > > state and timer APIs. This should make implementation of SDF easier.
> > > >
> > > > On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]>
> > > > wrote:
> > > >
> > > > Thanks! Looking forward to this work.
> > > >
> > > > On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]>
> > > > wrote:
> > > >
> > > > Thanks for the update Eugene.
> > > >
> > > > I will work on the spark runner with Amit.
> > > >
> > > > Regards
> > > > JB
> > > >
> > > > On Feb 7, 2017, at 19:12, Eugene Kirpichov
> > > > <[email protected]> wrote:
> > > > >Hello,
> > > > >
> > > > >I'm almost done adding support for Splittable DoFn
> > > > >http://s.apache.org/splittable-do-fn to Dataflow streaming runner*,
> > > > >and very excited about that. There's only 1 PR
> > > > ><https://github.com/apache/beam/pull/1898> remaining, plus enabling
> > > > >some tests.
> > > > >
> > > > >* (batch runner is much harder because it's not yet quite clear to me
> > > > >how to properly implement liquid sharding
> > > > ><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> > > > >with SDF - and the current API is not ready for that yet)
> > > > >
> > > > >After implementing all the runner-agnostic parts of Splittable DoFn, I
> > > > >found them quite easy to integrate into Dataflow streaming runner, and
> > > > >I think this means it should be easy to integrate into other runners
> > > > >too.
> > > > >
> > > > >====== Why it'd be cool ======
> > > > >The general benefits of SDF are well-described in the design doc
> > > > >(linked above).
> > > > >As for right now - if we integrated SDF with all runners, it'd already
> > > > >enable us to start greatly simplifying the code of existing streaming
> > > > >connectors (CountingInput, Kafka, Pubsub, JMS) and writing new
> > > > >connectors (e.g. a really nice one to implement would be "directory
> > > > >watcher", that continuously returns new files in a directory).
> > > > >
> > > > >As a teaser, here's the complete implementation of an "unbounded
> > > > >counter" I used for my test of Dataflow runner integration:
> > > > >
> > > > >  class CountFn extends DoFn<String, Long> {
> > > > >    @ProcessElement
> > > > >    public ProcessContinuation process(
> > > > >        ProcessContext c, OffsetRangeTracker tracker) {
> > > > >      // Offsets are longs; emit each one that is successfully claimed.
> > > > >      for (long i = tracker.currentRestriction().getFrom();
> > > > >          tracker.tryClaim(i); ++i) {
> > > > >        c.output(i);
> > > > >      }
> > > > >      return resume();
> > > > >    }
> > > > >
> > > > >    @GetInitialRestriction
> > > > >    public OffsetRange getInitialRange(String element) {
> > > > >      return new OffsetRange(0, Integer.MAX_VALUE);
> > > > >    }
> > > > >
> > > > >    @NewTracker
> > > > >    public OffsetRangeTracker newTracker(OffsetRange range) {
> > > > >      return new OffsetRangeTracker(range);
> > > > >    }
> > > > >  }
> > > > >
> > > > >====== What I'm asking ======
> > > > >So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
> > > > >runners from people who are intimately familiar with them -
> > > > >specifically, I was hoping best-case I could nerd-snipe some of you
> > > > >into taking over the integration of SDF with your favorite runner ;)
> > > > >
> > > > >The proper set of people seems to be +Aljoscha Krettek
> > > > ><[email protected]> +Maximilian Michels <[email protected]>
> > > > >[email protected] <[email protected]> +Amit Sela
> > > > ><[email protected]> +Thomas Weise unless I forgot somebody.
> > > > >
> > > > >Average-case, I was looking for runner-specific guidance on how to do
> > > > >it myself.
> > > > >
> > > > >====== If you want to help ======
> > > > >If somebody decides to take this over in my absence (I'll be mostly
> > > > >gone for ~the next month), the best people to ask for implementation
> > > > >advice are +Kenn Knowles <[email protected]> and +Daniel Mills
> > > > ><[email protected]>.
> > > > >
> > > > >For reference, here's how SDF is implemented in the direct runner:
> > > > >- Direct runner overrides
> > > > ><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> > > > >ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
> > > > ><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> > > > >(common transform expansion)
> > > > >- SplittableParDo uses two runner-specific primitive transforms:
> > > > >"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
> > > > >overrides the first one like this
> > > > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> > > > >and directly implements evaluation of the second one like this
> > > > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> > > > >using runner hooks introduced in this PR
> > > > ><https://github.com/apache/beam/pull/1824>. At the core of the hooks is
> > > > >"ProcessFn" which is like a regular DoFn but has to be prepared at
> > > > >runtime with some hooks (state, timers, and runner access to
> > > > >RestrictionTracker) before you invoke it. I added a convenience
> > > > >implementation of the hook mimicking behavior of UnboundedSource.
> > > > >- The relevant runner-agnostic tests are in SplittableDoFnTest
> > > > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > > > >
> > > > >That's all it takes, really - the runner has to implement these two
> > > > >transforms. When I looked at Spark and Flink runners, it was not quite
> > > > >clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
> > > > >Spark runner currently doesn't use KeyedWorkItem at all - but it seems
> > > > >definitely possible.
> > > > >
> > > > >Thanks!
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > > >
> > > > [email protected]
> > > > +49-(0)30-55599146
> > > >
> > > > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > > Managing Directors: Kostas Tzoumas, Stephan Ewen
> > > >
> > > >
> > > >
> >
>
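
For readers following the CountFn teaser quoted above, here is a minimal,
self-contained Java sketch of the claim loop it relies on. It does not use
the Beam API: Range, Tracker, checkpoint() and process() are hypothetical
stand-ins invented for illustration, and the only assumption is that a claim
succeeds while the offset lies inside the tracker's current range. It is
meant to show why the runner can cut such a loop short: once the range is
shrunk, the next tryClaim fails and the remainder can be resumed later.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {
    /** Hypothetical stand-in for a half-open offset range [from, to). */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }
    }

    /** Hypothetical stand-in for a tracker; not Beam's OffsetRangeTracker. */
    static final class Tracker {
        private Range range;
        private long lastClaimed;

        Tracker(Range range) {
            this.range = range;
            this.lastClaimed = range.from - 1;
        }

        synchronized Range currentRestriction() {
            return range;
        }

        /** A claim succeeds only while the offset is inside the current range. */
        synchronized boolean tryClaim(long offset) {
            if (offset >= range.to) {
                return false;
            }
            lastClaimed = offset;
            return true;
        }

        /** Simulated runner checkpoint: keep what was claimed, hand back the rest. */
        synchronized Range checkpoint() {
            Range residual = new Range(lastClaimed + 1, range.to);
            range = new Range(range.from, lastClaimed + 1);
            return residual;
        }
    }

    /**
     * Same loop shape as the teaser. For the demo, a checkpoint is triggered
     * from inside the loop after checkpointAfter outputs; in a real runner it
     * would come from outside the user code.
     */
    static List<Long> process(Tracker tracker, long checkpointAfter, Range[] residualOut) {
        List<Long> out = new ArrayList<>();
        for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
            out.add(i);
            if (out.size() == checkpointAfter) {
                residualOut[0] = tracker.checkpoint();
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Range[] residual = new Range[1];
        List<Long> first = process(new Tracker(new Range(0, 10)), 4, residual);
        System.out.println(first); // prints [0, 1, 2, 3]

        // Resume from the residual range; no further checkpoint is requested.
        List<Long> rest = process(new Tracker(residual[0]), Long.MAX_VALUE, new Range[1]);
        System.out.println(rest); // prints [4, 5, 6, 7, 8, 9]
    }
}
```

The user code never decides where to stop; it only asks permission for each
offset, which is what lets the range be split or checkpointed underneath it.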
