Hello! I'm almost done adding support for Splittable DoFn http://s.apache.org/splittable-do-fn to the Dataflow streaming runner*, and I'm very excited about that. There's only one PR <https://github.com/apache/beam/pull/1898> remaining, plus enabling some tests.
* (The batch runner is much harder, because it's not yet quite clear to me how to properly implement liquid sharding <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow> with SDF, and the current API is not ready for that yet.)

After implementing all the runner-agnostic parts of Splittable DoFn, I found them quite easy to integrate into the Dataflow streaming runner, and I think this means it should be easy to integrate into other runners too.

====== Why it'd be cool ======

The general benefits of SDF are well described in the design doc (linked above). As for right now: if we integrated SDF with all runners, it would already enable us to start greatly simplifying the code of existing streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors (e.g. a really nice one to implement would be a "directory watcher" that continuously returns new files in a directory).

As a teaser, here's the complete implementation of an "unbounded counter" I used for my test of the Dataflow runner integration:

class CountFn extends DoFn<String, String> {
  @ProcessElement
  public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
    // Claim each offset before outputting it; stop as soon as a claim fails.
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      c.output(String.valueOf(i));
    }
    return resume();
  }

  @GetInitialRestriction
  public OffsetRange getInitialRange(String element) {
    return new OffsetRange(0, Integer.MAX_VALUE);
  }

  @NewTracker
  public OffsetRangeTracker newTracker(OffsetRange range) {
    return new OffsetRangeTracker(range);
  }
}

====== What I'm asking ======

So, I'd like to ask people who are intimately familiar with the Spark, Flink and Apex runners for help integrating SDF into them. Specifically, in the best case, I was hoping I could nerd-snipe some of you into taking over the integration of SDF with your favorite runner ;) The proper set of people seems to be +Aljoscha Krettek <[email protected]> +Maximilian Michels <[email protected]> +[email protected] <[email protected]> +Amit Sela <[email protected]> +Thomas Weise, unless I forgot somebody. In the average case, I was looking for runner-specific guidance on how to do it myself.

====== If you want to help ======

If somebody decides to take this over in my absence (I'll be mostly gone for ~the next month), the best people to ask for implementation advice are +Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.

For reference, here's how SDF is implemented in the direct runner:

- The direct runner overrides <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java> ParDo.of() for a splittable DoFn and replaces it with SplittableParDo <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java> (a common transform expansion).
- SplittableParDo uses two runner-specific primitive transforms: "GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct runner overrides the first one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>, and directly implements evaluation of the second one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>, using the runner hooks introduced in this PR <https://github.com/apache/beam/pull/1824>. At the core of the hooks is "ProcessFn", which is like a regular DoFn but has to be prepared at runtime with some hooks (state, timers, and runner access to the RestrictionTracker) before you invoke it. I added a convenience implementation of the hooks that mimics the behavior of UnboundedSource.
- The relevant runner-agnostic tests are in SplittableDoFnTest <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.

That's all it takes, really: the runner has to implement these two transforms. When I looked at the Spark and Flink runners, it was not quite clear to me how to implement the GBKIntoKeyedWorkItems transform (e.g. the Spark runner currently doesn't use KeyedWorkItem at all), but it definitely seems possible.

Thanks!
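P.S. To make the tryClaim() contract in the counter teaser more concrete, here is a minimal, self-contained sketch of an offset-range tracker. To be clear, this is NOT Beam's actual OffsetRangeTracker: the class name, the checkpoint() return type, and the omission of monotonicity checks are all simplifications I made up for illustration.

```java
// Hypothetical simplified tracker, NOT Beam's OffsetRangeTracker.
public class SimpleOffsetRangeTracker {
  private final long from;          // inclusive start of the restriction
  private long to;                  // exclusive end; checkpointing shrinks it
  private long lastClaimed = Long.MIN_VALUE;

  public SimpleOffsetRangeTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  // Returns true iff the offset is still inside the (possibly shrunk) range.
  // A ProcessElement body may only output for offsets it successfully claimed.
  public boolean tryClaim(long offset) {
    if (offset >= to) {
      return false; // past the end: the processing loop must stop
    }
    lastClaimed = offset;
    return true;
  }

  // Splits off the unprocessed remainder [lastClaimed + 1, to) for the runner
  // to resume later; the primary restriction keeps only what was claimed.
  public long[] checkpoint() {
    long splitPoint = (lastClaimed == Long.MIN_VALUE) ? from : lastClaimed + 1;
    long[] residual = new long[] {splitPoint, to};
    this.to = splitPoint;
    return residual;
  }

  public static void main(String[] args) {
    SimpleOffsetRangeTracker tracker = new SimpleOffsetRangeTracker(0, 5);
    long i = 0;
    while (tracker.tryClaim(i)) {
      i++; // claims offsets 0..4, then tryClaim(5) fails
    }
    System.out.println(i); // prints 5
  }
}
```

The key point for runner authors: the split between "claimed" and "residual" is what lets a runner checkpoint a running ProcessElement call and reschedule the rest, which is exactly what the ProcessFn hooks expose.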
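P.P.S. And to illustrate what "return resume()" means for a runner: below is a hedged, Beam-free sketch of the outer loop that an evaluation of "SplittableProcessElements" might follow, repeatedly invoking the element-processing body and rescheduling the residual whenever the body asks to be resumed. The names RunnerLoop and processChunk are invented for illustration and correspond to nothing in the codebase.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not Beam code: how a runner might drive a splittable
// ProcessElement over an offset range, resuming from a checkpointed residual
// each time the body asks to be resumed.
public class RunnerLoop {
  // Result of one invocation: either "done" or "resume from this offset".
  static final long DONE = -1;

  // Processes at most maxPerCall offsets of [from, to), mimicking a
  // ProcessElement body that claims offsets one by one and then returns
  // resume(). Returns the first unprocessed offset, or DONE.
  static long processChunk(long from, long to, int maxPerCall, Deque<Long> output) {
    long i = from;
    for (int n = 0; n < maxPerCall && i < to; n++, i++) {
      output.add(i); // stand-in for c.output(...)
    }
    return (i < to) ? i : DONE;
  }

  public static void main(String[] args) {
    Deque<Long> output = new ArrayDeque<>();
    long resumeFrom = 0;
    // The runner keeps re-invoking the body until it no longer asks to resume.
    while (resumeFrom != DONE) {
      resumeFrom = processChunk(resumeFrom, 10, 3, output);
    }
    System.out.println(output.size()); // prints 10: all offsets processed
  }
}
```

In the real implementation the "resume from" state is, of course, carried by the restriction in keyed state rather than a local variable, which is why the GBKIntoKeyedWorkItems primitive matters.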
