Thomas - yes, 9:30 works, shall we do that?
JB - excellent! You can start experimenting already, using the direct runner!
On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Eugene,
>
> Thanks for the meeting notes!
>
> I will be on the next call, and Ismaël has also provided me some updates.
>
> I will sync with Amit on the Spark runner, and start to experiment and test SDF on the JMS IO.
>
> Thanks!
> Regards
> JB
>
> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
>> Meeting notes from today's call with Amit, Aviem and Ismaël:
>>
>> Spark has 2 types of stateful operators: a cheap one intended for updating elements (works with state but not with timers) and an expensive one. I.e., there's no efficient direct counterpart to Beam's keyed state model. In its implementation of the Beam State & Timers API, the Spark runner will use the cheaper one for state and the expensive one for timers. So, for SDF, which in the runner-agnostic SplittableParDo expansion needs both state and timers, we'll need the expensive one - but this should be fine, since with SDF the bottleneck should be in the ProcessElement call itself, not in splitting/scheduling it.
>>
>> For the Spark batch runner, implementing SDF might be simpler still: the runner will just not request any checkpointing. The hard parts of SDF in batch are the dynamic rebalancing and size estimation APIs - they will be refined this quarter, but it's OK to not have them initially.
>>
>> The Spark runner might use a different expansion of SDF not involving KeyedWorkItems (i.e., not overriding the GBKIntoKeyedWorkItems transform), though still striving to reuse as much code as possible from the standard expansion implemented in SplittableParDo, at least ProcessFn.
>>
>> Testing questions:
>> - Spark runner already implements termination on watermarks-reaching-infinity properly.
>> - Q: How to test that the runner actually splits?
>> A: The code that splits is runner-agnostic, so a runner would have to deliberately sabotage it in order to break it - unlikely. Also, for semantics we have runner-agnostic ROS tests; but at some point we will need performance tests too.
>>
>> Next steps:
>> - Amit will look at the standard SplittableParDo expansion and its implementation in the Flink and Direct runners, and will write up a doc about how to do this in Spark.
>> - Another videotalk in 2 weeks to check on progress/issues.
>>
>> Thanks all!
>>
>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>> Yes, Monday morning works! How about also 8am PST, same Hangout link - does that work for you?
>>>
>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <thomas.we...@gmail.com> wrote:
>>> Eugene,
>>>
>>> I cannot make it for the call today. Would Monday morning work for you to discuss the Apex changes?
>>>
>>> Thanks
>>>
>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>> Hi! Please feel free to join this call, but I think we'd be mostly discussing how to do it in the Spark runner in particular; so we'll probably need another call for Apex anyway.
>>>>
>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <t...@apache.org> wrote:
>>>>> Hi Eugene,
>>>>>
>>>>> This would work for me also. Please let me know if you want to keep the Apex-related discussion separate or want me to join this call.
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST, at videocall by link https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <amitsel...@gmail.com> wrote:
>>>>>>> PST mornings are better, because they are evenings/nights for me. Friday would work out best for me.
>>>>>>>
>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>>>>>>>> Awesome!!!
>>>>>>>>
>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
>>>>>>>> I'm free this week all afternoons (say after 2pm) in Pacific Time, and mornings of Wed & Fri. We'll probably need half an hour to an hour.
>>>>>>>>
>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>> I whipped up a quick version for Flink that seems to work: https://github.com/apache/beam/pull/2235
>>>>>>>>>
>>>>>>>>> There are still two failing tests, as described in the PR.
>>>>>>>>>
>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
>>>>>>>>>> +1 for a video call. I think it should be pretty straightforward for the Spark runner after the work on reading from UnboundedSource and after GroupAlsoByWindow, but from my experience such a call could move us forward fast enough.
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> Let us continue working on this. I am back from various travels and am eager to help.
>>>>>>>>>>>
>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to hash this out for the Spark runner?
>>>>>>>>>>> Aljoscha - are the necessary Flink changes done, or is the need for them obviated by using the (existing) runner-facing state/timer APIs? Should we have a videocall too?
>>>>>>>>>>>
>>>>>>>>>>> Thomas - what do you think about getting this into the Apex runner?
>>>>>>>>>>>
>>>>>>>>>>> (I think videocalls will allow us to make rapid progress, but it's probably a better idea to keep them separate, since they'll involve a lot of runner-specific details.)
>>>>>>>>>>>
>>>>>>>>>>> PS - The completion of this in the Dataflow streaming runner is currently waiting only on having a small service-side change implemented and rolled out for termination of streaming jobs.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>> I recommend proceeding with the runner-facing state & timer APIs; they are lower-level and more appropriate for this. All runners provide them or use the runners/core implementations, as they are needed for triggering.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>> Thanks Aljoscha!
>>>>>>>>>>>
>>>>>>>>>>> Minor note: I'm not familiar with what level of support for timers Flink currently has - however, SDF in the Direct and Dataflow runners currently does not use the user-facing state/timer APIs - rather, it uses the runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink already implements these.
>>>>>>>>>>> We may want to change this, but for now it's good enough (besides, SDF uses watermark holds, which are not supported by the user-facing state API yet).
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <aljos...@data-artisans.com> wrote:
>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
>>>>>>>>>>>
>>>>>>>>>>> I've wanted to do this for a while now but was waiting for the Flink 1.2 release (which happened this week)! There's some prerequisite work to be done on the Flink runner: we'll move to the new timer interfaces introduced in Flink 1.2 and implement support for both the user-facing state and timer APIs. This should make the implementation of SDF easier.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>> Thanks! Looking forward to this work.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>>> Thanks for the update Eugene.
>>>>>>>>>>>
>>>>>>>>>>> I will work on the Spark runner with Amit.
>>>>>>>>>>>
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>>
>>>>>>>>>>> On Feb 7, 2017, 19:12, Eugene Kirpichov <kirpic...@google.com.INVALID> wrote:
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn http://s.apache.org/splittable-do-fn to the Dataflow streaming runner*, and very excited about that. There's only 1 PR <https://github.com/apache/beam/pull/1898> remaining, plus enabling some tests.
>>>>>>>>>>>>
>>>>>>>>>>>> * (The batch runner is much harder, because it's not yet quite clear to me how to properly implement liquid sharding <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow> with SDF - and the current API is not ready for that yet.)
>>>>>>>>>>>>
>>>>>>>>>>>> After implementing all the runner-agnostic parts of Splittable DoFn, I found them quite easy to integrate into the Dataflow streaming runner, and I think this means it should be easy to integrate into other runners too.
>>>>>>>>>>>>
>>>>>>>>>>>> ====== Why it'd be cool ======
>>>>>>>>>>>> The general benefits of SDF are well described in the design doc (linked above). As for right now - if we integrated SDF with all runners, it'd already enable us to start greatly simplifying the code of existing streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors (e.g., a really nice one to implement would be a "directory watcher" that continuously returns new files in a directory).
>>>>>>>>>>>> As a teaser, here's the complete implementation of an "unbounded counter" I used for my test of the Dataflow runner integration:
>>>>>>>>>>>>
>>>>>>>>>>>> class CountFn extends DoFn<String, String> {
>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>   public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
>>>>>>>>>>>>     for (int i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
>>>>>>>>>>>>       c.output(String.valueOf(i));
>>>>>>>>>>>>     }
>>>>>>>>>>>>     return resume();
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @GetInitialRestriction
>>>>>>>>>>>>   public OffsetRange getInitialRange(String element) {
>>>>>>>>>>>>     return new OffsetRange(0, Integer.MAX_VALUE);
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @NewTracker
>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange range) {
>>>>>>>>>>>>     return new OffsetRangeTracker(range);
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> ====== What I'm asking ======
>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into the Spark, Flink and Apex runners from people who are intimately familiar with them - specifically, I was hoping best-case I could nerd-snipe some of you into taking over the integration of SDF with your favorite runner ;)
>>>>>>>>>>>>
>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek <aljos...@data-artisans.com> +Maximilian Michels <m...@data-artisans.com> +ieme...@gmail.com <ieme...@gmail.com> +Amit Sela <amitsel...@gmail.com> +Thomas Weise, unless I forgot somebody.
>>>>>>>>>>>>
>>>>>>>>>>>> Average-case, I was looking for runner-specific guidance on how to do it myself.
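The quoted CountFn teaser hinges on the tracker's tryClaim contract: the loop may only output a position it has successfully claimed, and a failed claim means the restriction was split or checkpointed, so the loop must stop immediately. A minimal self-contained sketch of that contract, using a simplified stand-in tracker written for illustration (not Beam's actual OffsetRangeTracker), might look like this:

```java
// SimpleOffsetTracker is a hypothetical stand-in for Beam's OffsetRangeTracker,
// kept deliberately tiny to show only the tryClaim contract.
final class SimpleOffsetTracker {
    private final int from;
    private final int to; // exclusive end of the current restriction

    SimpleOffsetTracker(int from, int to) {
        this.from = from;
        this.to = to;
    }

    // Claims position i if it is still inside the restriction; a false
    // return tells the caller the range has ended and it must stop.
    boolean tryClaim(int i) {
        return i < to;
    }

    int getFrom() {
        return from;
    }
}

public class TryClaimDemo {
    public static void main(String[] args) {
        SimpleOffsetTracker tracker = new SimpleOffsetTracker(0, 5);
        StringBuilder out = new StringBuilder();
        // Same loop shape as the quoted CountFn teaser: claim, then output.
        for (int i = tracker.getFrom(); tracker.tryClaim(i); ++i) {
            out.append(i);
        }
        System.out.println(out); // prints "01234"
    }
}
```

This claim-then-output discipline is what makes a ProcessElement call safely interruptible at any position, which in turn is what allows a runner to checkpoint or split the remaining work.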
>>>>>>>>>>>> ====== If you want to help ======
>>>>>>>>>>>> If somebody decides to take this over in my absence (I'll be mostly gone for ~the next month), the best people to ask for implementation advice are +Kenn Knowles <k...@google.com> and +Daniel Mills <mil...@google.com>.
>>>>>>>>>>>>
>>>>>>>>>>>> For reference, here's how SDF is implemented in the direct runner:
>>>>>>>>>>>> - The Direct runner overrides ParDo.of() for a splittable DoFn <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java> and replaces it with SplittableParDo <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java> (the common transform expansion).
>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive transforms: "GBKIntoKeyedWorkItems" and "SplittableProcessElements". The Direct runner overrides the first one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>, and directly implements evaluation of the second one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>, using runner hooks introduced in this PR <https://github.com/apache/beam/pull/1824>. At the core of the hooks is "ProcessFn", which is like a regular DoFn but has to be prepared at runtime with some hooks (state, timers, and runner access to the RestrictionTracker) before you invoke it. I added a convenience implementation of the hook mimicking the behavior of UnboundedSource.
>>>>>>>>>>>> - The relevant runner-agnostic tests are in SplittableDoFnTest <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
>>>>>>>>>>>>
>>>>>>>>>>>> That's all it takes, really - the runner has to implement these two transforms. When I looked at the Spark and Flink runners, it was not quite clear to me how to implement the GBKIntoKeyedWorkItems transform - e.g., the Spark runner currently doesn't use KeyedWorkItem at all - but it seems definitely possible.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks!
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
>>>>>>>>>>>
>>>>>>>>>>> i...@data-artisans.com
>>>>>>>>>>> +49-(0)30-55599146
>>>>>>>>>>>
>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
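The expansion the thread describes - a runner walking the pipeline and swapping a primitive transform (like GBKIntoKeyedWorkItems) for its own specialized implementation - follows a general override-factory pattern. The sketch below is a conceptual illustration of that pattern only; the types and names are invented for the example and are not Beam's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// A stand-in for a pipeline transform: turns one list of elements into another.
interface Transform {
    List<String> apply(List<String> input);
}

// Stand-in for a primitive transform a runner must provide, analogous to
// the "GBKIntoKeyedWorkItems" placeholder in the quoted expansion.
final class PrimitivePlaceholder implements Transform {
    public List<String> apply(List<String> input) {
        throw new UnsupportedOperationException("runner must override this primitive");
    }
}

// A runner-specific replacement, installed via the override step.
final class RunnerSpecificImpl implements Transform {
    public List<String> apply(List<String> input) {
        List<String> out = new ArrayList<>();
        for (String s : input) {
            out.add("keyed:" + s); // pretend keying step
        }
        return out;
    }
}

public class OverrideDemo {
    // The "override factory": swaps placeholder transforms for runner impls,
    // leaving everything else untouched.
    static Transform override(Transform t) {
        return (t instanceof PrimitivePlaceholder) ? new RunnerSpecificImpl() : t;
    }

    public static void main(String[] args) {
        Transform pipelineStep = new PrimitivePlaceholder();
        Transform actual = override(pipelineStep);
        System.out.println(actual.apply(List.of("a", "b"))); // [keyed:a, keyed:b]
    }
}
```

The point of the pattern is that the runner-agnostic expansion can name a primitive it cannot implement itself, and each runner substitutes its own evaluation strategy at pipeline-construction time.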