Meeting notes from today's call with Amit, Aviem and Ismaël:

Spark has 2 types of stateful operators: a cheap one intended for updating elements (works with state but not with timers) and an expensive one. I.e. there's no efficient direct counterpart to Beam's keyed state model. In its implementation of the Beam State & Timers API, the Spark runner will use the cheaper one for state and the expensive one for timers. So for SDF, which in the runner-agnostic SplittableParDo expansion needs both state and timers, we'll need the expensive one. This should be fine, since with SDF the bottleneck should be in the ProcessElement call itself, not in splitting/scheduling it.
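
To make the "needs both state and timers" point concrete, here is a toy sketch in plain Java. It is not Beam or Spark API: StateAndTimerSketch, Range and maxClaimsPerCall are made-up names, and the map and queue merely stand in for a runner's keyed state and timers. It only illustrates the cycle of a bounded ProcessElement call that saves the residual restriction in state and sets a timer to resume it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy model only (hypothetical names, no Beam/Spark calls). */
public class StateAndTimerSketch {
  /** Half-open offset range [from, to). */
  static final class Range {
    final long from;
    final long to;
    Range(long from, long to) { this.from = from; this.to = to; }
  }

  // Stand-in for keyed state: the residual restriction per key.
  final Map<String, Range> state = new HashMap<>();
  // Stand-in for timers: keys whose processing should be resumed.
  final Queue<String> timers = new ArrayDeque<>();
  final List<Long> output = new ArrayList<>();
  // Assumption: the "runner" checkpoints after this many claimed offsets.
  final int maxClaimsPerCall;
  int processCalls = 0;

  StateAndTimerSketch(int maxClaimsPerCall) { this.maxClaimsPerCall = maxClaimsPerCall; }

  /** One bounded call: claim offsets, then checkpoint whatever is left. */
  void processOnce(String key) {
    processCalls++;
    Range r = state.remove(key);
    long i = r.from;
    for (int claimed = 0; i < r.to && claimed < maxClaimsPerCall; claimed++, i++) {
      output.add(i);
    }
    if (i < r.to) {
      state.put(key, new Range(i, r.to)); // save the residual restriction
      timers.add(key);                    // ask to be resumed later
    }
  }

  /** Seeds state and a timer for one element, then fires timers until none remain. */
  List<Long> run(String key, Range initial) {
    state.put(key, initial);
    timers.add(key);
    while (!timers.isEmpty()) {
      processOnce(timers.poll());
    }
    return output;
  }

  public static void main(String[] args) {
    StateAndTimerSketch checkpointing = new StateAndTimerSketch(3);
    checkpointing.run("k", new Range(0, 10));
    System.out.println("calls with checkpoints: " + checkpointing.processCalls);

    StateAndTimerSketch noCheckpoints = new StateAndTimerSketch(Integer.MAX_VALUE);
    noCheckpoints.run("k", new Range(0, 10));
    System.out.println("calls without checkpoints: " + noCheckpoints.processCalls);
  }
}
```

With a cap of 3 the range [0, 10) takes 4 calls, each resumed through a timer. Setting the cap to Integer.MAX_VALUE models a runner that never requests a checkpoint: the whole range is processed in one call and no timer is ever set.
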

For the Spark batch runner, implementing SDF might be simpler still: the runner will just not request any checkpointing. The hard parts of SDF in batch are the dynamic rebalancing and size estimation APIs. They will be refined this quarter, but it's OK to initially not have them.

The Spark runner might use a different expansion of SDF that does not involve KeyedWorkItems (i.e. not overriding the GBKIntoKeyedWorkItems transform), while still striving to reuse as much code as possible from the standard expansion implemented in SplittableParDo, at least ProcessFn.

Testing questions:
- The Spark runner already properly implements termination on watermarks reaching infinity.
- Q: How to test that the runner actually splits?
  A: The code that splits is runner-agnostic, so a runner would have to deliberately sabotage it in order to break it, which is unlikely. Also, for semantics we have the runner-agnostic ROS (RunnableOnService) tests; at some point we will need performance tests too.

Next steps:
- Amit will look at the standard SplittableParDo expansion and its implementation in the Flink and Direct runners, and will write up a doc about how to do this in Spark.
- Another videotalk in 2 weeks to check on progress/issues.

Thanks all!

On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <[email protected]> wrote:

> Yes, Monday morning works! How about also 8am PST, same Hangout link - does that work for you?
>
> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <[email protected]> wrote:
>
> Eugene,
>
> I cannot make it for the call today. Would Monday morning work for you to discuss the Apex changes?
>
> Thanks
>
> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <[email protected]> wrote:
>
> > Hi! Please feel free to join this call, but I think we'd be mostly discussing how to do it in the Spark runner in particular; so we'll probably need another call for Apex anyway.
> >
> > On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <[email protected]> wrote:
> >
> > > Hi Eugene,
> > >
> > > This would work for me also.
> > > Please let me know if you want to keep the Apex related discussion separate or want me to join this call.
> > >
> > > Thanks,
> > > Thomas
> > >
> > > On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <[email protected]> wrote:
> > >
> > > > Sure, Friday morning sounds good. How about 9am Friday PST, at videocall by link https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
> > > >
> > > > On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <[email protected]> wrote:
> > > >
> > > > > PST mornings are better, because they are evening/nights for me. Friday would work-out best for me.
> > > > >
> > > > > On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov <[email protected]> wrote:
> > > > >
> > > > > > Awesome!!!
> > > > > >
> > > > > > Amit - remind me your time zone? JB, do you want to join?
> > > > > > I'm free this week all afternoons (say after 2pm) in Pacific Time, and mornings of Wed & Fri. We'll probably need half an hour to an hour.
> > > > > >
> > > > > > On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]> wrote:
> > > > > >
> > > > > > > I whipped up a quick version for Flink that seems to work:
> > > > > > > https://github.com/apache/beam/pull/2235
> > > > > > >
> > > > > > > There are still two failing tests, as described in the PR.
> > > > > > >
> > > > > > > On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > > > > > +1 for a video call. I think it should be pretty straight forward for the Spark runner after the work on read from UnboundedSource and after GroupAlsoByWindow, but from my experience such a call could move us forward fast enough.
> > > > > > > >
> > > > > > > > On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]> wrote:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > Let us continue working on this. I am back from various travels and am eager to help.
> > > > > > > > >
> > > > > > > > > Amit, JB - would you like to perhaps have a videocall to hash this out for the Spark runner?
> > > > > > > > >
> > > > > > > > > Aljoscha - are the necessary Flink changes done / or is the need for them obviated by using the (existing) runner-facing state/timer APIs? Should we have a videocall too?
> > > > > > > > >
> > > > > > > > > Thomas - what do you think about getting this into Apex runner?
> > > > > > > > >
> > > > > > > > > (I think videocalls will allow to make rapid progress, but it's probably a better idea to keep them separate since they'll involve a lot of runner-specific details)
> > > > > > > > >
> > > > > > > > > PS - The completion of this in Dataflow streaming runner is currently waiting only on having a small service-side change implemented and rolled out for termination of streaming jobs.
> > > > > > > > >
> > > > > > > > > On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > I recommend proceeding with the runner-facing state & timer APIs; they are lower-level and more appropriate for this. All runners provide them or use runners/core implementations, as they are needed for triggering.
> > > > > > > > >
> > > > > > > > > On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > Thanks Aljoscha!
> > > > > > > > >
> > > > > > > > > Minor note: I'm not familiar with what level of support for timers Flink currently has - however SDF in Direct and Dataflow runner currently does not use the user-facing state/timer APIs - rather, it uses the runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink already implements these. We may want to change this, but for now it's good enough (besides, SDF uses watermark holds, which are not supported by the user-facing state API yet).
> > > > > > > > >
> > > > > > > > > On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > Thanks for the motivation, Eugene! :-)
> > > > > > > > >
> > > > > > > > > I've wanted to do this for a while now but was waiting for the Flink 1.2 release (which happened this week)! There's some prerequisite work to be done on the Flink runner: we'll move to the new timer interfaces introduced in Flink 1.2 and implement support for both the user facing state and timer APIs. This should make implementation of SDF easier.
> > > > > > > > >
> > > > > > > > > On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > Thanks! Looking forward to this work.
> > > > > > > > >
> > > > > > > > > On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
> > > > > > > > >
> > > > > > > > > Thanks for the update Eugene.
> > > > > > > > >
> > > > > > > > > I will work on the spark runner with Amit.
> > > > > > > > >
> > > > > > > > > Regards
> > > > > > > > > JB
> > > > > > > > >
> > > > > > > > > On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
> > > > > > > > > >Hello,
> > > > > > > > > >
> > > > > > > > > >I'm almost done adding support for Splittable DoFn http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and very excited about that. There's only 1 PR <https://github.com/apache/beam/pull/1898> remaining, plus enabling some tests.
> > > > > > > > > >
> > > > > > > > > >* (batch runner is much harder because it's not yet quite clear to me how to properly implement liquid sharding <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow> with SDF - and the current API is not ready for that yet)
> > > > > > > > > >
> > > > > > > > > >After implementing all the runner-agnostic parts of Splittable DoFn, I found them quite easy to integrate into Dataflow streaming runner, and I think this means it should be easy to integrate into other runners too.
> > > > > > > > > >
> > > > > > > > > >====== Why it'd be cool ======
> > > > > > > > > >The general benefits of SDF are well-described in the design doc (linked above).
> > > > > > > > > >As for right now - if we integrated SDF with all runners, it'd already enable us to start greatly simplifying the code of existing streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors (e.g. a really nice one to implement would be "directory watcher", that continuously returns new files in a directory).
> > > > > > > > > >
> > > > > > > > > >As a teaser, here's the complete implementation of an "unbounded counter" I used for my test of Dataflow runner integration:
> > > > > > > > > >
> > > > > > > > > >  class CountFn extends DoFn<String, String> {
> > > > > > > > > >    @ProcessElement
> > > > > > > > > >    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
> > > > > > > > > >      // Claim offsets one at a time; tryClaim returns false once the offset
> > > > > > > > > >      // falls outside the (possibly checkpointed) restriction.
> > > > > > > > > >      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> > > > > > > > > >        c.output(String.valueOf(i));  // the output type of this DoFn is String
> > > > > > > > > >      }
> > > > > > > > > >      return resume();
> > > > > > > > > >    }
> > > > > > > > > >
> > > > > > > > > >    @GetInitialRestriction
> > > > > > > > > >    public OffsetRange getInitialRange(String element) { return new OffsetRange(0, Integer.MAX_VALUE); }
> > > > > > > > > >
> > > > > > > > > >    @NewTracker
> > > > > > > > > >    public OffsetRangeTracker newTracker(OffsetRange range) { return new OffsetRangeTracker(range); }
> > > > > > > > > >  }
> > > > > > > > > >
> > > > > > > > > >====== What I'm asking ======
> > > > > > > > > >So, I'd like to ask for help integrating SDF into Spark, Flink and
> > > > > > > > > >Apex runners from people who are intimately familiar with them - specifically, I was hoping best-case I could nerd-snipe some of you into taking over the integration of SDF with your favorite runner ;)
> > > > > > > > > >
> > > > > > > > > >The proper set of people seems to be +Aljoscha Krettek <[email protected]> +Maximilian Michels <[email protected]> [email protected] <[email protected]> +Amit Sela <[email protected]> +Thomas Weise unless I forgot somebody.
> > > > > > > > > >
> > > > > > > > > >Average-case, I was looking for runner-specific guidance on how to do it myself.
> > > > > > > > > >
> > > > > > > > > >====== If you want to help ======
> > > > > > > > > >If somebody decides to take this over in my absence (I'll be mostly gone for ~the next month), the best people to ask for implementation advice are +Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.
> > > > > > > > > >
> > > > > > > > > >For reference, here's how SDF is implemented in the direct runner:
> > > > > > > > > >- Direct runner overrides <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java> ParDo.of() for a splittable DoFn and replaces it with SplittableParDo <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java> (common transform expansion)
> > > > > > > > > >- SplittableParDo uses two runner-specific primitive transforms: "GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner overrides the first one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>, and directly implements evaluation of the second one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>, using runner hooks introduced in this PR <https://github.com/apache/beam/pull/1824>.
> > > > > > > > > >At the core of the hooks is "ProcessFn" which is like a regular DoFn but has to be prepared at runtime with some hooks (state, timers, and runner access to RestrictionTracker) before you invoke it. I added a convenience implementation of the hook mimicking behavior of UnboundedSource.
> > > > > > > > > >- The relevant runner-agnostic tests are in SplittableDoFnTest <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > > > > > > > > >
> > > > > > > > > >That's all it takes, really - the runner has to implement these two transforms. When I looked at Spark and Flink runners, it was not quite clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g. Spark runner currently doesn't use KeyedWorkItem at all - but it seems definitely possible.
> > > > > > > > > >
> > > > > > > > > >Thanks!
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > > > > > > > >
> > > > > > > > > [email protected]
> > > > > > > > > +49-(0)30-55599146
> > > > > > > > >
> > > > > > > > > Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > > > > > > > Managing Directors: Kostas Tzoumas, Stephan Ewen
