Hi all,

Splittable DoFn is now available in the Dataflow streaming runner, as of
https://github.com/apache/beam/pull/1898 !

Meanwhile, Flink support got disabled due to some churn as part of First
Stable Release, but it should not be hard to fix - tracked in
https://issues.apache.org/jira/browse/BEAM-2140 +Aljoscha Krettek
<[email protected]>

Thomas - any news on Apex?

On Sat, Apr 8, 2017 at 1:28 PM Thomas Weise <[email protected]> wrote:

> Nice work Aljoscha!
>
> Update WRT ApexRunner: We merged some prep work in the ParDoOperator to
> weed out remnants of OldDoFn. I have almost all the changes ready to add
> the support for Splittable DoFn (for most part those follow the Flink
> runner changes). The final piece missing to support the feature (based on
> observation from the test failures) is the timer internals.
>
> Thanks,
> Thomas
>
>
> On Sat, Apr 1, 2017 at 1:17 AM, Eugene Kirpichov <
> [email protected]> wrote:
>
> > Hey all,
> >
> > The Flink PR has been merged, and thus - Flink becomes the first
> > distributed runner to support Splittable DoFn!!!
> > Thank you, Aljoscha!
> >
> > Looking forward to Spark and Apex, and continuing work on Dataflow.
> > I'll also send proposals about a couple of new ideas related to SDF next
> > week.
> >
> > On Thu, Mar 30, 2017 at 9:08 AM Amit Sela <[email protected]> wrote:
> >
> > > I will not be able to make it this weekend, too busy. Let's chat at the
> > > beginning of next week and see what's on my plate.
> > >
> > > On Tue, Mar 28, 2017 at 5:44 PM Aljoscha Krettek <[email protected]>
> > > wrote:
> > >
> > > > > Thanks for the offers, guys! The code is finished, though. I only need
> > > > > to do the last touch ups.
> > > >
> > > > On Tue, Mar 28, 2017, at 09:16, JingsongLee wrote:
> > > > > Hi Aljoscha,
> > > > > I would like to work on the Flink runner with you.
> > > > >
> > > >
> > > > > > Best,
> > > > > > JingsongLee
> > > > > >
> > > > > > ------------------------------------------------------------------
> > > > > > From: Jean-Baptiste Onofré <[email protected]>
> > > > > > Time: 2017 Mar 28 (Tue) 14:04
> > > > > > To: dev <[email protected]>
> > > > > > Subject: Re: Call for help: let's add Splittable DoFn to Spark, Flink and Apex runners
> > > > > Hi Aljoscha,
> > > > >
> > > > > do you need some help on this ?
> > > > >
> > > > > Regards
> > > > > JB
> > > > >
> > > > > On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> > > > > > Hi,
> > > > > > sorry for being so slow but I’m currently traveling.
> > > > > >
> > > > > > > The Flink code works but I think it could benefit from some refactoring
> > > > > > > to make the code nice and maintainable.
> > > > > >
> > > > > > Best,
> > > > > > Aljoscha
> > > > > >
> > > > > > On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
> > > > > >> I add myself on the Spark runner.
> > > > > >>
> > > > > >> Regards
> > > > > >> JB
> > > > > >>
> > > > > >> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
> > > > > >>> Hi all,
> > > > > >>>
> > > > > > >>> Let's continue the ~bi-weekly sync-ups about state of SDF support in
> > > > > > >>> Spark/Flink/Apex runners.
> > > > > > >>>
> > > > > > >>> Spark:
> > > > > > >>> Amit, Aviem, Ismaël - when would be a good time for you; does same time
> > > > > > >>> work (8am PST this Friday)? Who else would like to join?
> > > > > > >>>
> > > > > > >>> Flink:
> > > > > > >>> I pinged the PR, but - Aljoscha, do you think it's worth discussing
> > > > > > >>> anything there over a videocall?
> > > > > > >>>
> > > > > > >>> Apex:
> > > > > > >>> Thomas - how about same time next Monday? (9:30am PST) Who else would like
> > > > > > >>> to join?
> > > > > >>>
> > > > > >>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <
> > > > [email protected]>
> > > > > >>> wrote:
> > > > > >>>
> > > > > > >>>> Meeting notes:
> > > > > > >>>> Me and Thomas had a video call and we pretty much walked through the
> > > > > > >>>> implementation of SDF in the runner-agnostic part and in the direct runner.
> > > > > > >>>> Flink and Apex are pretty similar, so likely
> > > > > > >>>> https://github.com/apache/beam/pull/2235 (the Flink PR) will give a very
> > > > > > >>>> good guideline as to how to do this in Apex.
> > > > > > >>>> Will talk again in ~2 weeks; and will involve +David Yan
> > > > > > >>>> <[email protected]> who is also on Apex and currently conveniently
> > > > > > >>>> works on the Google Dataflow team and, from in-person conversation, was
> > > > > > >>>> interested in being involved :)
> > > > > >>>>
> > > > > >>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <
> > > > [email protected]>
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>> Thomas - yes, 9:30 works, shall we do that?
> > > > > >>>>
> > > > >
> > > > >>>> JB - excellent! You can start experimenting already, using
> direct
> > > runner!
> > > > > >>>>
> > > > > >>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <
> > > [email protected]
> > > > >
> > > > > >>>> wrote:
> > > > > >>>>
> > > > > >>>> Hi Eugene,
> > > > > >>>>
> > > > > >>>> Thanks for the meeting notes !
> > > > > >>>>
> > > > >
> > > > >>>> I will be in the next call and Ismaël also provided to me some
> > > updates.
> > > > > >>>>
> > > > >
> > > > >>>> I will sync with Amit on Spark runner and start to experiment
> and
> > > test SDF
> > > > > >>>> on
> > > > > >>>> the JMS IO.
> > > > > >>>>
> > > > > >>>> Thanks !
> > > > > >>>> Regards
> > > > > >>>> JB
> > > > > >>>>
> > > > > >>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
> > > > > > >>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
> > > > > > >>>>>
> > > > > > >>>>> Spark has 2 types of stateful operators; a cheap one intended for updating
> > > > > > >>>>> elements (works with state but not with timers) and an expensive one. I.e.
> > > > > > >>>>> there's no efficient direct counterpart to Beam's keyed state model. In
> > > > > > >>>>> implementation of Beam State & Timers API, Spark runner will use the
> > > > > > >>>>> cheaper one for state and the expensive one for timers. So, for SDF, which
> > > > > > >>>>> in the runner-agnostic SplittableParDo expansion needs both state and
> > > > > > >>>>> timers, we'll need the expensive one - but this should be fine since with
> > > > > > >>>>> SDF the bottleneck should be in the ProcessElement call itself, not in
> > > > > > >>>>> splitting/scheduling it.
> > > > > > >>>>>
> > > > > > >>>>> For Spark batch runner, implementing SDF might be still simpler: runner
> > > > > > >>>>> will just not request any checkpointing. Hard parts about SDF/batch are
> > > > > > >>>>> dynamic rebalancing and size estimation APIs - they will be refined this
> > > > > > >>>>> quarter, but it's ok to initially not have them.
> > > > > > >>>>>
> > > > > > >>>>> Spark runner might use a different expansion of SDF not involving
> > > > > > >>>>> KeyedWorkItem's (i.e. not overriding the GBKIntoKeyedWorkItems transform),
> > > > > > >>>>> though still striving to reuse as much code as possible from the standard
> > > > > > >>>>> expansion implemented in SplittableParDo, at least ProcessFn.
> > > > > > >>>>>
> > > > > > >>>>> Testing questions:
> > > > > > >>>>> - Spark runner already implements termination on
> > > > > > >>>>> watermarks-reaching-infinity properly.
> > > > > > >>>>> - Q: How to test that the runner actually splits? A: The code that splits
> > > > > > >>>>> is in the runner-agnostic part, so a runner would have to deliberately
> > > > > > >>>>> sabotage it in order to break it - unlikely. Also, for semantics we have
> > > > > > >>>>> runner-agnostic ROS tests; but at some point will need performance tests
> > > > > > >>>>> too.
> > > > > > >>>>>
> > > > > > >>>>> Next steps:
> > > > > > >>>>> - Amit will look at the standard SplittableParDo expansion and
> > > > > > >>>>> implementation in Flink and Direct runner, will write up a doc about how to
> > > > > > >>>>> do this in Spark.
> > > > > > >>>>> - Another videotalk in 2 weeks to check on progress/issues.
> > > > > >>>>>
> > > > > >>>>> Thanks all!
> > > > > >>>>>
> > > > > >>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <
> > > > [email protected]>
> > > > > >>>>> wrote:
> > > > > >>>>>
> > > > >
> > > > >>>>>> Yes, Monday morning works! How about also 8am PST, same
> Hangout
> > > link -
> > > > > >>>>>> does that work for you?
> > > > > >>>>>>
> > > > > >>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <
> > > > [email protected]>
> > > > > >>>>>> wrote:
> > > > > >>>>>>
> > > > > >>>>>> Eugene,
> > > > > >>>>>>
> > > > >
> > > > >>>>>> I cannot make it for the call today. Would Monday morning work
> > > for you
> > > > > >>>> to
> > > > > >>>>>> discuss the Apex changes?
> > > > > >>>>>>
> > > > > >>>>>> Thanks
> > > > > >>>>>>
> > > > > >>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
> > > > > >>>>>> [email protected]> wrote:
> > > > > >>>>>>
> > > > >
> > > > >>>>>>> Hi! Please feel free to join this call, but I think we'd be
> > > mostly
> > > > >
> > > > >>>>>>> discussing how to do it in the Spark runner in particular; so
> > > we'll
> > > > > >>>>>>> probably need another call for Apex anyway.
> > > > > >>>>>>>
> > > > > >>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <
> [email protected]
> > > > > wrote:
> > > > > >>>>>>>
> > > > > >>>>>>>> Hi Eugene,
> > > > > >>>>>>>>
> > > > >
> > > > >>>>>>>> This would work for me also. Please let me know if you want
> to
> > > keep
> > > > > >>>> the
> > > > > >>>>>>>> Apex related discussion separate or want me to join this
> > call.
> > > > > >>>>>>>>
> > > > > >>>>>>>> Thanks,
> > > > > >>>>>>>> Thomas
> > > > > >>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
> > > > > >>>>>>>> [email protected]> wrote:
> > > > > >>>>>>>>
> > > > > >>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday
> PST,
> > > at
> > > > > >>>>>>> videocall
> > > > > >>>>>>>> by
> > > > > >>>>>>>>> link
> > > > > >>>>>>
> > > https://hangouts.google.com/hangouts/_/google.com/splittabledofn
> > > > > >>>>>>> ?
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <
> > > > [email protected]>
> > > > > >>>>>>> wrote:
> > > > > >>>>>>>>>
> > > > >
> > > > >>>>>>>>>> PST mornings are better, because they are evening/nights
> for
> > > me.
> > > > > >>>>>>> Friday
> > > > > >>>>>>>>>> would work-out best for me.
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
> > > > > >>>>>>>>>> <[email protected]> wrote:
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>> Awesome!!!
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> Amit - remind me your time zone? JB, do you want to
> join?
> > > > > >>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in
> > > Pacific
> > > > > >>>>>> Time,
> > > > > >>>>>>>> and
> > > > > >>>>>>>>>>> mornings of Wed & Fri. We'll probably need half an hour
> > to
> > > an
> > > > > >>>>>> hour.
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
> > > > > >>>>>>>> [email protected]>
> > > > > >>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>
> > > > > > >>>>>>>>>>>> I whipped up a quick version for Flink that seems to work:
> > > > > > >>>>>>>>>>>> https://github.com/apache/beam/pull/2235
> > > > > > >>>>>>>>>>>>
> > > > > > >>>>>>>>>>>> There are still two failing tests, as described in the PR.
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
> > > > > > >>>>>>>>>>>>> +1 for a video call. I think it should be pretty straightforward for the
> > > > > > >>>>>>>>>>>>> Spark runner after the work on read from UnboundedSource and after
> > > > > > >>>>>>>>>>>>> GroupAlsoByWindow, but from my experience such a call could move us
> > > > > > >>>>>>>>>>>>> forward fast enough.
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
> > > > > >>>>>>>> [email protected]
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Hi all,
> > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Let us continue working on this. I am back from various travels and am
> > > > > > >>>>>>>>>>>>>> eager to help.
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to hash this out for
> > > > > > >>>>>>>>>>>>>> the Spark runner?
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is the need for them
> > > > > > >>>>>>>>>>>>>> obviated by using the (existing) runner-facing state/timer APIs? Should we
> > > > > > >>>>>>>>>>>>>> have a videocall too?
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> Thomas - what do you think about getting this into Apex runner?
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> (I think videocalls will allow to make rapid progress, but it's probably a
> > > > > > >>>>>>>>>>>>>> better idea to keep them separate since they'll involve a lot of
> > > > > > >>>>>>>>>>>>>> runner-specific details)
> > > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming runner is currently
> > > > > > >>>>>>>>>>>>>> waiting only on having a small service-side change implemented and rolled
> > > > > > >>>>>>>>>>>>>> out for termination of streaming jobs.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
> > > > > >>>>>>>> [email protected]>
> > > > > >>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>> I recommend proceeding with the runner-facing state &
> > > timer
> > > > > >>>>>>>> APIs;
> > > > > >>>>>>>>>>> they
> > > > > >>>>>>>>>>>> are
> > > > > >>>>>>>>>>>>>> lower-level and more appropriate for this. All
> runners
> > > > > >>>>>>> provide
> > > > > >>>>>>>>> them
> > > > > >>>>>>>>>>> or
> > > > > >>>>>>>>>>>> use
> > > > > >>>>>>>>>>>>>> runners/core implementations, as they are needed for
> > > > > >>>>>>>> triggering.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
> > > > > >>>>>>>>>>>> [email protected]>
> > > > > >>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks Aljoscha!
> > > > > >>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>> Minor note: I'm not familiar with what level of
> support
> > > for
> > > > > >>>>>>>>> timers
> > > > > >>>>>>>>>>>> Flink
> > > > > >>>>>>>>>>>>>> currently has - however SDF in Direct and Dataflow
> > > runner
> > > > > >>>>>>>>> currently
> > > > > >>>>>>>>>>>> does
> > > > > >>>>>>>>>>>>>> not use the user-facing state/timer APIs - rather,
> it
> > > uses
> > > > > >>>>>>> the
> > > > > >>>>>>>>>>>>>> runner-facing APIs (StateInternals and
> > TimerInternals) -
> > > > > >>>>>>>> perhaps
> > > > > >>>>>>>>>>> Flink
> > > > > >>>>>>>>>>>>>> already implements these. We may want to change
> this,
> > > but
> > > > > >>>>>> for
> > > > > >>>>>>>> now
> > > > > >>>>>>>>>>> it's
> > > > > >>>>>>>>>>>> good
> > > > > >>>>>>>>>>>>>> enough (besides, SDF uses watermark holds, which are
> > not
> > > > > >>>>>>>>> supported
> > > > > >>>>>>>>>> by
> > > > > >>>>>>>>>>>> the
> > > > > >>>>>>>>>>>>>> user-facing state API yet).
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
> > > > > >>>>>>>>>>>>>> [email protected]> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
> > > > > >>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting for the Flink 1.2
> > > > > > >>>>>>>>>>>>>> release (which happened this week)! There's some prerequisite work to be
> > > > > > >>>>>>>>>>>>>> done on the Flink runner: we'll move to the new timer interfaces introduced
> > > > > > >>>>>>>>>>>>>> in Flink 1.2 and implement support for both the user facing state and timer
> > > > > > >>>>>>>>>>>>>> APIs. This should make implementation of SDF easier.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
> > > > > >>>>>>>>>>> [email protected]
> > > > > >>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks! Looking forward to this work.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré
> <
> > > > > >>>>>>>>>> [email protected]
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> wrote:
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Thanks for the update Eugene.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> I will work on the spark runner with Amit.
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Regards
> > > > > >>>>>>>>>>>>>> JB
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
> > > > > >>>>>>>>>>>>>> <[email protected]> wrote:
> > > > > >>>>>>>>>>>>>>> Hello,
> > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
> > > > > > >>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow streaming runner*, and
> > > > > > >>>>>>>>>>>>>>> very excited about that. There's only 1 PR
> > > > > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining, plus enabling some
> > > > > > >>>>>>>>>>>>>>> tests.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet quite clear to me how
> > > > > > >>>>>>>>>>>>>>> to properly implement liquid sharding
> > > > > > >>>>>>>>>>>>>>> <https://cloud.google.com/blog/big-data/2016/05/no-shard-left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow>
> > > > > > >>>>>>>>>>>>>>> with SDF - and the current API is not ready for that yet)
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of Splittable DoFn, I
> > > > > > >>>>>>>>>>>>>>> found them quite easy to integrate into Dataflow streaming runner, and I
> > > > > > >>>>>>>>>>>>>>> think this means it should be easy to integrate into other runners too.
> > > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== Why it'd be cool ======
> > > > > > >>>>>>>>>>>>>>> The general benefits of SDF are well-described in the design doc (linked
> > > > > > >>>>>>>>>>>>>>> above).
> > > > > > >>>>>>>>>>>>>>> As for right now - if we integrated SDF with all runners, it'd already
> > > > > > >>>>>>>>>>>>>>> enable us to start greatly simplifying the code of existing streaming
> > > > > > >>>>>>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and writing new connectors
> > > > > > >>>>>>>>>>>>>>> (e.g. a really nice one to implement would be "directory watcher", that
> > > > > > >>>>>>>>>>>>>>> continuously returns new files in a directory).
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an "unbounded counter"
> > > > > > >>>>>>>>>>>>>>> I used for my test of Dataflow runner integration:
> > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>  class CountFn extends DoFn<String, String> {
> > > > > > >>>>>>>>>>>>>>>    @ProcessElement
> > > > > > >>>>>>>>>>>>>>>    public ProcessContinuation process(ProcessContext c, OffsetRangeTracker tracker) {
> > > > > > >>>>>>>>>>>>>>>      // Claim each offset before emitting it; stop once a claim fails.
> > > > > > >>>>>>>>>>>>>>>      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
> > > > > > >>>>>>>>>>>>>>>        c.output(String.valueOf(i));  // the declared output type is String
> > > > > > >>>>>>>>>>>>>>>      }
> > > > > > >>>>>>>>>>>>>>>      return resume();
> > > > > > >>>>>>>>>>>>>>>    }
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>    @GetInitialRestriction
> > > > > > >>>>>>>>>>>>>>>    public OffsetRange getInitialRange(String element) {
> > > > > > >>>>>>>>>>>>>>>      return new OffsetRange(0, Integer.MAX_VALUE);
> > > > > > >>>>>>>>>>>>>>>    }
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>>    @NewTracker
> > > > > > >>>>>>>>>>>>>>>    public OffsetRangeTracker newTracker(OffsetRange range) {
> > > > > > >>>>>>>>>>>>>>>      return new OffsetRangeTracker(range);
> > > > > > >>>>>>>>>>>>>>>    }
> > > > > > >>>>>>>>>>>>>>>  }
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== What I'm asking ======
> > > > > > >>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into Spark, Flink and Apex
> > > > > > >>>>>>>>>>>>>>> runners from people who are intimately familiar with them - specifically, I
> > > > > > >>>>>>>>>>>>>>> was hoping best-case I could nerd-snipe some of you into taking over the
> > > > > > >>>>>>>>>>>>>>> integration of SDF with your favorite runner ;)
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
> > > > > > >>>>>>>>>>>>>>> <[email protected]> +Maximilian Michels <[email protected]>
> > > > > > >>>>>>>>>>>>>>> [email protected] <[email protected]> +Amit Sela
> > > > > > >>>>>>>>>>>>>>> <[email protected]> +Thomas Weise unless I forgot somebody.
> > > > > > >>>>>>>>>>>>>>>
> > > > > > >>>>>>>>>>>>>>> Average-case, I was looking for runner-specific guidance on how to do it
> > > > > > >>>>>>>>>>>>>>> myself.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ====== If you want to help ======
> > > > >
> > > > >>>>>>>>>>>>>>> If somebody decides to take this over, in my absence
> > > (I'll
> > > > > >>>>>>> be
> > > > > >>>>>>>>>> mostly
> > > > > >>>>>>>>>>>>>>> gone
> > > > > >>>>>>>>>>>>>>> for ~the next month)., the best people to ask for
> > > > > >>>>>>>> implementation
> > > > > >>>>>>>>>>>>>>> advice are +Kenn
> > > > > >>>>>>>>>>>>>>> Knowles <[email protected]> and +Daniel Mills <
> > > > > >>>>>>> [email protected]
> > > > > >>>>>>>>>
> > > > > >>>>>>>>> .
> > > > > >>>>>>>>>>>>>>>
> > > > >
> > > > >>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the
> > > direct
> > > > > >>>>>>>>> runner:
> > > > > >>>>>>>>>>>>>>> - Direct runner overrides
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b
> > > > > >>>>>>>>> 74a62d9b24/runners/direct-java/src/main/java/org/apache/
> > > > > >>>>>>>>> beam/runners/direct/ParDoMultiOverrideFactory.java
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it
> with
> > > > > >>>>>>>>>>> SplittableParDo
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> https://github.com/apache/beam/blob/master/runners/core-
> > > > >
> > > > >>>>>>>>>
> > > java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> (common
> > > > > >>>>>>>>>>>>>>> transform expansion)
> > > > > >>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific
> primitive
> > > > > >>>>>>>> transforms:
> > > > > >>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and
> > > "SplittableProcessElements".
> > > > > >>>>>>>> Direct
> > > > > >>>>>>>>>>> runner
> > > > > >>>>>>>>>>>>>>> overrides the first one like this
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > > > >
> > > > >>>>>>>>>
> > > beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
> > > > > >>>>>>>>>>>>>>> ,
> > > > > >>>>>>>>>>>>>>> and directly implements evaluation of the second
> one
> > > like
> > > > > >>>>>>> this
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > > >>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
> > > > >
> > > > >>>>>>>>>
> > > beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
> > > > > >>>>>>>>>>>>>>> ,
> > > > > >>>>>>>>>>>>>>> using runner hooks introduced in this PR
> > > > > >>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the
> > > core
> > > > > >>>>>> of
> > > > > >>>>>>>> the
> > > > > >>>>>>>>>>> hooks
> > > > > >>>>>>>>>>>> is
> > > > > >>>>>>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to
> > be
> > > > > >>>>>>>> prepared
> > > > > >>>>>>>>> at
> > > > > >>>>>>>>>>>>>>> runtime
> > > > > >>>>>>>>>>>>>>> with some hooks (state, timers, and runner access
> to
> > > > > >>>>>>>>>>>>>>> RestrictionTracker)
> > > > >
> > > > >>>>>>>>>>>>>>> before you invoke it. I added a convenience
> > > implementation
> > > > > >>>>>>> of
> > > > > >>>>>>>>> the
> > > > > >>>>>>>>>>> hook
> > > > > >>>>>>>>>>>>>>> mimicking behavior of UnboundedSource.
> > > > > >>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in
> > > > > >>>>>>> SplittableDoFnTest
> > > > > >>>>>>>>>>>>>>> <
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
> > > > > >>>>>>>>> 99024d3a1f/sdks/java/core/src/
> > test/java/org/apache/beam/sdk/
> > > > > >>>>>>>>> transforms/SplittableDoFnTest.java
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> .
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> That's all it takes, really - the runner has to
> > > implement
> > > > > >>>>>>>> these
> > > > > >>>>>>>>>> two
> > > > > >>>>>>>>>>>>>>> transforms. When I looked at Spark and Flink
> runners,
> > > it
> > > > > >>>>>> was
> > > > > >>>>>>>> not
> > > > > >>>>>>>>>>> quite
> > > > > >>>>>>>>>>>>>>> clear to me how to implement the
> > GBKIntoKeyedWorkItems
> > > > > >>>>>>>>> transform,
> > > > > >>>>>>>>>>> e.g.
> > > > > >>>>>>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at
> > > all -
> > > > > >>>>>>> but
> > > > > >>>>>>>> it
> > > > > >>>>>>>>>>> seems
> > > > > >>>>>>>>>>>>>>> definitely possible.
> > > > > >>>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>> Thanks!
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> --
> > > > > >>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963
> > Berlin
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> [email protected]
> > > > >
> > > > >>>>>>>>>>>>>> +49-(0)30-55599146 <+49%2030%2055599146>
> <+49%2030%2055599146>
> > > <+49%2030%2055599146> <+49%2030%2055599146>
> > > > > >>>>>> <+49%2030%2055599146>
> > > > > >>>>>>>> <+49%2030%2055599146> <+49%2030%2055599146>
> > > > > >>>>>>>>>> <+49%2030%2055599146 <(205)%20559-9146>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB
> 158244
> > B
> > > > > >>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>>>
> > > > > >>>>>>>>>>>>
> > > > > >>>>>>>>>>>
> > > > > >>>>>>>>>>
> > > > > >>>>>>>>>
> > > > > >>>>>>>>
> > > > > >>>>>>>
> > > > > >>>>>>
> > > > > >>>>>>
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>> --
> > > > > >>>> Jean-Baptiste Onofré
> > > > > >>>> [email protected]
> > > > > >>>> http://blog.nanthrax.net
> > > > > >>>> Talend - http://www.talend.com
> > > > > >>>>
> > > > > >>>>
> > > > > >>>
> > > > > >>
> > > > > >> --
> > > > > >> Jean-Baptiste Onofré
> > > > > >> [email protected]
> > > > > >> http://blog.nanthrax.net
> > > > > >> Talend - http://www.talend.com
> > > > >
> > > > > --
> > > > > Jean-Baptiste Onofré
> > > > > [email protected]
> > > > > http://blog.nanthrax.net
> > > > > Talend - http://www.talend.com
> > > >
> > >
> >
>
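
For anyone picking this thread up later: the CountFn teaser quoted above relies on three things - a restriction, a tracker with tryClaim(), and a runner that checkpoints a call and resumes on the remainder. Below is a rough, self-contained sketch of that control flow in plain Java. It does not use Beam at all: SdfSketch and its nested OffsetRange / OffsetRangeTracker are hypothetical stand-ins that only borrow the names, and the "checkpoint after N outputs" policy is an assumption made for illustration (loosely in the spirit of the UnboundedSource-like hook mentioned in the original mail), not a description of how any real runner does it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; these are NOT the Beam classes.
final class SdfSketch {

  /** Half-open range [from, to) of offsets that still need processing. */
  static final class OffsetRange {
    final long from;
    final long to;

    OffsetRange(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  /** Remembers the last claimed offset so the range can be split at that point. */
  static final class OffsetRangeTracker {
    private OffsetRange range;
    private long lastClaimed;

    OffsetRangeTracker(OffsetRange range) {
      this.range = range;
      this.lastClaimed = range.from - 1; // nothing claimed yet
    }

    OffsetRange currentRestriction() {
      return range;
    }

    /** Claims offset i if it is still inside the current range. */
    boolean tryClaim(long i) {
      if (i >= range.to) {
        return false;
      }
      lastClaimed = i;
      return true;
    }

    /** Truncates the current range after the last claim and returns the remainder. */
    OffsetRange checkpoint() {
      OffsetRange residual = new OffsetRange(lastClaimed + 1, range.to);
      range = new OffsetRange(range.from, lastClaimed + 1);
      return residual;
    }
  }

  /** Same loop shape as the quoted CountFn: claim an offset, then emit it. */
  static void process(OffsetRangeTracker tracker, Consumer<String> output) {
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      output.accept(String.valueOf(i));
    }
  }

  /** Toy "runner": checkpoints after maxOutputsPerCall outputs, then resumes on the residual. */
  static List<String> run(OffsetRange initial, int maxOutputsPerCall) {
    List<String> out = new ArrayList<>();
    OffsetRange restriction = initial;
    while (restriction.from < restriction.to) {
      OffsetRangeTracker tracker = new OffsetRangeTracker(restriction);
      OffsetRange[] residual = {null};
      int[] emitted = {0};
      process(tracker, value -> {
        out.add(value);
        if (++emitted[0] == maxOutputsPerCall) {
          residual[0] = tracker.checkpoint();
        }
      });
      if (residual[0] == null) {
        break; // the call reached the end of its restriction
      }
      restriction = residual[0];
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(new OffsetRange(0, 5), 2));
  }
}
```

With the range [0, 5) and a limit of 2, the toy runner makes three process calls (over [0, 2), [2, 4) and [4, 5)) and every offset is emitted exactly once. The point is only that the user code never decides where to stop: a failed tryClaim() ends the loop, and the split point is chosen entirely by whoever holds the tracker.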
