Hey all, the Flink PR has been merged, and thus Flink becomes the first distributed runner to support Splittable DoFn! Thank you, Aljoscha!
Looking forward to Spark and Apex, and continuing work on Dataflow. I'll also send proposals about a couple of new ideas related to SDF next week.

On Thu, Mar 30, 2017 at 9:08 AM Amit Sela <[email protected]> wrote:

I will not be able to make it this weekend, too busy. Let's chat at the beginning of next week and see what's on my plate.

On Tue, Mar 28, 2017 at 5:44 PM Aljoscha Krettek <[email protected]> wrote:

Thanks for the offers, guys! The code is finished, though. I only need to do the last touch-ups.

On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:

Hi Aljoscha,
I would like to work on the Flink runner with you.

Best,
JingsongLee

------------------------------------------------------------------
From: Jean-Baptiste Onofré <[email protected]>
Time: 2017 Mar 28 (Tue) 14:04
To: dev <[email protected]>
Subject: Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners

Hi Aljoscha,

do you need some help on this?

Regards
JB

On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:

Hi,
sorry for being so slow, but I'm currently traveling.

The Flink code works, but I think it could benefit from some refactoring to make the code nice and maintainable.

Best,
Aljoscha

On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:

I'll add myself on the Spark runner.

Regards
JB

On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:

Hi all,

Let's continue the ~bi-weekly sync-ups about the state of SDF support in the Spark/Flink/Apex runners.

Spark:
Amit, Aviem, Ismaël - when would be a good time for you; does the same time work (8am PST this Friday)? Who else would like to join?

Flink:
I pinged the PR, but - Aljoscha, do you think it's worth discussing anything there over a video call?

Apex:
Thomas - how about the same time next Monday (9:30am PST)? Who else would like to join?

On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <[email protected]> wrote:

Meeting notes:
Thomas and I had a video call and pretty much walked through the implementation of SDF in the runner-agnostic part and in the direct runner. Flink and Apex are pretty similar, so https://github.com/apache/beam/pull/2235 (the Flink PR) will likely give a very good guideline as to how to do this in Apex.
We will talk again in ~2 weeks, and will involve +David Yan <[email protected]>, who is also on Apex, currently (conveniently) works on the Google Dataflow team, and, from in-person conversation, was interested in being involved :)

On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <[email protected]> wrote:

Thomas - yes, 9:30 works, shall we do that?

JB - excellent! You can start experimenting already, using the direct runner!

On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <[email protected]> wrote:

Hi Eugene,

Thanks for the meeting notes!

I will be on the next call, and Ismaël has also provided me with some updates.

I will sync with Amit on the Spark runner and start to experiment with and test SDF on the JMS IO.

Thanks!
Regards
JB

On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:

Meeting notes from today's call with Amit, Aviem and Ismaël:

Spark has two types of stateful operators: a cheap one intended for updating elements (works with state but not with timers) and an expensive one. I.e., there is no efficient direct counterpart to Beam's keyed state model. In its implementation of the Beam State & Timers API, the Spark runner will use the cheaper one for state and the expensive one for timers. So for SDF, which in the runner-agnostic SplittableParDo expansion needs both state and timers, we'll need the expensive one - but this should be fine, since with SDF the bottleneck should be in the ProcessElement call itself, not in splitting/scheduling it.

For the Spark batch runner, implementing SDF might be even simpler: the runner will just not request any checkpointing. The hard parts about SDF in batch are the dynamic rebalancing and size-estimation APIs - they will be refined this quarter, but it's OK not to have them initially.

The Spark runner might use a different expansion of SDF, not involving KeyedWorkItems (i.e. not overriding the GBKIntoKeyedWorkItems transform), though still striving to reuse as much code as possible from the standard expansion implemented in SplittableParDo, at least ProcessFn.

Testing questions:
- The Spark runner already implements termination on watermarks-reaching-infinity properly.
- Q: How to test that the runner actually splits? A: The code that splits is in the runner-agnostic part, so a runner would have to deliberately sabotage it in order to break it - unlikely. Also, for semantics we have runner-agnostic ROS tests, but at some point we will need performance tests too.

Next steps:
- Amit will look at the standard SplittableParDo expansion and its implementation in the Flink and direct runners, and will write up a doc about how to do this in Spark.
- Another video call in two weeks to check on progress/issues.

Thanks all!

On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <[email protected]> wrote:

Yes, Monday morning works! How about also 8am PST, same Hangout link - does that work for you?

On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <[email protected]> wrote:

Eugene,

I cannot make it for the call today. Would Monday morning work for you to discuss the Apex changes?

Thanks

On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <[email protected]> wrote:

Hi! Please feel free to join this call, but I think we'd mostly be discussing how to do it in the Spark runner in particular, so we'll probably need another call for Apex anyway.

On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <[email protected]> wrote:

Hi Eugene,

This would work for me also. Please let me know if you want to keep the Apex-related discussion separate or want me to join this call.

Thanks,
Thomas

On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <[email protected]> wrote:

Sure, Friday morning sounds good. How about 9am PST on Friday, at the video call link https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?

On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <[email protected]> wrote:

PST mornings are better, because they are evenings/nights for me. Friday would work out best for me.

On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov <[email protected]> wrote:

Awesome!!!

Amit - remind me of your time zone? JB, do you want to join?
I'm free this week all afternoons (say after 2pm) Pacific Time, and the mornings of Wed & Fri. We'll probably need half an hour to an hour.

On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]> wrote:

I whipped up a quick version for Flink that seems to work: https://github.com/apache/beam/pull/2235

There are still two failing tests, as described in the PR.

On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:

+1 for a video call. I think it should be pretty straightforward for the Spark runner after the work on reading from UnboundedSource and after GroupAlsoByWindow, but from my experience such a call could move us forward fast enough.

On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]> wrote:

Hi all,

Let us continue working on this. I am back from various travels and am eager to help.

Amit, JB - would you like to perhaps have a video call to hash this out for the Spark runner?

Aljoscha - are the necessary Flink changes done, or is the need for them obviated by using the (existing) runner-facing state/timer APIs? Should we have a video call too?

Thomas - what do you think about getting this into the Apex runner?

(I think video calls will allow us to make rapid progress, but it's probably a better idea to keep them separate, since they'll involve a lot of runner-specific details.)

PS - The completion of this in the Dataflow streaming runner is currently waiting only on a small service-side change, for termination of streaming jobs, being implemented and rolled out.

On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:

I recommend proceeding with the runner-facing state & timer APIs; they are lower-level and more appropriate for this. All runners provide them or use the runners/core implementations, as they are needed for triggering.

On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]> wrote:

Thanks Aljoscha!

Minor note: I'm not familiar with what level of support for timers Flink currently has - however, SDF in the Direct and Dataflow runners currently does not use the user-facing state/timer APIs; rather, it uses the runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink already implements these. We may want to change this, but for now it's good enough (besides, SDF uses watermark holds, which are not supported by the user-facing state API yet).

On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <[email protected]> wrote:

Thanks for the motivation, Eugene! :-)

I've wanted to do this for a while now but was waiting for the Flink 1.2 release (which happened this week)! There's some prerequisite work to be done on the Flink runner: we'll move to the new timer interfaces introduced in Flink 1.2 and implement support for both the user-facing state and timer APIs. This should make the implementation of SDF easier.

--
Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
[email protected]
+49-(0)30-55599146
Registered at Amtsgericht Charlottenburg - HRB 158244 B
Managing Directors: Kostas Tzoumas, Stephan Ewen

On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:

Thanks! Looking forward to this work.

On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:

Thanks for the update Eugene.

I will work on the Spark runner with Amit.

Regards
JB

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

On Feb 7, 2017, 19:12, Eugene Kirpichov <[email protected]> wrote:

Hello,

I'm almost done adding support for Splittable DoFn (http://s.apache.org/splittable-do-fn) to the Dataflow streaming runner*, and very excited about that. There's only one PR remaining (https://github.com/apache/beam/pull/1898), plus enabling some tests.

* (The batch runner is much harder, because it's not yet quite clear to me how to properly implement liquid sharding (https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow) with SDF - and the current API is not ready for that yet.)

After implementing all the runner-agnostic parts of Splittable DoFn, I found them quite easy to integrate into the Dataflow streaming runner, and I think this means it should be easy to integrate into other runners too.

====== Why it'd be cool ======
The general benefits of SDF are well described in the design doc (linked above).
As for right now: if we integrated SDF with all runners, it would already enable us to start greatly simplifying the code of existing streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors (e.g. a really nice one to implement would be a "directory watcher" that continuously returns new files in a directory).

As a teaser, here's the complete implementation of an "unbounded counter" I used for my test of the Dataflow runner integration:

class CountFn extends DoFn<String, String> {
  @ProcessElement
  public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
    for (int i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      c.output(String.valueOf(i));
    }
    return resume();
  }

  @GetInitialRestriction
  public OffsetRange getInitialRange(String element) {
    return new OffsetRange(0, Integer.MAX_VALUE);
  }

  @NewTracker
  public OffsetRangeTracker newTracker(OffsetRange range) {
    return new OffsetRangeTracker(range);
  }
}

====== What I'm asking ======
So, I'd like to ask for help integrating SDF into the Spark, Flink and Apex runners from people who are intimately familiar with them - specifically, I was hoping, best-case, that I could nerd-snipe some of you into taking over the integration of SDF with your favorite runner ;)

The proper set of people seems to be +Aljoscha Krettek <[email protected]>, +Maximilian Michels <[email protected]>, [email protected] <[email protected]>, +Amit Sela <[email protected]> and +Thomas Weise, unless I forgot somebody.

Average-case, I was looking for runner-specific guidance on how to do it myself.

====== If you want to help ======
If somebody decides to take this over in my absence (I'll be mostly gone for ~the next month), the best people to ask for implementation advice are +Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.

For reference, here's how SDF is implemented in the direct runner:
- The direct runner overrides ParDo.of() for a splittable DoFn (https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java) and replaces it with SplittableParDo (https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java), the common transform expansion.
- SplittableParDo uses two runner-specific primitive transforms: "GBKIntoKeyedWorkItems" and "SplittableProcessElements". The direct runner overrides the first one (https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java) and directly implements evaluation of the second one (https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java), using runner hooks introduced in this PR: https://github.com/apache/beam/pull/1824. At the core of the hooks is "ProcessFn", which is like a regular DoFn but has to be prepared at runtime with some hooks (state, timers, and runner access to the RestrictionTracker) before you invoke it. I added a convenience implementation of the hook mimicking the behavior of UnboundedSource.
- The relevant runner-agnostic tests are in SplittableDoFnTest (https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java).

That's all it takes, really - the runner has to implement these two transforms. When I looked at the Spark and Flink runners, it was not quite clear to me how to implement the GBKIntoKeyedWorkItems transform (e.g. the Spark runner currently doesn't use KeyedWorkItem at all) - but it seems definitely possible.

Thanks!
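The CountFn teaser quoted above hinges on the restriction tracker's claim protocol: a position may be processed only after tryClaim succeeds, and a checkpoint splits the restriction at the last claimed position, leaving a residual range for a later resume(). Here is a minimal self-contained sketch of that protocol in plain Java; the OffsetRange and OffsetRangeTracker names merely mirror Beam's classes, and this is a simplified illustration of the idea, not Beam's actual implementation:

```java
// Simplified sketch of the offset-range claim protocol behind CountFn.
// Not Beam code: the names only mirror Beam's OffsetRange/OffsetRangeTracker.
public class OffsetRangeTrackerSketch {
  static class OffsetRange {
    final long from, to; // half-open interval [from, to)
    OffsetRange(long from, long to) { this.from = from; this.to = to; }
  }

  static class OffsetRangeTracker {
    private OffsetRange range;
    private long lastClaimed = -1;
    OffsetRangeTracker(OffsetRange range) { this.range = range; }
    OffsetRange currentRestriction() { return range; }

    // A position may be processed only if tryClaim returns true.
    boolean tryClaim(long position) {
      if (position >= range.to) return false; // past the restriction: stop
      lastClaimed = position;
      return true;
    }

    // Checkpoint: keep what was claimed, hand back the rest as a residual.
    OffsetRange checkpoint() {
      OffsetRange residual = new OffsetRange(lastClaimed + 1, range.to);
      range = new OffsetRange(range.from, lastClaimed + 1);
      return residual;
    }
  }

  public static void main(String[] args) {
    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(0, 10));
    long produced = 0;
    // Same loop shape as CountFn's @ProcessElement method.
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      produced++;
      if (i == 3) break; // pretend a checkpoint was requested here
    }
    OffsetRange residual = tracker.checkpoint();
    // Positions 0..3 were claimed; 4..9 remain for a later resume().
    if (produced != 4 || residual.from != 4 || residual.to != 10) {
      throw new AssertionError("unexpected split");
    }
    System.out.println("claimed [0,4), residual [" + residual.from + "," + residual.to + ")");
  }
}
```

The `for (...; tracker.tryClaim(i); ++i)` shape of CountFn's loop is what makes checkpointing safe: nothing is output for a position unless that position was successfully claimed, so everything past the split point belongs entirely to the residual restriction.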
