Awesome!!!

Amit - remind me your time zone? JB, do you want to join?
I'm free this week all afternoons (say after 2pm) in Pacific Time, and
mornings of Wed & Fri. We'll probably need half an hour to an hour.

On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]>
wrote:

> I whipped up a quick version for Flink that seems to work:
> https://github.com/apache/beam/pull/2235
>
> There are still two failing tests, as described in the PR.
>
> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > +1 for a video call. I think it should be pretty straightforward for the
> > Spark runner after the work on read from UnboundedSource and after
> > GroupAlsoByWindow, but from my experience such a call could move us
> > forward quickly.
> >
> > On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]>
> > wrote:
> >
> > > Hi all,
> > >
> > > Let us continue working on this. I am back from various travels and am
> > > eager to help.
> > >
> > > Amit, JB - would you like to perhaps have a videocall to hash this out
> > > for the Spark runner?
> > >
> > > Aljoscha - are the necessary Flink changes done, or is the need for
> > > them obviated by using the (existing) runner-facing state/timer APIs?
> > > Should we have a videocall too?
> > >
> > > Thomas - what do you think about getting this into Apex runner?
> > >
> > > (I think videocalls will allow us to make rapid progress, but it's
> > > probably a better idea to keep them separate since they'll involve a
> > > lot of runner-specific details)
> > >
> > > PS - The completion of this in Dataflow streaming runner is currently
> > > waiting only on having a small service-side change implemented and
> > > rolled out for termination of streaming jobs.
> > >
> > > On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
> > >
> > > I recommend proceeding with the runner-facing state & timer APIs; they
> > > are lower-level and more appropriate for this. All runners provide them
> > > or use runners/core implementations, as they are needed for triggering.
> > >
> > > On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]>
> > > wrote:
> > >
> > > Thanks Aljoscha!
> > >
> > > Minor note: I'm not familiar with what level of support for timers
> > > Flink currently has - however SDF in Direct and Dataflow runner
> > > currently does not use the user-facing state/timer APIs - rather, it
> > > uses the runner-facing APIs (StateInternals and TimerInternals) -
> > > perhaps Flink already implements these. We may want to change this, but
> > > for now it's good enough (besides, SDF uses watermark holds, which are
> > > not supported by the user-facing state API yet).
> > >
> > > On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > > [email protected]> wrote:
> > >
> > > Thanks for the motivation, Eugene! :-)
> > >
> > > I've wanted to do this for a while now but was waiting for the Flink
> > > 1.2 release (which happened this week)! There's some prerequisite work
> > > to be done on the Flink runner: we'll move to the new timer interfaces
> > > introduced in Flink 1.2 and implement support for both the user-facing
> > > state and timer APIs. This should make implementation of SDF easier.
> > >
> > > On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]>
> > > wrote:
> > >
> > > Thanks! Looking forward to this work.
> > >
> > > On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]>
> > > wrote:
> > >
> > > Thanks for the update Eugene.
> > >
> > > I will work on the spark runner with Amit.
> > >
> > > Regards
> > > JB
> > >
> > > On Feb 7, 2017, at 19:12, Eugene Kirpichov
> > > <[email protected]> wrote:
> > > >Hello,
> > > >
> > > > >I'm almost done adding support for Splittable DoFn
> > > > >http://s.apache.org/splittable-do-fn to Dataflow streaming runner*,
> > > > >and very excited about that. There's only 1 PR
> > > > ><https://github.com/apache/beam/pull/1898> remaining, plus enabling
> > > > >some tests.
> > > >
> > > > >* (batch runner is much harder because it's not yet quite clear to me
> > > > >how to properly implement liquid sharding
> > > > ><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> > > > >with SDF - and the current API is not ready for that yet)
> > > >
> > > > >After implementing all the runner-agnostic parts of Splittable DoFn, I
> > > > >found them quite easy to integrate into Dataflow streaming runner, and
> > > > >I think this means it should be easy to integrate into other runners
> > > > >too.
> > > >
> > > >====== Why it'd be cool ======
> > > > >The general benefits of SDF are well-described in the design doc
> > > > >(linked above).
> > > > >As for right now - if we integrated SDF with all runners, it'd already
> > > > >enable us to start greatly simplifying the code of existing streaming
> > > > >connectors (CountingInput, Kafka, Pubsub, JMS) and writing new
> > > > >connectors (e.g. a really nice one to implement would be "directory
> > > > >watcher", that continuously returns new files in a directory).
> > > >
> > > > >As a teaser, here's the complete implementation of an "unbounded
> > > > >counter" I used for my test of Dataflow runner integration:
> > > >
> > > > >  class CountFn extends DoFn<String, Long> {
> > > > >    // Emits consecutive offsets for as long as the tracker lets them be claimed.
> > > > >    @ProcessElement
> > > > >    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
> > > > >      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> > > > >        c.output(i);
> > > > >      }
> > > > >      return resume();
> > > > >    }
> > > > >
> > > > >    @GetInitialRestriction
> > > > >    public OffsetRange getInitialRange(String element) {
> > > > >      return new OffsetRange(0, Integer.MAX_VALUE);
> > > > >    }
> > > > >
> > > > >    @NewTracker
> > > > >    public OffsetRangeTracker newTracker(OffsetRange range) {
> > > > >      return new OffsetRangeTracker(range);
> > > > >    }
> > > > >  }
> > > >
> > > >====== What I'm asking ======
> > > > >So, I'd like to ask for help integrating SDF into Spark, Flink and
> > > > >Apex runners from people who are intimately familiar with them -
> > > > >specifically, I was hoping best-case I could nerd-snipe some of you
> > > > >into taking over the integration of SDF with your favorite runner ;)
> > > >
> > > > >The proper set of people seems to be +Aljoscha Krettek
> > > > ><[email protected]> +Maximilian Michels <[email protected]>
> > > > >[email protected] <[email protected]> +Amit Sela
> > > > ><[email protected]> +Thomas Weise unless I forgot somebody.
> > > >
> > > > >Average-case, I was looking for runner-specific guidance on how to do
> > > > >it myself.
> > > >
> > > >====== If you want to help ======
> > > > >If somebody decides to take this over in my absence (I'll be mostly
> > > > >gone for ~the next month), the best people to ask for implementation
> > > > >advice are +Kenn Knowles <[email protected]> and +Daniel Mills
> > > > ><[email protected]>.
> > > >
> > > >For reference, here's how SDF is implemented in the direct runner:
> > > > >- Direct runner overrides
> > > > ><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> > > > >ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
> > > > ><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> > > > >(common transform expansion)
> > > > >- SplittableParDo uses two runner-specific primitive transforms:
> > > > >"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
> > > > >overrides the first one like this
> > > > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> > > > >and directly implements evaluation of the second one like this
> > > > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> > > > >using runner hooks introduced in this PR
> > > > ><https://github.com/apache/beam/pull/1824>. At the core of the hooks
> > > > >is "ProcessFn" which is like a regular DoFn but has to be prepared at
> > > > >runtime with some hooks (state, timers, and runner access to
> > > > >RestrictionTracker) before you invoke it. I added a convenience
> > > > >implementation of the hook mimicking behavior of UnboundedSource.
> > > > >- The relevant runner-agnostic tests are in SplittableDoFnTest
> > > > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > > >
> > > >That's all it takes, really - the runner has to implement these two
> > > >transforms. When I looked at Spark and Flink runners, it was not quite
> > > >clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
> > > >Spark runner currently doesn't use KeyedWorkItem at all - but it seems
> > > >definitely possible.
> > > >
> > > >Thanks!
> > >
> > >
> > >
> > >
> > > --
> > > Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > >
> > > [email protected]
> > > +49-(0)30-55599146 <+49%2030%2055599146>
> > >
> > > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > Managing Directors: Kostas Tzoumas, Stephan Ewen
> > >
> > >
> > >
>
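
For anyone reading the CountFn teaser quoted above without Beam at hand: the
claim-then-checkpoint loop it relies on can be sketched in plain Java. This is
only a rough illustration under stated assumptions. OffsetTrackerSketch and
CountSketch are hypothetical names, not Beam classes, and the runner's decision
to checkpoint is simulated here with a simple output budget rather than
anything a real runner does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for an offset-range tracker (NOT Beam's class).
final class OffsetTrackerSketch {
    private final long from;   // inclusive start of the restriction
    private long to;           // exclusive end; shrinks when checkpoint() is called
    private long lastClaimed;  // from - 1 means nothing has been claimed yet

    OffsetTrackerSketch(long from, long to) {
        if (from > to) throw new IllegalArgumentException("from > to");
        this.from = from;
        this.to = to;
        this.lastClaimed = from - 1;
    }

    long from() { return from; }

    // Claims an offset; returns false once the offset falls outside the restriction.
    boolean tryClaim(long offset) {
        if (offset <= lastClaimed) throw new IllegalArgumentException("offsets must increase");
        if (offset >= to) return false;
        lastClaimed = offset;
        return true;
    }

    // Cuts the restriction just after the last claimed offset and
    // returns the unprocessed remainder as {from, to}.
    long[] checkpoint() {
        long split = lastClaimed + 1;
        long[] residual = {split, to};
        to = split;
        return residual;
    }
}

final class CountSketch {
    // One simulated "process" call: emit offsets until the tracker refuses a claim.
    // The budget is an assumption standing in for the runner deciding to checkpoint.
    static long[] processOnce(OffsetTrackerSketch tracker, int budget, List<Long> out) {
        long[] residual = null;
        int emitted = 0;
        for (long i = tracker.from(); tracker.tryClaim(i); ++i) {
            out.add(i);
            if (++emitted == budget) residual = tracker.checkpoint();
        }
        return residual != null ? residual : tracker.checkpoint();
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        long[] r = {0, 10};
        int calls = 0;
        // Resume from the residual until nothing is left, like repeated resume() calls.
        while (r[0] < r[1]) {
            r = processOnce(new OffsetTrackerSketch(r[0], r[1]), 3, out);
            calls++;
        }
        System.out.println(out + " in " + calls + " calls");
    }
}
```

The point of the sketch is that the processing loop never needs to know why a
claim failed: the end of the range and a checkpoint look the same to it, and
the residual is what gets scheduled for the next call.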
