Thanks for the update, Eugene.

I will work on the Spark runner with Amit.

Regards
JB

On Feb 7, 2017, at 19:12, Eugene Kirpichov
<[email protected]> wrote:
>Hello,
>
>I'm almost done adding support for Splittable DoFn
><http://s.apache.org/splittable-do-fn> to the Dataflow streaming
>runner*, and I'm very excited about that. There's only one PR
><https://github.com/apache/beam/pull/1898> remaining, plus enabling
>some tests.
>
>* (the batch runner is much harder because it's not yet quite clear
>to me how to properly implement liquid sharding
><https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
>with SDF - and the current API is not ready for that yet)
>
>After implementing all the runner-agnostic parts of Splittable DoFn, I
>found them quite easy to integrate into the Dataflow streaming runner,
>and I think this means it should be easy to integrate into other
>runners too.
>
>====== Why it'd be cool ======
>The general benefits of SDF are well described in the design doc
>(linked above).
>As for right now: if we integrated SDF with all runners, it would
>already enable us to start greatly simplifying the code of existing
>streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing
>new connectors (e.g. a really nice one to implement would be a
>"directory watcher" that continuously returns new files in a
>directory).
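To illustrate the "directory watcher" idea only, here is a rough, hypothetical sketch in plain Java (no Beam APIs; `DirectoryWatcher` and `poll` are invented names for illustration): the core of such a connector is diffing each directory listing against the set of files already reported.

```java
import java.util.*;

// Hypothetical sketch: the heart of a "directory watcher" is a repeated
// diff of the current listing against what has already been emitted.
class DirectoryWatcher {
  private final Set<String> seen = new HashSet<>();

  // Returns only the files that have not been reported before,
  // in sorted order, and remembers them for the next round.
  public List<String> poll(Collection<String> currentListing) {
    List<String> fresh = new ArrayList<>();
    for (String file : currentListing) {
      if (seen.add(file)) { // add() returns false if already present
        fresh.add(file);
      }
    }
    Collections.sort(fresh);
    return fresh;
  }
}
```

In an actual SDF-based connector, the polling loop would presumably live inside the process method, with the restriction tracking which files have been claimed.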
>
>As a teaser, here's the complete implementation of an "unbounded
>counter" I used for my test of the Dataflow runner integration:
>
>  class CountFn extends DoFn<String, String> {
>    @ProcessElement
>    public ProcessContinuation process(
>        ProcessContext c, OffsetRangeTracker tracker) {
>      // Emit each claimed offset as a String to match
>      // DoFn<String, String>.
>      for (long i = tracker.currentRestriction().getFrom();
>          tracker.tryClaim(i); ++i) {
>        c.output(String.valueOf(i));
>      }
>      return resume();
>    }
>
>    @GetInitialRestriction
>    public OffsetRange getInitialRange(String element) {
>      return new OffsetRange(0, Integer.MAX_VALUE);
>    }
>
>    @NewTracker
>    public OffsetRangeTracker newTracker(OffsetRange range) {
>      return new OffsetRangeTracker(range);
>    }
>  }
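To make the claim loop above concrete, here is a minimal, self-contained toy model of the tracker protocol (plain Java; `ToyOffsetTracker` is a made-up name, not the real Beam `OffsetRangeTracker`). It shows why the loop terminates: `tryClaim` succeeds only inside the current range, and a checkpoint shrinks the range so the remainder can be resumed later.

```java
// Toy model of the claim/checkpoint protocol; the real Beam
// OffsetRangeTracker has the same shape but more machinery.
class ToyOffsetTracker {
  private final long from;
  private long to; // exclusive end; a checkpoint can shrink it
  private long lastClaimed = Long.MIN_VALUE;

  ToyOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  public long getFrom() { return from; }

  // A claim succeeds only while the offset is still inside the range.
  public boolean tryClaim(long offset) {
    if (offset >= to) return false;
    lastClaimed = offset;
    return true;
  }

  // Checkpointing truncates the range just past the last claimed
  // offset; the remainder [lastClaimed + 1, oldTo) can resume later.
  public long checkpoint() {
    long remainderStart = lastClaimed + 1;
    this.to = remainderStart;
    return remainderStart;
  }
}
```

As I understand it, in the real API the checkpoint is taken by the runner, which is what lets an SDF pause and resume an otherwise unbounded loop like the one in CountFn.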
>
>====== What I'm asking ======
>So, I'd like to ask for help integrating SDF into the Spark, Flink and
>Apex runners from people who are intimately familiar with them -
>specifically, I was hoping that, best-case, I could nerd-snipe some of
>you into taking over the integration of SDF with your favorite runner
>;)
>
>The proper set of people seems to be +Aljoscha Krettek
><[email protected]>, +Maximilian Michels <[email protected]>,
>+Amit Sela <[email protected]> and +Thomas Weise, unless I
>forgot somebody.
>
>Average-case, I was looking for runner-specific guidance on how to do
>it myself.
>
>====== If you want to help ======
>If somebody decides to take this over in my absence (I'll be mostly
>gone for ~the next month), the best people to ask for implementation
>advice are +Kenn Knowles <[email protected]> and +Daniel Mills
><[email protected]>.
>
>For reference, here's how SDF is implemented in the direct runner:
>- The direct runner overrides
><https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
>ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
><https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
>(a common transform expansion).
>- SplittableParDo uses two runner-specific primitive transforms:
>"GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct
>runner overrides the first one like this
><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
>and directly implements evaluation of the second one like this
><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
>using runner hooks introduced in this PR
><https://github.com/apache/beam/pull/1824>. At the core of the hooks
>is "ProcessFn", which is like a regular DoFn but has to be prepared at
>runtime with some hooks (state, timers, and runner access to the
>RestrictionTracker) before you invoke it. I added a convenience
>implementation of the hook mimicking the behavior of UnboundedSource.
>- The relevant runner-agnostic tests are in SplittableDoFnTest
><https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
>
>That's all it takes, really - the runner has to implement these two
>transforms. When I looked at the Spark and Flink runners, it was not
>quite clear to me how to implement the GBKIntoKeyedWorkItems
>transform; e.g. the Spark runner currently doesn't use KeyedWorkItem
>at all - but it definitely seems possible.
>
>Thanks!