I whipped up a quick version for Flink that seems to work:
https://github.com/apache/beam/pull/2235

There are still two failing tests, as described in the PR. 

On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> +1 for a video call. I think it should be pretty straightforward for the
> Spark runner after the work on read from UnboundedSource and after
> GroupAlsoByWindow, but from my experience such a call could move us
> forward fast enough.
> 
> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]>
> wrote:
> 
> > Hi all,
> >
> > Let us continue working on this. I am back from various travels and am
> > eager to help.
> >
> > Amit, JB - would you like to perhaps have a videocall to hash this out for
> > the Spark runner?
> >
> > Aljoscha - are the necessary Flink changes done, or is the need for them
> > obviated by using the (existing) runner-facing state/timer APIs? Should we
> > have a videocall too?
> >
> > Thomas - what do you think about getting this into Apex runner?
> >
> > (I think videocalls will allow us to make rapid progress, but it's probably a
> > better idea to keep them separate since they'll involve a lot of
> > runner-specific details)
> >
> > PS - The completion of this in Dataflow streaming runner is currently
> > waiting only on having a small service-side change implemented and rolled
> > out for termination of streaming jobs.
> >
> > On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
> >
> > I recommend proceeding with the runner-facing state & timer APIs; they are
> > lower-level and more appropriate for this. All runners provide them or use
> > runners/core implementations, as they are needed for triggering.
> >
> > On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]>
> > wrote:
> >
> > Thanks Aljoscha!
> >
> > Minor note: I'm not familiar with what level of support for timers Flink
> > currently has - however SDF in Direct and Dataflow runner currently does
> > not use the user-facing state/timer APIs - rather, it uses the
> > runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink
> > already implements these. We may want to change this, but for now it's good
> > enough (besides, SDF uses watermark holds, which are not supported by the
> > user-facing state API yet).
> >
> > On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > [email protected]> wrote:
> >
> > Thanks for the motivation, Eugene! :-)
> >
> > I've wanted to do this for a while now but was waiting for the Flink 1.2
> > release (which happened this week)! There's some prerequisite work to be
> > done on the Flink runner: we'll move to the new timer interfaces introduced
> > in Flink 1.2 and implement support for both the user facing state and timer
> > APIs. This should make implementation of SDF easier.
> >
> > On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]>
> > wrote:
> >
> > Thanks! Looking forward to this work.
> >
> > On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]>
> > wrote:
> >
> > Thanks for the update Eugene.
> >
> > I will work on the spark runner with Amit.
> >
> > Regards
> > JB
> >
> > On Feb 7, 2017, at 19:12, Eugene Kirpichov
> > <[email protected]> wrote:
> > >Hello,
> > >
> > >I'm almost done adding support for Splittable DoFn
> > >http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and
> > >very excited about that. There's only 1 PR
> > ><https://github.com/apache/beam/pull/1898> remaining, plus enabling some
> > >tests.
> > >
> > >* (batch runner is much harder because it's not yet quite clear to me how
> > >to properly implement liquid sharding
> > ><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> > >with SDF - and the current API is not ready for that yet)
> > >
> > >After implementing all the runner-agnostic parts of Splittable DoFn, I
> > >found them quite easy to integrate into Dataflow streaming runner, and I
> > >think this means it should be easy to integrate into other runners too.
> > >
> > >====== Why it'd be cool ======
> > >The general benefits of SDF are well-described in the design doc (linked
> > >above).
> > >As for right now - if we integrated SDF with all runners, it'd already
> > >enable us to start greatly simplifying the code of existing streaming
> > >connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors
> > >(e.g. a really nice one to implement would be "directory watcher", that
> > >continuously returns new files in a directory).
> > >
> > >As a teaser, here's the complete implementation of an "unbounded counter"
> > >I used for my test of Dataflow runner integration:
> > >
> > >  class CountFn extends DoFn<String, String> {
> > >    @ProcessElement
> > >    public ProcessContinuation process(
> > >        ProcessContext c, OffsetRangeTracker tracker) {
> > >      // Claim offsets one at a time; stop once the tracker refuses a claim.
> > >      for (long i = tracker.currentRestriction().getFrom();
> > >          tracker.tryClaim(i); ++i) {
> > >        c.output(String.valueOf(i));
> > >      }
> > >      return resume();
> > >    }
> > >
> > >    @GetInitialRestriction
> > >    public OffsetRange getInitialRange(String element) {
> > >      return new OffsetRange(0, Integer.MAX_VALUE);
> > >    }
> > >
> > >    @NewTracker
> > >    public OffsetRangeTracker newTracker(OffsetRange range) {
> > >      return new OffsetRangeTracker(range);
> > >    }
> > >  }
> > >
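A minimal, self-contained sketch of the claim loop used by CountFn above. The SimpleOffsetTracker class is hypothetical, a stand-in written only for this illustration and not Beam's OffsetRangeTracker. It assumes a checkpoint truncates the restriction just after the last claimed offset, so the next tryClaim fails and the loop ends. Here the checkpoint is triggered from inside the loop purely to keep the example single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {
  /** Hypothetical stand-in for an offset-range tracker; not Beam's real class. */
  static final class SimpleOffsetTracker {
    private final long from;
    private long to;          // exclusive end; shrinks when a checkpoint is taken
    private long lastClaimed; // last offset successfully claimed

    SimpleOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
      this.lastClaimed = from - 1;
    }

    long from() {
      return from;
    }

    /** Claims offset i if it is still inside the current restriction. */
    synchronized boolean tryClaim(long i) {
      if (i >= to) {
        return false;
      }
      lastClaimed = i;
      return true;
    }

    /** Truncates the restriction and returns the residual as {start, end}. */
    synchronized long[] checkpoint() {
      long residualStart = lastClaimed + 1;
      long residualEnd = to;
      to = residualStart;
      return new long[] {residualStart, residualEnd};
    }
  }

  /** Mirrors the CountFn loop: emit every offset the tracker lets us claim. */
  static List<String> process(SimpleOffsetTracker tracker, int checkpointAfter) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.from(); tracker.tryClaim(i); ++i) {
      out.add(String.valueOf(i));
      // Assumption for this sketch: a checkpoint arrives after a fixed
      // number of outputs.
      if (out.size() == checkpointAfter) {
        tracker.checkpoint();
      }
    }
    return out;
  }

  public static void main(String[] args) {
    SimpleOffsetTracker tracker = new SimpleOffsetTracker(0, Integer.MAX_VALUE);
    System.out.println(process(tracker, 3)); // prints [0, 1, 2]
  }
}
```

The point of the design is that the loop itself never decides when to stop; it only asks the tracker, which lets the caller cut an effectively unbounded range into finite pieces.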
> > >====== What I'm asking ======
> > >So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
> > >runners from people who are intimately familiar with them - specifically,
> > >I was hoping best-case I could nerd-snipe some of you into taking over the
> > >integration of SDF with your favorite runner ;)
> > >
> > >The proper set of people seems to be +Aljoscha Krettek
> > ><[email protected]> +Maximilian Michels <[email protected]>
> > >[email protected] <[email protected]> +Amit Sela <[email protected]>
> > >+Thomas Weise unless I forgot somebody.
> > >
> > >Average-case, I was looking for runner-specific guidance on how to do it
> > >myself.
> > >
> > >====== If you want to help ======
> > >If somebody decides to take this over in my absence (I'll be mostly gone
> > >for ~the next month), the best people to ask for implementation advice
> > >are +Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.
> > >
> > >For reference, here's how SDF is implemented in the direct runner:
> > >- Direct runner overrides
> > ><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> > >ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
> > ><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> > >(common transform expansion)
> > >- SplittableParDo uses two runner-specific primitive transforms:
> > >"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
> > >overrides the first one like this
> > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> > >and directly implements evaluation of the second one like this
> > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> > >using runner hooks introduced in this PR
> > ><https://github.com/apache/beam/pull/1824>. At the core of the hooks is
> > >"ProcessFn" which is like a regular DoFn but has to be prepared at runtime
> > >with some hooks (state, timers, and runner access to RestrictionTracker)
> > >before you invoke it. I added a convenience implementation of the hook
> > >mimicking behavior of UnboundedSource.
> > >- The relevant runner-agnostic tests are in SplittableDoFnTest
> > ><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > >
> > >That's all it takes, really - the runner has to implement these two
> > >transforms. When I looked at Spark and Flink runners, it was not quite
> > >clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
> > >Spark runner currently doesn't use KeyedWorkItem at all - but it seems
> > >definitely possible.
> > >
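A rough sketch of why the second transform needs state and timers. It assumes (the thread does not spell this out) that each call processes only part of a restriction, and that the runner stores the unprocessed remainder and later resumes it. All names here (WorkItem, processOnce, run) are hypothetical and written for this illustration; none of them are Beam classes. The in-memory queue stands in for per-key state plus a resume timer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class ResumeLoopSketch {
  /** Hypothetical work item: an element plus the offset range [from, to) left to process. */
  static final class WorkItem {
    final String element;
    final long from;
    final long to;

    WorkItem(String element, long from, long to) {
      this.element = element;
      this.from = from;
      this.to = to;
    }
  }

  /**
   * Processes at most maxPerCall offsets of the item, appending outputs to out.
   * Returns the residual work item, or null when the range is exhausted.
   */
  static WorkItem processOnce(WorkItem item, int maxPerCall, List<String> out) {
    long end = Math.min(item.to, item.from + maxPerCall);
    for (long i = item.from; i < end; ++i) {
      out.add(item.element + ":" + i);
    }
    return end < item.to ? new WorkItem(item.element, end, item.to) : null;
  }

  /** Stand-in for the runner: keeps resuming residuals until none remain. */
  static List<String> run(WorkItem initial, int maxPerCall) {
    List<String> out = new ArrayList<>();
    Deque<WorkItem> pending = new ArrayDeque<>();
    pending.add(initial);
    while (!pending.isEmpty()) {
      WorkItem residual = processOnce(pending.poll(), maxPerCall, out);
      if (residual != null) {
        // In a real runner this would be "save the residual to state and
        // set a timer"; here it is just re-queued.
        pending.add(residual);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(new WorkItem("a", 0, 5), 2)); // prints [a:0, a:1, a:2, a:3, a:4]
  }
}
```

With an unbounded range the loop in run would never drain, which is the streaming case: the residual is always non-empty and processing is resumed indefinitely.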
> > >Thanks!
> >
> >
> >
> >
> > --
> > Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> >
> > [email protected]
> > +49-(0)30-55599146
> >
> > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > Managing Directors: Kostas Tzoumas, Stephan Ewen
> >
> >
> >
