Hi all,

Splittable DoFn is now available in the Dataflow streaming runner, as of https://github.com/apache/beam/pull/1898 !
Meanwhile, Flink support got disabled due to some churn as part of the First Stable Release, but it should not be hard to fix - tracked in https://issues.apache.org/jira/browse/BEAM-2140 +Aljoscha Krettek <[email protected]>
Thomas - any news on Apex?

On Sat, Apr 8, 2017 at 1:28 PM Thomas Weise <[email protected]> wrote:

> Nice work Aljoscha!
>
> Update WRT ApexRunner: We merged some prep work in the ParDoOperator to weed out remnants of OldDoFn. I have almost all the changes ready to add the support for Splittable DoFn (for most part those follow the Flink runner changes). The final piece missing to support the feature (based on observation from the test failures) is the timer internals.
>
> Thanks,
> Thomas
>
> On Sat, Apr 1, 2017 at 1:17 AM, Eugene Kirpichov <[email protected]> wrote:
>
> > Hey all,
> >
> > The Flink PR has been merged, and thus - Flink becomes the first distributed runner to support Splittable DoFn!!!
> > Thank you, Aljoscha!
> >
> > Looking forward to Spark and Apex, and continuing work on Dataflow.
> > I'll also send proposals about a couple of new ideas related to SDF next week.
> >
> > On Thu, Mar 30, 2017 at 9:08 AM Amit Sela <[email protected]> wrote:
> >
> > > I will not be able to make it this weekend, too busy. Let's chat at the beginning of next week and see what's on my plate.
> > >
> > > On Tue, Mar 28, 2017 at 5:44 PM Aljoscha Krettek <[email protected]> wrote:
> > >
> > > > Thanks for the offers, guys! The code is finished, though. I only need to do the last touch ups.
> > > >
> > > > On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> > > > > Hi Aljoscha,
> > > > > I would like to work on the Flink runner with you.
> > > > >
> > > > > Best,
> > > > > JingsongLee
> > > > >
> > > > > ------------------------------------------------------------------
> > > > > From: Jean-Baptiste Onofré <[email protected]>
> > > > > Time: 2017 Mar 28 (Tue) 14:04
> > > > > To: dev <[email protected]>
> > > > > Subject: Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners
> > > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > do you need some help on this ?
> > > > >
> > > > > Regards
> > > > > JB
> > > > >
> > > > > On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > > > > > Hi,
> > > > > > sorry for being so slow but I’m currently traveling.
> > > > > >
> > > > > > The Flink code works but I think it could benefit from some refactoring to make the code nice and maintainable.
> > > > > >
> > > > > > Best,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> > > > > >> I add myself on the Spark runner.
> > > > > >>
> > > > > >> Regards
> > > > > >> JB
> > > > > >>
> > > > > >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> > > > > >>> Hi all,
> > > > > >>>
> > > > > >>> Let's continue the ~bi-weekly sync-ups about state of SDF support in Spark/Flink/Apex runners.
> > > > > >>>
> > > > > >>> Spark:
> > > > > >>> Amit, Aviem, Ismaël - when would be a good time for you; does same time work (8am PST this Friday)? Who else would like to join?
> > > > > >>>
> > > > > >>> Flink:
> > > > > >>> I pinged the PR, but - Aljoscha, do you think it's worth discussing anything there over a videocall?
> > > > > >>>
> > > > > >>> Apex:
> > > > > >>> Thomas - how about same time next Monday? (9:30am PST) Who else would like to join?
> > > > > >>>
> > > > > >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>
> > > > > >>>> Meeting notes:
> > > > > >>>> Me and Thomas had a video call and we pretty much walked through the implementation of SDF in the runner-agnostic part and in the direct runner.
> > > > > >>>> Flink and Apex are pretty similar, so likely https://github.com/apache/beam/pull/2235 (the Flink PR) will give a very good guideline as to how to do this in Apex.
> > > > > >>>> Will talk again in ~2 weeks; and will involve +David Yan <[email protected]> who is also on Apex and currently conveniently works on the Google Dataflow team and, from in-person conversation, was interested in being involved :)
> > > > > >>>>
> > > > > >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>
> > > > > >>>> Thomas - yes, 9:30 works, shall we do that?
> > > > > >>>>
> > > > > >>>> JB - excellent! You can start experimenting already, using direct runner!
> > > > > >>>>
> > > > > >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <[email protected]> wrote:
> > > > > >>>>
> > > > > >>>> Hi Eugene,
> > > > > >>>>
> > > > > >>>> Thanks for the meeting notes !
> > > > > >>>>
> > > > > >>>> I will be in the next call and Ismaël also provided to me some updates.
> > > > > >>>>
> > > > > >>>> I will sync with Amit on Spark runner and start to experiment and test SDF on the JMS IO.
> > > > > >>>>
> > > > > >>>> Thanks !
> > > > > >>>> Regards
> > > > > >>>> JB
> > > > > >>>>
> > > > > >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> > > > > >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> > > > > >>>>>
> > > > > >>>>> Spark has 2 types of stateful operators; a cheap one intended for updating elements (works with state but not with timers) and an expensive one. I.e. there's no efficient direct counterpart to Beam's keyed state model. In implementation of Beam State & Timers API, Spark runner will use the cheaper one for state and the expensive one for timers. So, for SDF, which in the runner-agnostic SplittableParDo expansion needs both state and timers, we'll need the expensive one - but this should be fine since with SDF the bottleneck should be in the ProcessElement call itself, not in splitting/scheduling it.
> > > > > >>>>>
> > > > > >>>>> For Spark batch runner, implementing SDF might be still simpler: runner will just not request any checkpointing. Hard parts about SDF/batch are dynamic rebalancing and size estimation APIs - they will be refined this quarter, but it's ok to initially not have them.
> > > > > >>>>>
> > > > > >>>>> Spark runner might use a different expansion of SDF not involving KeyedWorkItem's (i.e. not overriding the GBKIntoKeyedWorkItems transform), though still striving to reuse as much code as possible from the standard expansion implemented in SplittableParDo, at least ProcessFn.
> > > > > >>>>>
> > > > > >>>>> Testing questions:
> > > > > >>>>> - Spark runner already implements termination on watermarks-reaching-infinity properly.
> > > > > >>>>> - Q: How to test that the runner actually splits? A: The code that splits is in the runner-agnostic part, so a runner would have to deliberately sabotage it in order to break it - unlikely. Also, for semantics we have runner-agnostic ROS tests; but at some point will need performance tests too.
> > > > > >>>>>
> > > > > >>>>> Next steps:
> > > > > >>>>> - Amit will look at the standard SplittableParDo expansion and implementation in Flink and Direct runner, will write up a doc about how to do this in Spark.
> > > > > >>>>> - Another videotalk in 2 weeks to check on progress/issues.
> > > > > >>>>>
> > > > > >>>>> Thanks all!
> > > > > >>>>>
> > > > > >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>
> > > > > >>>>>> Yes, Monday morning works! How about also 8am PST, same Hangout link - does that work for you?
> > > > > >>>>>>
> > > > > >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <[email protected]> wrote:
> > > > > >>>>>>
> > > > > >>>>>> Eugene,
> > > > > >>>>>>
> > > > > >>>>>> I cannot make it for the call today. Would Monday morning work for you to discuss the Apex changes?
> > > > > >>>>>>
> > > > > >>>>>> Thanks
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>
> > > > > >>>>>>> Hi! Please feel free to join this call, but I think we'd be mostly discussing how to do it in the Spark runner in particular; so we'll probably need another call for Apex anyway.
> > > > > >>>>>>>
> > > > > >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <[email protected]> wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi Eugene,
> > > > > >>>>>>>>
> > > > > >>>>>>>> This would work for me also. Please let me know if you want to keep the Apex related discussion separate or want me to join this call.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks,
> > > > > >>>>>>>> Thomas
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST, at videocall by link https://hangouts.google.com/hangouts/_/google.com/splittabledofn ?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <[email protected]> wrote:
> > > > > >>>>>>>>>
> > > > > >>>>>>>>>> PST mornings are better, because they are evening/nights for me. Friday would work-out best for me.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Awesome!!!
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
> > > > > >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in Pacific Time, and mornings of Wed & Fri. We'll probably need half an hour to an hour.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <[email protected]> wrote:
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>>> I whipped up a quick version for Flink that seems to work:
> > > > > >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> There are still two failing tests, as described in the PR.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > > >>>>>>>>>>>>> +1 for a video call. I think it should be pretty straight forward for the Spark runner after the work on read from UnboundedSource and after GroupAlsoByWindow, but from my experience such a call could move us forward fast enough.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Let us continue working on this. I am back from various travels and am eager to help.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to hash this out for the Spark runner?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is the need for them obviated by using the (existing) runner-facing state/timer APIs? Should we have a videocall too?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thomas - what do you think about getting this into Apex runner?
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> (I think videocalls will allow to make rapid progress, but it's probably a better idea to keep them separate since they'll involve a lot of runner-specific details)
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming runner is currently waiting only on having a small service-side change implemented and rolled out for termination of streaming jobs.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state & timer APIs; they are lower-level and more appropriate for this. All runners provide them or use runners/core implementations, as they are needed for triggering.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks Aljoscha!
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of support for timers Flink currently has - however SDF in Direct and Dataflow runner currently does not use the user-facing state/timer APIs - rather, it uses the runner-facing APIs (StateInternals and TimerInternals) - perhaps Flink already implements these. We may want to change this, but for now it's good enough (besides, SDF uses watermark holds, which are not supported by the user-facing state API yet).
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting for the Flink 1.2 release (which happened this week)! There's some prerequisite work to be done on the Flink runner: we'll move to the new timer interfaces introduced in Flink 1.2 and implement support for both the user facing state and timer APIs. This should make implementation of SDF easier.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks for the update Eugene.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I will work on the spark runner with Amit.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Regards
> > > > > >>>>>>>>>>>>>> JB
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>> Hello,
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and very excited about that. There's only 1 PR <https://github.com/apache/beam/pull/1898> remaining, plus enabling some tests.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet quite clear to me how to properly implement liquid sharding <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow> with SDF - and the current API is not ready for that yet)
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of Splittable DoFn, I found them quite easy to integrate into Dataflow streaming runner, and I think this means it should be easy to integrate into other runners too.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> > > > > >>>>>>>>>>>>>>> The general benefits of SDF are well-described in the design doc (linked above).
> > > > > >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all runners, it'd already enable us to start greatly simplifying the code of existing streaming connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors (e.g. a really nice one to implement would be "directory watcher", that continuously returns new files in a directory).
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an "unbounded counter" I used for my test of Dataflow runner integration:
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> // Output type is Long because OffsetRange offsets are longs.
> > > > > >>>>>>>>>>>>>>> class CountFn extends DoFn<String, Long> {
> > > > > >>>>>>>>>>>>>>>   @ProcessElement
> > > > > >>>>>>>>>>>>>>>   public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
> > > > > >>>>>>>>>>>>>>>     // Emit each offset that the tracker still lets us claim.
> > > > > >>>>>>>>>>>>>>>     for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> > > > > >>>>>>>>>>>>>>>       c.output(i);
> > > > > >>>>>>>>>>>>>>>     }
> > > > > >>>>>>>>>>>>>>>     return resume();
> > > > > >>>>>>>>>>>>>>>   }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>   @GetInitialRestriction
> > > > > >>>>>>>>>>>>>>>   public OffsetRange getInitialRange(String element) {
> > > > > >>>>>>>>>>>>>>>     return new OffsetRange(0, Integer.MAX_VALUE);
> > > > > >>>>>>>>>>>>>>>   }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>>   @NewTracker
> > > > > >>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange range) {
> > > > > >>>>>>>>>>>>>>>     return new OffsetRangeTracker(range);
> > > > > >>>>>>>>>>>>>>>   }
> > > > > >>>>>>>>>>>>>>> }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== What I'm asking ======
> > > > > >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into Spark, Flink and Apex runners from people who are intimately familiar with them - specifically, I was hoping best-case I could nerd-snipe some of you into taking over the integration of SDF with your favorite runner ;)
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek <[email protected]> +Maximilian Michels <[email protected]> [email protected] <[email protected]> +Amit Sela <[email protected]> +Thomas Weise unless I forgot somebody.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Average-case, I was looking for runner-specific guidance on how to do it myself.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== If you want to help ======
> > > > > >>>>>>>>>>>>>>> If somebody decides to take this over in my absence (I'll be mostly gone for ~the next month), the best people to ask for implementation advice are +Kenn Knowles <[email protected]> and +Daniel Mills <[email protected]>.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the direct runner:
> > > > > >>>>>>>>>>>>>>> - Direct runner overrides <https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b74a62d9b24/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java> ParDo.of() for a splittable DoFn and replaces it with SplittableParDo <https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java> (common transform expansion)
> > > > > >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive transforms: "GBKIntoKeyedWorkItems" and "SplittableProcessElements". Direct runner overrides the first one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java>, and directly implements evaluation of the second one like this <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java>, using runner hooks introduced in this PR <https://github.com/apache/beam/pull/1824>. At the core of the hooks is "ProcessFn" which is like a regular DoFn but has to be prepared at runtime with some hooks (state, timers, and runner access to RestrictionTracker) before you invoke it. I added a convenience implementation of the hook mimicking behavior of UnboundedSource.
> > > > > >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in SplittableDoFnTest <https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a32599024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> That's all it takes, really - the runner has to implement these two transforms. When I looked at Spark and Flink runners, it was not quite clear to me how to implement the GBKIntoKeyedWorkItems transform, e.g. Spark runner currently doesn't use KeyedWorkItem at all - but it seems definitely possible.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks!
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> --
> > > > > >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> [email protected]
> > > > > >>>>>>>>>>>>>> +49-(0)30-55599146
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
> > > > > >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> > > > > >>>>
> > > > > >>>> --
> > > > > >>>> Jean-Baptiste Onofré
> > > > > >>>> [email protected]
> > > > > >>>> http://blog.nanthrax.net
> > > > > >>>> Talend - http://www.talend.com
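
For anyone following the thread without a Beam checkout, the claim loop from the CountFn teaser quoted above can be sketched in plain Java. This is only an illustration under stated assumptions: Range, Tracker and ClaimLoopSketch are hypothetical stand-ins written for this note, not Beam's OffsetRange or OffsetRangeTracker, and checkpoint() is a simplified guess at how a runner might split off the unclaimed remainder.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical half-open offset range [from, to); NOT Beam's OffsetRange. */
final class Range {
  final long from;
  final long to;

  Range(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

/** Hypothetical restriction tracker; NOT Beam's OffsetRangeTracker. */
final class Tracker {
  private Range range;
  private long lastClaimed;

  Tracker(Range range) {
    this.range = range;
    this.lastClaimed = range.from - 1; // nothing claimed yet
  }

  Range currentRestriction() {
    return range;
  }

  /** Claims offset i if it is still inside the restriction; false means "stop". */
  boolean tryClaim(long i) {
    if (i >= range.to) {
      return false;
    }
    lastClaimed = i;
    return true;
  }

  /** Shrinks this restriction to the claimed part and returns the unclaimed residual. */
  Range checkpoint() {
    Range residual = new Range(lastClaimed + 1, range.to);
    range = new Range(range.from, lastClaimed + 1);
    return residual;
  }
}

class ClaimLoopSketch {
  /** Same loop shape as CountFn.process(): emit every offset that can be claimed. */
  static List<Long> process(Tracker tracker) {
    List<Long> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      out.add(i);
    }
    return out;
  }

  public static void main(String[] args) {
    // First call claims a few offsets, then the "runner" checkpoints it.
    Tracker first = new Tracker(new Range(0, 10));
    first.tryClaim(0);
    first.tryClaim(1);
    first.tryClaim(2);
    Range residual = first.checkpoint();

    // The residual restriction is processed later by a fresh tracker.
    System.out.println(process(new Tracker(residual)));
  }
}
```

The point of the sketch is that the user code only ever asks "may I claim this offset?", so the runner is free to shrink the restriction at any time and resume the remainder elsewhere.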
