Thanks for the offers, guys! The code is finished, though. I only need to do the last touch-ups.
On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> Hi Aljoscha,
> I would like to work on the Flink runner with you.
> Best,
> JingsongLee
> ------------------------------------------------------------------
> From: Jean-Baptiste Onofré <[email protected]>
> Time: 2017 Mar 28 (Tue) 14:04
> To: dev <[email protected]>
> Subject: Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners
>
> Hi Aljoscha,
>
> do you need some help on this ?
>
> Regards
> JB
>
> On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > Hi,
> > sorry for being so slow but I'm currently traveling.
> >
> > The Flink code works but I think it could benefit from some refactoring
> > to make the code nice and maintainable.
> >
> > Best,
> > Aljoscha
> >
> > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> >> I add myself on the Spark runner.
> >>
> >> Regards
> >> JB
> >>
> >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> >>> Hi all,
> >>>
> >>> Let's continue the ~bi-weekly sync-ups about the state of SDF support in
> >>> the Spark/Flink/Apex runners.
> >>>
> >>> Spark:
> >>> Amit, Aviem, Ismaël - when would be a good time for you; does the same time
> >>> work (8am PST this Friday)? Who else would like to join?
> >>>
> >>> Flink:
> >>> I pinged the PR, but - Aljoscha, do you think it's worth discussing
> >>> anything there over a videocall?
> >>>
> >>> Apex:
> >>> Thomas - how about the same time next Monday (9:30am PST)? Who else would
> >>> like to join?
> >>>
> >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <[email protected]> wrote:
> >>>
> >>>> Meeting notes:
> >>>> Thomas and I had a video call and we pretty much walked through the
> >>>> implementation of SDF in the runner-agnostic part and in the direct runner.
> >>>> Flink and Apex are pretty similar, so
> >>>> https://github.com/apache/beam/pull/2235 (the Flink PR) will likely give a
> >>>> very good guideline as to how to do this in Apex.
> >>>> We will talk again in ~2 weeks, and will involve +David Yan
> >>>> <[email protected]>, who is also on Apex, currently conveniently
> >>>> works on the Google Dataflow team and, from an in-person conversation, was
> >>>> interested in being involved :)
> >>>>
> >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <[email protected]> wrote:
> >>>>
> >>>> Thomas - yes, 9:30 works, shall we do that?
> >>>>
> >>>> JB - excellent! You can start experimenting already, using the direct runner!
> >>>>
> >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>
> >>>> Hi Eugene,
> >>>>
> >>>> Thanks for the meeting notes !
> >>>>
> >>>> I will be in the next call, and Ismaël also provided some updates to me.
> >>>>
> >>>> I will sync with Amit on the Spark runner and start to experiment and test
> >>>> SDF on the JMS IO.
> >>>>
> >>>> Thanks !
> >>>> Regards
> >>>> JB
> >>>>
> >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> >>>>>
> >>>>> Spark has 2 types of stateful operators: a cheap one intended for updating
> >>>>> elements (works with state but not with timers) and an expensive one. I.e.
> >>>>> there's no efficient direct counterpart to Beam's keyed state model. In
> >>>>> the implementation of the Beam State & Timers API, the Spark runner will
> >>>>> use the cheaper one for state and the expensive one for timers. So, for
> >>>>> SDF, which in the runner-agnostic SplittableParDo expansion needs both
> >>>>> state and timers, we'll need the expensive one - but this should be fine,
> >>>>> since with SDF the bottleneck should be in the ProcessElement call itself,
> >>>>> not in splitting/scheduling it.
> >>>>>
> >>>>> For the Spark batch runner, implementing SDF might be simpler still: the
> >>>>> runner will just not request any checkpointing.
> >>>>> The hard parts about SDF in batch are the dynamic rebalancing and size
> >>>>> estimation APIs - they will be refined this quarter, but it's OK to
> >>>>> initially not have them.
> >>>>>
> >>>>> The Spark runner might use a different expansion of SDF not involving
> >>>>> KeyedWorkItems (i.e. not overriding the GBKIntoKeyedWorkItems transform),
> >>>>> though still striving to reuse as much code as possible from the standard
> >>>>> expansion implemented in SplittableParDo, at least ProcessFn.
> >>>>>
> >>>>> Testing questions:
> >>>>> - The Spark runner already implements termination on
> >>>>> watermarks-reaching-infinity properly.
> >>>>> - Q: How to test that the runner actually splits? A: The code that splits
> >>>>> is in the runner-agnostic part, so a runner would have to deliberately
> >>>>> sabotage it in order to break it - unlikely. Also, for semantics we have
> >>>>> runner-agnostic RunnableOnService (ROS) tests; but at some point we will
> >>>>> need performance tests too.
> >>>>>
> >>>>> Next steps:
> >>>>> - Amit will look at the standard SplittableParDo expansion and its
> >>>>> implementation in the Flink and Direct runners, and will write up a doc
> >>>>> about how to do this in Spark.
> >>>>> - Another videocall in 2 weeks to check on progress/issues.
> >>>>>
> >>>>> Thanks all!
> >>>>>
> >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <[email protected]> wrote:
> >>>>>
> >>>>>> Yes, Monday morning works! How about also 8am PST, same Hangout link -
> >>>>>> does that work for you?
> >>>>>>
> >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <[email protected]> wrote:
> >>>>>>
> >>>>>> Eugene,
> >>>>>>
> >>>>>> I cannot make it for the call today. Would Monday morning work for you
> >>>>>> to discuss the Apex changes?
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <[email protected]> wrote:
> >>>>>>
> >>>>>>> Hi!
> >>>>>>> Please feel free to join this call, but I think we'd be mostly
> >>>>>>> discussing how to do it in the Spark runner in particular; so we'll
> >>>>>>> probably need another call for Apex anyway.
> >>>>>>>
> >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <[email protected]> wrote:
> >>>>>>>
> >>>>>>>> Hi Eugene,
> >>>>>>>>
> >>>>>>>> This would work for me also. Please let me know if you want to keep
> >>>>>>>> the Apex-related discussion separate or want me to join this call.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>> Thomas
> >>>>>>>>
> >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <[email protected]> wrote:
> >>>>>>>>
> >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST, at the
> >>>>>>>>> videocall by link
> >>>>>>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
> >>>>>>>>>
> >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <[email protected]> wrote:
> >>>>>>>>>
> >>>>>>>>>> PST mornings are better, because they are evenings/nights for me.
> >>>>>>>>>> Friday would work out best for me.
> >>>>>>>>>>
> >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> >>>>>>>>>> <[email protected]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Awesome!!!
> >>>>>>>>>>>
> >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
> >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in Pacific Time,
> >>>>>>>>>>> and mornings of Wed & Fri. We'll probably need half an hour to an hour.
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I whipped up a quick version for Flink that seems to work:
> >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> >>>>>>>>>>>>
> >>>>>>>>>>>> There are still two failing tests, as described in the PR.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> >>>>>>>>>>>>> +1 for a video call. I think it should be pretty straightforward
> >>>>>>>>>>>>> for the Spark runner after the work on read from UnboundedSource
> >>>>>>>>>>>>> and after GroupAlsoByWindow, but from my experience such a call
> >>>>>>>>>>>>> could move us forward fast enough.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Let us continue working on this. I am back from various travels
> >>>>>>>>>>>>>> and am eager to help.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to hash
> >>>>>>>>>>>>>> this out for the Spark runner?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done, or is the need
> >>>>>>>>>>>>>> for them obviated by using the (existing) runner-facing
> >>>>>>>>>>>>>> state/timer APIs? Should we have a videocall too?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thomas - what do you think about getting this into the Apex runner?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (I think videocalls will allow us to make rapid progress, but
> >>>>>>>>>>>>>> it's probably a better idea to keep them separate, since they'll
> >>>>>>>>>>>>>> involve a lot of runner-specific details.)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> PS - The completion of this in the Dataflow streaming runner is
> >>>>>>>>>>>>>> currently waiting only on having a small service-side change
> >>>>>>>>>>>>>> implemented and rolled out for termination of streaming jobs.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state & timer APIs;
> >>>>>>>>>>>>>> they are lower-level and more appropriate for this. All runners
> >>>>>>>>>>>>>> provide them or use runners/core implementations, as they are
> >>>>>>>>>>>>>> needed for triggering.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks Aljoscha!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of support for
> >>>>>>>>>>>>>> timers Flink currently has - however, SDF in the Direct and
> >>>>>>>>>>>>>> Dataflow runners currently does not use the user-facing
> >>>>>>>>>>>>>> state/timer APIs - rather, it uses the runner-facing APIs
> >>>>>>>>>>>>>> (StateInternals and TimerInternals) - perhaps Flink already
> >>>>>>>>>>>>>> implements these. We may want to change this, but for now it's
> >>>>>>>>>>>>>> good enough (besides, SDF uses watermark holds, which are not
> >>>>>>>>>>>>>> supported by the user-facing state API yet).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <[email protected]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting for the
> >>>>>>>>>>>>>> Flink 1.2 release (which happened this week)!
> >>>>>>>>>>>>>> There's some prerequisite work to be done on the Flink runner:
> >>>>>>>>>>>>>> we'll move to the new timer interfaces introduced in Flink 1.2
> >>>>>>>>>>>>>> and implement support for both the user-facing state and timer
> >>>>>>>>>>>>>> APIs. This should make the implementation of SDF easier.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks for the update Eugene.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I will work on the Spark runner with Amit.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Regards
> >>>>>>>>>>>>>> JB
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Feb 7, 2017, at 19:12, Eugene Kirpichov <[email protected]> wrote:
> >>>>>>>>>>>>>>> Hello,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> >>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to the Dataflow streaming
> >>>>>>>>>>>>>>> runner*, and am very excited about that. There's only 1 PR
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining, plus
> >>>>>>>>>>>>>>> enabling some tests.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> * (the batch runner is much harder because it's not yet quite
> >>>>>>>>>>>>>>> clear to me how to properly implement liquid sharding
> >>>>>>>>>>>>>>> <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> >>>>>>>>>>>>>>> with SDF - and the current API is not ready for that yet)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of Splittable
> >>>>>>>>>>>>>>> DoFn, I found them quite easy to integrate into the Dataflow
> >>>>>>>>>>>>>>> streaming runner, and I think this means it should be easy to
> >>>>>>>>>>>>>>> integrate into other runners too.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> >>>>>>>>>>>>>>> The general benefits of SDF are well described in the design
> >>>>>>>>>>>>>>> doc (linked above).
> >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all runners, it'd
> >>>>>>>>>>>>>>> already enable us to start greatly simplifying the code of
> >>>>>>>>>>>>>>> existing streaming connectors (CountingInput, Kafka, Pubsub,
> >>>>>>>>>>>>>>> JMS) and writing new connectors (e.g. a really nice one to
> >>>>>>>>>>>>>>> implement would be a "directory watcher" that continuously
> >>>>>>>>>>>>>>> returns new files in a directory).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an
> >>>>>>>>>>>>>>> "unbounded counter" I used for my test of the Dataflow runner
> >>>>>>>>>>>>>>> integration:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> class CountFn extends DoFn<String, String> {
> >>>>>>>>>>>>>>>   @ProcessElement
> >>>>>>>>>>>>>>>   public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
> >>>>>>>>>>>>>>>     for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> >>>>>>>>>>>>>>>       c.output(String.valueOf(i));
> >>>>>>>>>>>>>>>     }
> >>>>>>>>>>>>>>>     return resume();
> >>>>>>>>>>>>>>>   }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   @GetInitialRestriction
> >>>>>>>>>>>>>>>   public OffsetRange getInitialRange(String element) {
> >>>>>>>>>>>>>>>     return new OffsetRange(0, Integer.MAX_VALUE);
> >>>>>>>>>>>>>>>   }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>   @NewTracker
> >>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange range) {
> >>>>>>>>>>>>>>>     return new OffsetRangeTracker(range);
> >>>>>>>>>>>>>>>   }
> >>>>>>>>>>>>>>> }
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ====== What I'm asking ======
> >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into the Spark,
> >>>>>>>>>>>>>>> Flink and Apex runners from people who are intimately familiar
> >>>>>>>>>>>>>>> with them - specifically, I was hoping, best case, that I could
> >>>>>>>>>>>>>>> nerd-snipe some of you into taking over the integration of SDF
> >>>>>>>>>>>>>>> with your favorite runner ;)
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
> >>>>>>>>>>>>>>> <[email protected]> +Maximilian Michels <[email protected]>
> >>>>>>>>>>>>>>> [email protected] <[email protected]> +Amit Sela
> >>>>>>>>>>>>>>> <[email protected]> +Thomas Weise, unless I forgot somebody.
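The tryClaim() loop in the CountFn teaser above follows a claim-before-work protocol: a position may only produce output if it was successfully claimed, and a claim fails once the runner has split off the rest of the range. Here is a minimal, self-contained sketch of that protocol - an illustration only, not Beam's actual OffsetRangeTracker; the checkpoint() method here stands in for a runner-initiated split:

```java
// Illustrative stand-in for Beam's OffsetRangeTracker (names are
// hypothetical). The contract: tryClaim(p) must succeed before any work is
// done for position p, and it starts failing once the range has been
// truncated by a split, which cleanly stops the ProcessElement loop.
class SketchOffsetRangeTracker {
  private final long from;
  private long to; // exclusive end; shrinks when the runner splits
  private long lastClaimed = Long.MIN_VALUE;

  SketchOffsetRangeTracker(long from, long to) {
    this.from = from;
    this.to = to;
  }

  // Claim `position` for processing; false means "stop: the rest of the
  // range now belongs to someone else, or the range is exhausted".
  synchronized boolean tryClaim(long position) {
    if (position >= to) {
      return false;
    }
    lastClaimed = position;
    return true;
  }

  // Runner-initiated split: truncate the primary range just past the last
  // claimed position and hand back the residual range [splitPos, oldTo)
  // to be rescheduled later.
  synchronized long[] checkpoint() {
    long splitPos = (lastClaimed == Long.MIN_VALUE) ? from : lastClaimed + 1;
    long[] residual = new long[] {splitPos, to};
    to = splitPos;
    return residual;
  }
}
```

This is why the teaser's for-loop terminates even though its restriction is effectively unbounded: the runner checkpoints, tryClaim() starts returning false, and the DoFn returns resume() so processing continues later from the residual range.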
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Average case, I was looking for runner-specific guidance on
> >>>>>>>>>>>>>>> how to do it myself.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ====== If you want to help ======
> >>>>>>>>>>>>>>> If somebody decides to take this over in my absence (I'll be
> >>>>>>>>>>>>>>> mostly gone for ~the next month), the best people to ask for
> >>>>>>>>>>>>>>> implementation advice are +Kenn Knowles <[email protected]> and
> >>>>>>>>>>>>>>> +Daniel Mills <[email protected]>.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the direct runner:
> >>>>>>>>>>>>>>> - The direct runner overrides
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java>
> >>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with SplittableParDo
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java>
> >>>>>>>>>>>>>>> (the common transform expansion).
> >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive transforms:
> >>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and "SplittableProcessElements".
> >>>>>>>>>>>>>>> The direct runner overrides the first one like this
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>,
> >>>>>>>>>>>>>>> and directly implements evaluation of the second one like this
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>,
> >>>>>>>>>>>>>>> using runner hooks introduced in this PR
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the core of the
> >>>>>>>>>>>>>>> hooks is "ProcessFn", which is like a regular DoFn but has to
> >>>>>>>>>>>>>>> be prepared at runtime with some hooks (state, timers, and
> >>>>>>>>>>>>>>> runner access to RestrictionTracker) before you invoke it. I
> >>>>>>>>>>>>>>> added a convenience implementation of the hook mimicking the
> >>>>>>>>>>>>>>> behavior of UnboundedSource.
> >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in SplittableDoFnTest
> >>>>>>>>>>>>>>> <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> That's all it takes, really - the runner has to implement
> >>>>>>>>>>>>>>> these two transforms.
> >>>>>>>>>>>>>>> When I looked at the Spark and Flink runners, it was not quite
> >>>>>>>>>>>>>>> clear to me how to implement the GBKIntoKeyedWorkItems
> >>>>>>>>>>>>>>> transform - e.g. the Spark runner currently doesn't use
> >>>>>>>>>>>>>>> KeyedWorkItem at all - but it seems definitely possible.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> [email protected]
> >>>>>>>>>>>>>> +49-(0)30-55599146
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> >>>>
> >>>> --
> >>>> Jean-Baptiste Onofré
> >>>> [email protected]
> >>>> http://blog.nanthrax.net
> >>>> Talend - http://www.talend.com
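The expansion Eugene describes (assign each element a unique key plus its initial restriction, route it through GBKIntoKeyedWorkItems, then process it with ProcessFn) can be sketched end to end in plain Java. All names below are illustrative stand-ins, not Beam's actual classes, and the "grouping" step is reduced to a map for clarity:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A rough, self-contained sketch of the SplittableParDo expansion:
// (1) pair each element with a unique key and its initial restriction,
// (2) a GBK-into-keyed-work-items step routes each item to its key, so the
//     state and timers for one element's restriction live under one key,
// (3) a ProcessFn-like step walks the restriction and emits outputs.
class SplittableExpansionSketch {

  static class KeyedWorkItem {
    final String key;     // unique key assigned in step (1)
    final String element; // the original input element
    final long from, to;  // the element's initial restriction [from, to)

    KeyedWorkItem(String key, String element, long from, long to) {
      this.key = key;
      this.element = element;
      this.from = from;
      this.to = to;
    }
  }

  // Step (1): assign unique keys and initial restrictions.
  static List<KeyedWorkItem> assignKeysAndRestrictions(List<String> elements, long to) {
    List<KeyedWorkItem> items = new ArrayList<>();
    for (int i = 0; i < elements.size(); i++) {
      items.add(new KeyedWorkItem("key-" + i, elements.get(i), 0, to));
    }
    return items;
  }

  // Step (2): group by key - the moral equivalent of GBKIntoKeyedWorkItems.
  static Map<String, KeyedWorkItem> groupByKey(List<KeyedWorkItem> items) {
    Map<String, KeyedWorkItem> grouped = new LinkedHashMap<>();
    for (KeyedWorkItem item : items) {
      grouped.put(item.key, item);
    }
    return grouped;
  }

  // Step (3): a ProcessFn-like step that claims each position in the
  // restriction and emits one output per claimed position.
  static List<String> processElements(Map<String, KeyedWorkItem> grouped) {
    List<String> outputs = new ArrayList<>();
    for (KeyedWorkItem item : grouped.values()) {
      for (long pos = item.from; pos < item.to; pos++) {
        outputs.add(item.element + ":" + pos);
      }
    }
    return outputs;
  }
}
```

The per-element key is what makes checkpointing and splitting possible: because each element's restriction is processed under its own key, a runner can persist the remaining restriction as keyed state and resume or split it later, which is exactly the role the real GBKIntoKeyedWorkItems and SplittableProcessElements primitives play.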
