Thanks for the motivation, Eugene! :-) I've wanted to do this for a while now but was waiting for the Flink 1.2 release (which happened this week)! There's some prerequisite work to be done on the Flink runner: we'll move to the new timer interfaces introduced in Flink 1.2 and implement support for both the user-facing state and timer APIs. This should make implementing SDF easier.
On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:
> Thanks! Looking forward to this work.
>
> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
>> Thanks for the update, Eugene.
>>
>> I will work on the Spark runner with Amit.
>>
>> Regards
>> JB
>>
>> On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
>> > Hello,
>> >
>> > I'm almost done adding support for Splittable DoFn
>> > <http://s.apache.org/splittable-do-fn> to the Dataflow streaming
>> > runner*, and very excited about that. There's only 1 PR
>> > <https://github.com/apache/beam/pull/1898> remaining, plus enabling
>> > some tests.
>> >
>> > * (The batch runner is much harder because it's not yet quite clear to
>> > me how to properly implement liquid sharding
>> > <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
>> > with SDF - and the current API is not ready for that yet.)
>> >
>> > After implementing all the runner-agnostic parts of Splittable DoFn, I
>> > found them quite easy to integrate into the Dataflow streaming runner,
>> > and I think this means it should be easy to integrate into other
>> > runners too.
>> >
>> > ====== Why it'd be cool ======
>> > The general benefits of SDF are well described in the design doc
>> > (linked above).
>> > As for right now - if we integrated SDF with all runners, it'd already
>> > enable us to start greatly simplifying the code of existing streaming
>> > connectors (CountingInput, Kafka, Pubsub, JMS) and writing new
>> > connectors (e.g. a really nice one to implement would be a "directory
>> > watcher" that continuously returns new files in a directory).
>> >
>> > As a teaser, here's the complete implementation of an "unbounded
>> > counter" I used for my test of the Dataflow runner integration:
>> >
>> > class CountFn extends DoFn<String, Long> {
>> >   @ProcessElement
>> >   public ProcessContinuation process(
>> >       ProcessContext c, OffsetRangeTracker tracker) {
>> >     for (long i = tracker.currentRestriction().getFrom();
>> >         tracker.tryClaim(i); ++i) {
>> >       c.output(i);
>> >     }
>> >     return resume();
>> >   }
>> >
>> >   @GetInitialRestriction
>> >   public OffsetRange getInitialRange(String element) {
>> >     return new OffsetRange(0, Integer.MAX_VALUE);
>> >   }
>> >
>> >   @NewTracker
>> >   public OffsetRangeTracker newTracker(OffsetRange range) {
>> >     return new OffsetRangeTracker(range);
>> >   }
>> > }
>> >
>> > ====== What I'm asking ======
>> > So, I'd like to ask for help integrating SDF into the Spark, Flink,
>> > and Apex runners from people who are intimately familiar with them -
>> > specifically, I was hoping, best-case, I could nerd-snipe some of you
>> > into taking over the integration of SDF with your favorite runner ;)
>> >
>> > The proper set of people seems to be +Aljoscha Krettek
>> > <[email protected]>, +Maximilian Michels <[email protected]>,
>> > [email protected] <[email protected]>, +Amit Sela
>> > <[email protected]>, and +Thomas Weise, unless I forgot somebody.
>> >
>> > Average-case, I was looking for runner-specific guidance on how to do
>> > it myself.
>> >
>> > ====== If you want to help ======
>> > If somebody decides to take this over in my absence (I'll be mostly
>> > gone for ~the next month), the best people to ask for implementation
>> > advice are +Kenn Knowles <[email protected]> and +Daniel Mills
>> > <[email protected]>.
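The claim-based loop in the teaser above is the heart of SDF's splittability: the DoFn may only output for an offset it has successfully claimed, and it must stop as soon as a claim fails - which is also how a runner-initiated checkpoint terminates the loop. A minimal, self-contained sketch of that protocol (these are hypothetical stand-in classes for illustration, not Beam's actual OffsetRange/OffsetRangeTracker):

```java
// Hypothetical stand-in for Beam's OffsetRange: a half-open range [from, to).
final class OffsetRange {
  final long from;
  final long to; // exclusive
  OffsetRange(long from, long to) { this.from = from; this.to = to; }
}

// Hypothetical stand-in for Beam's OffsetRangeTracker, showing only the
// claim protocol: tryClaim succeeds while the offset is in range and no
// checkpoint has been requested.
final class SimpleOffsetTracker {
  private final OffsetRange range;
  private boolean checkpointRequested = false;

  SimpleOffsetTracker(OffsetRange range) { this.range = range; }

  OffsetRange currentRestriction() { return range; }

  // The caller must stop processing as soon as this returns false.
  boolean tryClaim(long offset) {
    return !checkpointRequested && offset < range.to;
  }

  // In a real runner this would be triggered asynchronously to split work.
  void requestCheckpoint() { checkpointRequested = true; }
}

public class TrackerDemo {
  static String run() {
    SimpleOffsetTracker tracker =
        new SimpleOffsetTracker(new OffsetRange(0, 5));
    StringBuilder out = new StringBuilder();
    // Same loop shape as CountFn.process() in the teaser above.
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      out.append(i); // stands in for c.output(i)
      if (i == 2) tracker.requestCheckpoint(); // runner asks to stop here
    }
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(run()); // offsets 0..2 are claimed, then the loop stops
  }
}
```

The point of the sketch: the tracker, not the loop bound, decides when processing ends, so a runner can cut the restriction at any claimed offset and resume later from there.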
>> >
>> > For reference, here's how SDF is implemented in the direct runner:
>> > - The direct runner overrides
>> > <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
>> > ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
>> > <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
>> > (the common transform expansion).
>> > - SplittableParDo uses two runner-specific primitive transforms:
>> > "GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct
>> > runner overrides the first one like this
>> > <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>
>> > and directly implements evaluation of the second one like this
>> > <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
>> > using runner hooks introduced in this PR
>> > <https://github.com/apache/beam/pull/1824>. At the core of the hooks
>> > is "ProcessFn", which is like a regular DoFn but has to be prepared at
>> > runtime with some hooks (state, timers, and runner access to the
>> > RestrictionTracker) before you invoke it. I added a convenience
>> > implementation of the hook mimicking the behavior of UnboundedSource.
>> > - The relevant runner-agnostic tests are in SplittableDoFnTest
>> > <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
>> >
>> > That's all it takes, really - the runner has to implement these two
>> > transforms.
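For readers unfamiliar with the KeyedWorkItem machinery mentioned above, here is a plain-Java sketch (hypothetical classes for illustration, not Beam's actual runners-core types) of what the "GBKIntoKeyedWorkItems" primitive conceptually does: it routes elements into per-key bundles, so the downstream "SplittableProcessElements" step can keep per-key state and timers while invoking ProcessFn:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a keyed work item: all pending elements for one key.
final class KeyedWorkItem<K, V> {
  final K key;
  final List<V> elements = new ArrayList<>();
  KeyedWorkItem(K key) { this.key = key; }
}

public class GbkIntoKeyedWorkItemsSketch {
  // Conceptual grouping step: collect each key's elements into one work item,
  // preserving arrival order within a key.
  static <K, V> Map<K, KeyedWorkItem<K, V>> group(List<Map.Entry<K, V>> input) {
    Map<K, KeyedWorkItem<K, V>> items = new LinkedHashMap<>();
    for (Map.Entry<K, V> kv : input) {
      items.computeIfAbsent(kv.getKey(), KeyedWorkItem::new)
          .elements.add(kv.getValue());
    }
    return items;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> input = Arrays.asList(
        Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3));
    Map<String, KeyedWorkItem<String, Integer>> items = group(input);
    System.out.println(items.get("a").elements); // [1, 3]
  }
}
```

In a real runner the grouping is of course a distributed shuffle rather than an in-memory map, but the contract is the same: everything for one key arrives at one place, which is what makes per-key state and timers possible.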
>> > When I looked at the Spark and Flink runners, it was not quite clear
>> > to me how to implement the GBKIntoKeyedWorkItems transform - e.g. the
>> > Spark runner currently doesn't use KeyedWorkItem at all - but it seems
>> > definitely possible.
>> >
>> > Thanks!

--
Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
[email protected]
+49-(0)30-55599146
Registered at Amtsgericht Charlottenburg - HRB 158244 B
Managing Directors: Kostas Tzoumas, Stephan Ewen
