Hi! Please feel free to join this call, but I think we'll mostly be discussing how to do it in the Spark runner in particular, so we'll probably need a separate call for Apex anyway.

On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <t...@apache.org> wrote:

> Hi Eugene,
>
> This would work for me also. Please let me know if you want to keep the
> Apex related discussion separate or want me to join this call.
>
> Thanks,
> Thomas
>
> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
>
> > Sure, Friday morning sounds good. How about 9am Friday PST, on a
> > videocall at this link:
> > https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
> >
> > On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <amitsel...@gmail.com> wrote:
> >
> > > PST mornings are better, because they are evenings/nights for me.
> > > Friday would work out best for me.
> > >
> > > On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> > > <kirpic...@google.com.invalid> wrote:
> > >
> > > > Awesome!!!
> > > >
> > > > Amit - remind me your time zone? JB, do you want to join?
> > > > I'm free this week all afternoons (say after 2pm) in Pacific Time,
> > > > and mornings of Wed & Fri. We'll probably need half an hour to an
> > > > hour.
> > > >
> > > > On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek
> > > > <aljos...@apache.org> wrote:
> > > >
> > > > > I whipped up a quick version for Flink that seems to work:
> > > > > https://github.com/apache/beam/pull/2235
> > > > >
> > > > > There are still two failing tests, as described in the PR.
> > > > >
> > > > > On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > > > +1 for a video call. I think it should be pretty
> > > > > > straightforward for the Spark runner after the work on read
> > > > > > from UnboundedSource and after GroupAlsoByWindow, but from my
> > > > > > experience such a call could move us forward fast enough.
> > > > > >
> > > > > > On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov
> > > > > > <kirpic...@google.com> wrote:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > Let us continue working on this.
> > > > > > > I am back from various travels and am eager to help.
> > > > > > >
> > > > > > > Amit, JB - would you like to perhaps have a videocall to hash
> > > > > > > this out for the Spark runner?
> > > > > > >
> > > > > > > Aljoscha - are the necessary Flink changes done / or is the
> > > > > > > need for them obviated by using the (existing) runner-facing
> > > > > > > state/timer APIs? Should we have a videocall too?
> > > > > > >
> > > > > > > Thomas - what do you think about getting this into Apex
> > > > > > > runner?
> > > > > > >
> > > > > > > (I think videocalls will allow us to make rapid progress, but
> > > > > > > it's probably a better idea to keep them separate since
> > > > > > > they'll involve a lot of runner-specific details)
> > > > > > >
> > > > > > > PS - The completion of this in Dataflow streaming runner is
> > > > > > > currently waiting only on having a small service-side change
> > > > > > > implemented and rolled out for termination of streaming jobs.
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles
> > > > > > > <k...@google.com> wrote:
> > > > > > >
> > > > > > > I recommend proceeding with the runner-facing state & timer
> > > > > > > APIs; they are lower-level and more appropriate for this. All
> > > > > > > runners provide them or use runners/core implementations, as
> > > > > > > they are needed for triggering.
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov
> > > > > > > <kirpic...@google.com> wrote:
> > > > > > >
> > > > > > > Thanks Aljoscha!
> > > > > > >
> > > > > > > Minor note: I'm not familiar with what level of support for
> > > > > > > timers Flink currently has - however SDF in Direct and
> > > > > > > Dataflow runner currently does not use the user-facing
> > > > > > > state/timer APIs - rather, it uses the runner-facing APIs
> > > > > > > (StateInternals and TimerInternals) - perhaps Flink already
> > > > > > > implements these. We may want to change this, but for now it's
> > > > > > > good enough (besides, SDF uses watermark holds, which are not
> > > > > > > supported by the user-facing state API yet).
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek
> > > > > > > <aljos...@data-artisans.com> wrote:
> > > > > > >
> > > > > > > Thanks for the motivation, Eugene! :-)
> > > > > > >
> > > > > > > I've wanted to do this for a while now but was waiting for the
> > > > > > > Flink 1.2 release (which happened this week)! There's some
> > > > > > > prerequisite work to be done on the Flink runner: we'll move
> > > > > > > to the new timer interfaces introduced in Flink 1.2 and
> > > > > > > implement support for both the user facing state and timer
> > > > > > > APIs. This should make implementation of SDF easier.
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov
> > > > > > > <kirpic...@google.com> wrote:
> > > > > > >
> > > > > > > Thanks! Looking forward to this work.
> > > > > > >
> > > > > > > On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré
> > > > > > > <j...@nanthrax.net> wrote:
> > > > > > >
> > > > > > > Thanks for the update Eugene.
> > > > > > >
> > > > > > > I will work on the spark runner with Amit.
> > > > > > >
> > > > > > > Regards
> > > > > > > JB
> > > > > > >
> > > > > > > On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> > > > > > > <kirpic...@google.com.INVALID> wrote:
> > > > > > > > Hello,
> > > > > > > >
> > > > > > > > I'm almost done adding support for Splittable DoFn
> > > > > > > > http://s.apache.org/splittable-do-fn to Dataflow streaming
> > > > > > > > runner*, and very excited about that. There's only 1 PR
> > > > > > > > <https://github.com/apache/beam/pull/1898> remaining, plus
> > > > > > > > enabling some tests.
> > > > > > > >
> > > > > > > > * (batch runner is much harder because it's not yet quite
> > > > > > > > clear to me how to properly implement liquid sharding
> > > > > > > > <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> > > > > > > > with SDF - and the current API is not ready for that yet)
> > > > > > > >
> > > > > > > > After implementing all the runner-agnostic parts of
> > > > > > > > Splittable DoFn, I found them quite easy to integrate into
> > > > > > > > Dataflow streaming runner, and I think this means it should
> > > > > > > > be easy to integrate into other runners too.
> > > > > > > >
> > > > > > > > ====== Why it'd be cool ======
> > > > > > > > The general benefits of SDF are well-described in the design
> > > > > > > > doc (linked above).
> > > > > > > > As for right now - if we integrated SDF with all runners,
> > > > > > > > it'd already enable us to start greatly simplifying the code
> > > > > > > > of existing streaming connectors (CountingInput, Kafka,
> > > > > > > > Pubsub, JMS) and writing new connectors (e.g.
> > > > > > > > a really nice one to implement would be a "directory
> > > > > > > > watcher" that continuously returns new files in a
> > > > > > > > directory).
> > > > > > > >
> > > > > > > > As a teaser, here's the complete implementation of an
> > > > > > > > "unbounded counter" I used for my test of Dataflow runner
> > > > > > > > integration:
> > > > > > > >
> > > > > > > >   class CountFn extends DoFn<String, String> {
> > > > > > > >     @ProcessElement
> > > > > > > >     public ProcessContinuation process(
> > > > > > > >         ProcessContext c, OffsetRangeTracker tracker) {
> > > > > > > >       // Offsets are longs; claim each one before emitting it.
> > > > > > > >       for (long i = tracker.currentRestriction().getFrom();
> > > > > > > >           tracker.tryClaim(i); ++i) {
> > > > > > > >         c.output(String.valueOf(i));
> > > > > > > >       }
> > > > > > > >       return resume();
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @GetInitialRestriction
> > > > > > > >     public OffsetRange getInitialRange(String element) {
> > > > > > > >       return new OffsetRange(0, Integer.MAX_VALUE);
> > > > > > > >     }
> > > > > > > >
> > > > > > > >     @NewTracker
> > > > > > > >     public OffsetRangeTracker newTracker(OffsetRange range) {
> > > > > > > >       return new OffsetRangeTracker(range);
> > > > > > > >     }
> > > > > > > >   }
> > > > > > > >
> > > > > > > > ====== What I'm asking ======
> > > > > > > > So, I'd like to ask for help integrating SDF into Spark,
> > > > > > > > Flink and Apex runners from people who are intimately
> > > > > > > > familiar with them - specifically, I was hoping best-case I
> > > > > > > > could nerd-snipe some of you into taking over the
> > > > > > > > integration of SDF with your favorite runner ;)
> > > > > > > >
> > > > > > > > The proper set of people seems to be
> > > > > > > > +Aljoscha Krettek <aljos...@data-artisans.com>
> > > > > > > > +Maximilian Michels <m...@data-artisans.com>
> > > > > > > > +ieme...@gmail.com <ieme...@gmail.com>
> > > > > > > > +Amit Sela <amitsel...@gmail.com>
> > > > > > > > +Thomas Weise
> > > > > > > > unless I forgot somebody.
> > > > > > > >
> > > > > > > > Average-case, I was looking for runner-specific guidance on
> > > > > > > > how to do it myself.
> > > > > > > >
> > > > > > > > ====== If you want to help ======
> > > > > > > > If somebody decides to take this over in my absence (I'll be
> > > > > > > > mostly gone for ~the next month), the best people to ask for
> > > > > > > > implementation advice are +Kenn Knowles <k...@google.com>
> > > > > > > > and +Daniel Mills <mil...@google.com>.
> > > > > > > >
> > > > > > > > For reference, here's how SDF is implemented in the direct
> > > > > > > > runner:
> > > > > > > > - Direct runner overrides
> > > > > > > >   <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> > > > > > > >   ParDo.of() for a splittable DoFn and replaces it with
> > > > > > > >   SplittableParDo
> > > > > > > >   <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> > > > > > > >   (common transform expansion)
> > > > > > > > - SplittableParDo uses two runner-specific primitive
> > > > > > > >   transforms: "GBKIntoKeyedWorkItems" and
> > > > > > > >   "SplittableProcessElements".
> > > > > > > >   Direct runner overrides the first one like this
> > > > > > > >   <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> > > > > > > >   and directly implements evaluation of the second one like
> > > > > > > >   this
> > > > > > > >   <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> > > > > > > >   using runner hooks introduced in this PR
> > > > > > > >   <https://github.com/apache/beam/pull/1824>. At the core of
> > > > > > > >   the hooks is "ProcessFn" which is like a regular DoFn but
> > > > > > > >   has to be prepared at runtime with some hooks (state,
> > > > > > > >   timers, and runner access to RestrictionTracker) before
> > > > > > > >   you invoke it. I added a convenience implementation of the
> > > > > > > >   hook mimicking behavior of UnboundedSource.
> > > > > > > > - The relevant runner-agnostic tests are in
> > > > > > > >   SplittableDoFnTest
> > > > > > > >   <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > > > > > > >
> > > > > > > > That's all it takes, really - the runner has to implement
> > > > > > > > these two transforms. When I looked at Spark and Flink
> > > > > > > > runners, it was not quite clear to me how to implement the
> > > > > > > > GBKIntoKeyedWorkItems transform, e.g. Spark runner currently
> > > > > > > > doesn't use KeyedWorkItem at all - but it seems definitely
> > > > > > > > possible.
> > > > > > > >
> > > > > > > > Thanks!
> > > > > > >
> > > > > > > --
> > > > > > > Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > > > > > >
> > > > > > > i...@data-artisans.com
> > > > > > > +49-(0)30-55599146
> > > > > > >
> > > > > > > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > > > > > Managing Directors: Kostas Tzoumas, Stephan Ewen
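
A side note on the claim loop in the quoted CountFn, in case it helps the runner discussion. The sketch below is a toy, plain-Java model of how tryClaim() and a runner-initiated checkpoint might interact: the loop keeps going until a claim fails, and the unclaimed remainder comes back as a residual to resume from later. ClaimLoopSketch, Range, ToyTracker, processOnce and the "budget" parameter are all hypothetical names made up for illustration. None of this is Beam code, and the real OffsetRangeTracker and ProcessFn may behave differently in detail.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {
    /** Half-open offset range [from, to); a stand-in for a restriction (hypothetical). */
    static final class Range {
        final long from;
        final long to;
        Range(long from, long to) { this.from = from; this.to = to; }
    }

    /** Toy tracker (hypothetical): grants claims inside its range and can be checkpointed. */
    static final class ToyTracker {
        private Range range;
        private long lastClaimed;

        ToyTracker(Range range) {
            this.range = range;
            this.lastClaimed = range.from - 1; // nothing claimed yet
        }

        /** Claims offset i; fails once i is outside the (possibly shrunk) range. */
        boolean tryClaim(long i) {
            if (i >= range.to) {
                return false;
            }
            lastClaimed = i;
            return true;
        }

        /** Shrinks the range to what was claimed so far and returns the unclaimed residual. */
        Range checkpoint() {
            Range residual = new Range(lastClaimed + 1, range.to);
            range = new Range(range.from, lastClaimed + 1);
            return residual;
        }
    }

    /**
     * Same loop shape as CountFn.process. The budget simulates the runner deciding
     * to checkpoint after a number of outputs; the returned range is what is left.
     */
    static Range processOnce(Range restriction, int budget, List<Long> out) {
        ToyTracker tracker = new ToyTracker(restriction);
        Range residual = null;
        int emitted = 0;
        for (long i = restriction.from; tracker.tryClaim(i); ++i) {
            out.add(i);
            if (++emitted == budget) {
                residual = tracker.checkpoint(); // the next tryClaim will now fail
            }
        }
        // If no checkpoint happened, the range was exhausted and the residual is empty.
        return residual != null ? residual : tracker.checkpoint();
    }

    public static void main(String[] args) {
        List<Long> out = new ArrayList<>();
        Range restriction = new Range(0, 10);
        int calls = 0;
        // "Resume" until nothing is left, the way a runner would re-invoke the call.
        while (restriction.from < restriction.to) {
            restriction = processOnce(restriction, 4, out);
            calls++;
        }
        System.out.println(out + " in " + calls + " calls");
    }
}
```

With the range [0, 10) and a budget of 4, main emits every offset exactly once across three invocations. The point of the toy is only that the loop body never needs to know about checkpoints: a failed claim is the single signal to stop.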