Hello,

I'm almost done adding support for Splittable DoFn
http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and
very excited about that. There's only 1 PR
<https://github.com/apache/beam/pull/1898> remaining, plus enabling some
tests.

* (batch runner is much harder because it's not yet quite clear to me how
to properly implement liquid sharding
<https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
with
SDF - and the current API is not ready for that yet)

After implementing all the runner-agnostic parts of Splittable DoFn, I
found them quite easy to integrate into Dataflow streaming runner, and I
think this means it should be easy to integrate into other runners too.

====== Why it'd be cool ======
The general benefits of SDF are well-described in the design doc (linked
above).
As for right now - if we integrated SDF with all runners, it'd already
enable us to start greatly simplifying the code of existing streaming
connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors
(e.g. a really nice one to implement would be "directory watcher", that
continuously returns new files in a directory).

As a teaser, here's the complete implementation of an "unbounded counter" I
used for my test of Dataflow runner integration:

  class CountFn extends DoFn<String, String> {
    @ProcessElement
    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker
tracker) {
      for (int i = tracker.currentRestriction().getFrom();
tracker.tryClaim(i); ++i) c.output(i);
      return resume();
    }

    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) { return new
OffsetRange(0, Integer.MAX_VALUE); }

    @NewTracker
    public OffsetRangeTracker newTracker(OffsetRange range) { return new
OffsetRangeTracker(range); }
  }

====== What I'm asking ======
So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
runners from people who are intimately familiar with them - specifically, I
was hoping best-case I could nerd-snipe some of you into taking over the
integration of SDF with your favorite runner ;)

The proper set of people seems to be +Aljoscha Krettek
<[email protected]> +Maximilian Michels <[email protected]>
[email protected] <[email protected]> +Amit Sela
<[email protected]> +Thomas
Weise unless I forgot somebody.

Average-case, I was looking for runner-specific guidance on how to do it
myself.

====== If you want to help ======
If somebody decides to take this over, in my absence (I'll be mostly gone
for ~the next month)., the best people to ask for implementation
advice are +Kenn
Knowles <[email protected]> and +Daniel Mills <[email protected]> .

For reference, here's how SDF is implemented in the direct runner:
- Direct runner overrides
<https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
 ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
<https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
(common
transform expansion)
- SplittableParDo uses two runner-specific primitive transforms:
"GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner
overrides the first one like this
<https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
and directly implements evaluation of the second one like this
<https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
using runner hooks introduced in this PR
<https://github.com/apache/beam/pull/1824>. At the core of the hooks is
"ProcessFn" which is like a regular DoFn but has to be prepared at runtime
with some hooks (state, timers, and runner access to RestrictionTracker)
before you invoke it. I added a convenience implementation of the hook
mimicking behavior of UnboundedSource.
- The relevant runner-agnostic tests are in SplittableDoFnTest
<https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
.

That's all it takes, really - the runner has to implement these two
transforms. When I looked at Spark and Flink runners, it was not quite
clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g.
Spark runner currently doesn't use KeyedWorkItem at all - but it seems
definitely possible.

Thanks!

Reply via email to