I recommend proceeding with the runner-facing state & timer APIs; they are lower-level and more appropriate for this. All runners either provide them or use the runners/core implementations, since they are needed for triggering.
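To make the "needed for triggering" point concrete: below is a minimal, self-contained model of the runner-facing timer contract (plain Java; `ToyTimerService` is an illustrative stand-in, not Beam's actual TimerInternals, which additionally has namespaces, time domains, and deletion). Timers are registered against a target timestamp, and firing is driven by watermark advancement:

```java
import java.util.*;

// Hypothetical minimal timer service: the runner records timers keyed by
// target timestamp and fires everything eligible when the event-time
// watermark advances. This is the firing contract that triggering needs.
class ToyTimerService {
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    // Registers a timer to fire once the watermark reaches targetTimestamp.
    void setTimer(String timerId, long targetTimestamp) {
        timers.computeIfAbsent(targetTimestamp, t -> new ArrayList<>()).add(timerId);
    }

    // Advances the watermark and returns the ids of all timers whose target
    // is at or before the new watermark, in timestamp order, removing them.
    List<String> advanceWatermarkTo(long watermark) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<Long, List<String>>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> e = it.next();
            if (e.getKey() > watermark) {
                break; // TreeMap iterates in ascending key order
            }
            fired.addAll(e.getValue());
            it.remove();
        }
        return fired;
    }
}
```

A runner that exposes this kind of hook (plus keyed state) has what SDF's ProcessFn needs to schedule its own resumption.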
On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]> wrote:

> Thanks Aljoscha!
>
> Minor note: I'm not familiar with the level of support for timers that
> Flink currently has - however, SDF in the Direct and Dataflow runners
> currently does not use the user-facing state/timer APIs; rather, it uses
> the runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink
> already implements these. We may want to change this, but for now it's
> good enough (besides, SDF uses watermark holds, which are not supported by
> the user-facing state API yet).
>
> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <[email protected]> wrote:
>
>> Thanks for the motivation, Eugene! :-)
>>
>> I've wanted to do this for a while now but was waiting for the Flink 1.2
>> release (which happened this week)! There's some prerequisite work to be
>> done on the Flink runner: we'll move to the new timer interfaces
>> introduced in Flink 1.2 and implement support for both the user-facing
>> state and timer APIs. This should make implementation of SDF easier.
>>
>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:
>>
>> Thanks! Looking forward to this work.
>>
>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
>>
>> Thanks for the update, Eugene.
>>
>> I will work on the Spark runner with Amit.
>>
>> Regards
>> JB
>>
>> On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
>>
>> > Hello,
>> >
>> > I'm almost done adding support for Splittable DoFn
>> > <http://s.apache.org/splittable-do-fn> to the Dataflow streaming
>> > runner*, and very excited about that. There's only 1 PR
>> > <https://github.com/apache/beam/pull/1898> remaining, plus enabling
>> > some tests.
>> >
>> > * (The batch runner is much harder because it's not yet quite clear to
>> > me how to properly implement liquid sharding
>> > <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
>> > with SDF - and the current API is not ready for that yet.)
>> >
>> > After implementing all the runner-agnostic parts of Splittable DoFn, I
>> > found them quite easy to integrate into the Dataflow streaming runner,
>> > and I think this means it should be easy to integrate into other
>> > runners too.
>> >
>> > ====== Why it'd be cool ======
>> > The general benefits of SDF are well described in the design doc
>> > (linked above).
>> > As for right now - if we integrated SDF with all runners, it'd already
>> > enable us to start greatly simplifying the code of existing streaming
>> > connectors (CountingInput, Kafka, Pubsub, JMS) and writing new
>> > connectors (e.g. a really nice one to implement would be a "directory
>> > watcher" that continuously returns new files in a directory).
>> >
>> > As a teaser, here's the complete implementation of an "unbounded
>> > counter" I used for my test of the Dataflow runner integration:
>> >
>> > class CountFn extends DoFn<String, Integer> {
>> >   @ProcessElement
>> >   public ProcessContinuation process(
>> >       ProcessContext c, OffsetRangeTracker tracker) {
>> >     for (int i = tracker.currentRestriction().getFrom();
>> >         tracker.tryClaim(i);
>> >         ++i) {
>> >       c.output(i);
>> >     }
>> >     return resume();
>> >   }
>> >
>> >   @GetInitialRestriction
>> >   public OffsetRange getInitialRange(String element) {
>> >     return new OffsetRange(0, Integer.MAX_VALUE);
>> >   }
>> >
>> >   @NewTracker
>> >   public OffsetRangeTracker newTracker(OffsetRange range) {
>> >     return new OffsetRangeTracker(range);
>> >   }
>> > }
>> >
>> > ====== What I'm asking ======
>> > So, I'd like to ask for help integrating SDF into the Spark, Flink and
>> > Apex runners from people who are intimately familiar with them -
>> > specifically, I was hoping that, best case, I could nerd-snipe some of
>> > you into taking over the integration of SDF with your favorite runner ;)
>> >
>> > The proper set of people seems to be +Aljoscha Krettek
>> > <[email protected]>, +Maximilian Michels <[email protected]>,
>> > [email protected] <[email protected]>, +Amit Sela <[email protected]>
>> > and +Thomas Weise, unless I forgot somebody.
>> >
>> > Average case, I was looking for runner-specific guidance on how to do
>> > it myself.
>> >
>> > ====== If you want to help ======
>> > If somebody decides to take this over in my absence (I'll be mostly
>> > gone for ~the next month), the best people to ask for implementation
>> > advice are +Kenn Knowles <[email protected]> and +Daniel Mills
>> > <[email protected]>.
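As a side note on the teaser above: the tryClaim loop is the heart of the SDF processing model. Here is a minimal, self-contained sketch of that claim/checkpoint contract (plain Java; `SimpleOffsetRangeTracker` is illustrative only, not Beam's actual OffsetRangeTracker): every position must be claimed before it is output, and a checkpoint splits off the unprocessed remainder so a later resume() can pick it up.

```java
// Simplified model of a restriction tracker over [from, to):
// - tryClaim(p) succeeds iff p is still inside the (possibly shrunk) range;
//   the caller must stop producing output as soon as it fails.
// - checkpoint() splits off the unprocessed remainder and truncates this
//   tracker's range so no further claims succeed there.
class SimpleOffsetRangeTracker {
    private final int from;
    private int to;           // exclusive; a checkpoint can shrink this
    private int lastClaimed;  // last successfully claimed position

    SimpleOffsetRangeTracker(int from, int to) {
        this.from = from;
        this.to = to;
        this.lastClaimed = from - 1;
    }

    boolean tryClaim(int position) {
        if (position >= to) {
            return false; // past the end: stop now, resume from a checkpoint later
        }
        lastClaimed = position;
        return true;
    }

    // Returns the residual range {start, end} covering all unclaimed work.
    int[] checkpoint() {
        int[] residual = {lastClaimed + 1, to};
        this.to = lastClaimed + 1;
        return residual;
    }

    int getFrom() { return from; }
}
```

The runner's job is essentially to persist the residual range and re-invoke the ProcessFn with it when the resume() continuation comes due.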
>> >
>> > For reference, here's how SDF is implemented in the direct runner:
>> > - The direct runner overrides
>> > <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
>> > ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
>> > <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
>> > (common transform expansion).
>> > - SplittableParDo uses two runner-specific primitive transforms:
>> > "GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct
>> > runner overrides the first one like this
>> > <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
>> > and directly implements evaluation of the second one like this
>> > <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
>> > using runner hooks introduced in this PR
>> > <https://github.com/apache/beam/pull/1824>. At the core of the hooks is
>> > "ProcessFn", which is like a regular DoFn but has to be prepared at
>> > runtime with some hooks (state, timers, and runner access to the
>> > RestrictionTracker) before you invoke it. I added a convenience
>> > implementation of the hook mimicking the behavior of UnboundedSource.
>> > - The relevant runner-agnostic tests are in SplittableDoFnTest
>> > <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
>> >
>> > That's all it takes, really - the runner has to implement these two
>> > transforms.
>> > When I looked at the Spark and Flink runners, it was not quite clear
>> > to me how to implement the GBKIntoKeyedWorkItems transform - e.g. the
>> > Spark runner currently doesn't use KeyedWorkItem at all - but it seems
>> > definitely possible.
>> >
>> > Thanks!
>>
>> --
>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
>>
>> [email protected]
>> +49-(0)30-55599146
>>
>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
>> Managing Directors: Kostas Tzoumas, Stephan Ewen
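For anyone picking this up, a toy model of what the GBKIntoKeyedWorkItems transform discussed above has to produce: elements grouped by key into per-key work items that the runner can then feed, one key at a time, to the splittable ProcessFn. This is plain, illustrative Java (`ToyKeyedWorkItem` is a hypothetical name, not the runners-core KeyedWorkItem, which also carries timers and windowing information):

```java
import java.util.*;

// Toy grouping of (key, value) pairs into per-key work items. In a real
// runner this must also respect windowing and carry fired timers; here we
// only model the "group by key, process one key at a time" shape.
class ToyKeyedWorkItem<K, V> {
    final K key;
    final List<V> elements;

    ToyKeyedWorkItem(K key, List<V> elements) {
        this.key = key;
        this.elements = elements;
    }

    // Groups input pairs by key, preserving first-seen key order and
    // per-key element order.
    static <K, V> List<ToyKeyedWorkItem<K, V>> groupByKey(List<Map.Entry<K, V>> input) {
        Map<K, List<V>> grouped = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : input) {
            grouped.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        List<ToyKeyedWorkItem<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, List<V>> e : grouped.entrySet()) {
            out.add(new ToyKeyedWorkItem<>(e.getKey(), e.getValue()));
        }
        return out;
    }
}
```

The reason SDF needs this keyed shape at all is that the per-restriction state and timers driving ProcessFn's resumption are keyed, so each element/restriction pair must land on a stable key.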
