Hi Aljoscha,
I would like to work on the Flink runner with you.
Best,JingsongLee------------------------------------------------------------------From:Jean-Baptiste
 Onofré <j...@nanthrax.net>Time:2017 Mar 28 (Tue) 14:04To:dev 
<dev@beam.apache.org>Subject:Re: Call for help: let's add Splittable DoFn to 
Spark, Flink and Apex runners
Hi Aljoscha,

do you need some help on this ?

Regards
JB

On 03/28/2017 08:00 AM, Aljoscha Krettek wrote:
> Hi,
> sorry for being so slow but I’m currently traveling.
>
> The Flink code works but I think it could benefit from some refactoring
> to make the code nice and maintainable.
>
> Best,
> Aljoscha
>
> On Tue, Mar 28, 2017, at 07:40, Jean-Baptiste Onofré wrote:
>> I add myself on the Spark runner.
>>
>> Regards
>> JB
>>
>> On 03/27/2017 08:18 PM, Eugene Kirpichov wrote:
>>> Hi all,
>>>
>>> Let's continue the ~bi-weekly sync-ups about state of SDF support in
>>> Spark/Flink/Apex runners.
>>>
>>> Spark:
>>> Amit, Aviem, Ismaël - when would be a good time for you; does same time
>>> work (8am PST this Friday)? Who else would like to join?
>>>
>>> Flink:
>>> I pinged the PR, but - Aljoscha, do you think it's worth discussing
>>> anything there over a videocall?
>>>
>>> Apex:
>>> Thomas - how about same time next Monday? (9:30am PST) Who else would like
>>> to join?
>>>
>>> On Mon, Mar 20, 2017 at 9:59 AM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> Meeting notes:
>>>> Me and Thomas had a video call and we pretty much walked through the
>>>> implementation of SDF in the runner-agnostic part and in the direct runner.
>>>> Flink and Apex are pretty similar, so likely
>>>> https://github.com/apache/beam/pull/2235 (the Flink PR) will give a very
>>>> good guideline as to how to do this in Apex.
>>>> Will talk again in ~2 weeks; and will involve +David Yan
>>>> <david...@google.com> who is also on Apex and currently conveniently
>>>> works on the Google Dataflow team and, from in-person conversation, was
>>>> interested in being involved :)
>>>>
>>>> On Mon, Mar 20, 2017 at 7:34 AM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>> Thomas - yes, 9:30 works, shall we do that?
>>>>
>>>> JB - excellent! You can start experimenting already, using direct runner!
>>>>
>>>> On Mon, Mar 20, 2017, 2:26 AM Jean-Baptiste Onofré <j...@nanthrax.net>
>>>> wrote:
>>>>
>>>> Hi Eugene,
>>>>
>>>> Thanks for the meeting notes !
>>>>
>>>> I will be in the next call and Ismaël also provided to me some updates.
>>>>
>>>> I will sync with Amit on Spark runner and start to experiment and test SDF
>>>> on
>>>> the JMS IO.
>>>>
>>>> Thanks !
>>>> Regards
>>>> JB
>>>>
>>>> On 03/17/2017 04:36 PM, Eugene Kirpichov wrote:
>>>>> Meeting notes from today's call with Amit, Aviem and Ismaël:
>>>>>
>>>>> Spark has 2 types of stateful operators; a cheap one intended for
>>>> updating
>>>>> elements (works with state but not with timers) and an expensive one.
>>>> I.e.
>>>>> there's no efficient direct counterpart to Beam's keyed state model. In
>>>>> implementation of Beam State & Timers API, Spark runner will use the
>>>>> cheaper one for state and the expensive one for timers. So, for SDF,
>>>> which
>>>>> in the runner-agnostic SplittableParDo expansion needs both state and
>>>>> timers, we'll need the expensive one - but this should be fine since with
>>>>> SDF the bottleneck should be in the ProcessElement call itself, not in
>>>>> splitting/scheduling it.
>>>>>
>>>>> For Spark batch runner, implementing SDF might be still simpler: runner
>>>>> will just not request any checkpointing. Hard parts about SDF/batch are
>>>>> dynamic rebalancing and size estimation APIs - they will be refined this
>>>>> quarter, but it's ok to initially not have them.
>>>>>
>>>>> Spark runner might use a different expansion of SDF not involving
>>>>> KeyedWorkItem's (i.e. not overriding the GBKIntoKeyedWorkItems
>>>> transform),
>>>>> though still striving to reuse as much code as possible from the standard
>>>>> expansion implemented in SplittableParDo, at least ProcessFn.
>>>>>
>>>>> Testing questions:
>>>>> - Spark runner already implements termination on
>>>>> watermarks-reaching-infinity properly.
>>>>> - Q: How to test that the runner actually splits? A: The code that splits
>>>>> is in the runner-agnostic, so a runner would have to deliberately
>>>> sabotage
>>>>> it in order to break it - unlikely. Also, for semantics we have
>>>>> runner-agnostic ROS tests; but at some point will need performance tests
>>>>> too.
>>>>>
>>>>> Next steps:
>>>>> - Amit will look at the standard SplittableParDo expansion and
>>>>> implementation in Flink and Direct runner, will write up a doc about how
>>>> to
>>>>> do this in Spark.
>>>>> - Another videotalk in 2 weeks to check on progress/issues.
>>>>>
>>>>> Thanks all!
>>>>>
>>>>> On Fri, Mar 17, 2017 at 8:29 AM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, Monday morning works! How about also 8am PST, same Hangout link -
>>>>>> does that work for you?
>>>>>>
>>>>>> On Fri, Mar 17, 2017 at 7:50 AM Thomas Weise <thomas.we...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Eugene,
>>>>>>
>>>>>> I cannot make it for the call today. Would Monday morning work for you
>>>> to
>>>>>> discuss the Apex changes?
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Tue, Mar 14, 2017 at 7:27 PM, Eugene Kirpichov <
>>>>>> kirpic...@google.com.invalid> wrote:
>>>>>>
>>>>>>> Hi! Please feel free to join this call, but I think we'd be mostly
>>>>>>> discussing how to do it in the Spark runner in particular; so we'll
>>>>>>> probably need another call for Apex anyway.
>>>>>>>
>>>>>>> On Tue, Mar 14, 2017 at 6:54 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Eugene,
>>>>>>>>
>>>>>>>> This would work for me also. Please let me know if you want to keep
>>>> the
>>>>>>>> Apex related discussion separate or want me to join this call.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Mar 14, 2017 at 1:56 PM, Eugene Kirpichov <
>>>>>>>> kirpic...@google.com.invalid> wrote:
>>>>>>>>
>>>>>>>>> Sure, Friday morning sounds good. How about 9am Friday PST, at
>>>>>>> videocall
>>>>>>>> by
>>>>>>>>> link
>>>>>> https://hangouts.google.com/hangouts/_/google.com/splittabledofn
>>>>>>> ?
>>>>>>>>>
>>>>>>>>> On Mon, Mar 13, 2017 at 10:30 PM Amit Sela <amitsel...@gmail.com>
>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> PST mornings are better, because they are evening/nights for me.
>>>>>>> Friday
>>>>>>>>>> would work-out best for me.
>>>>>>>>>>
>>>>>>>>>> On Mon, Mar 13, 2017 at 11:46 PM Eugene Kirpichov
>>>>>>>>>> <kirpic...@google.com.invalid> wrote:
>>>>>>>>>>
>>>>>>>>>>> Awesome!!!
>>>>>>>>>>>
>>>>>>>>>>> Amit - remind me your time zone? JB, do you want to join?
>>>>>>>>>>> I'm free this week all afternoons (say after 2pm) in Pacific
>>>>>> Time,
>>>>>>>> and
>>>>>>>>>>> mornings of Wed & Fri. We'll probably need half an hour to an
>>>>>> hour.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Mar 13, 2017 at 1:29 PM Aljoscha Krettek <
>>>>>>>> aljos...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I whipped up a quick version for Flink that seems to work:
>>>>>>>>>>>> https://github.com/apache/beam/pull/2235
>>>>>>>>>>>>
>>>>>>>>>>>> There are still two failing tests, as described in the PR.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Mar 13, 2017, at 20:10, Amit Sela wrote:
>>>>>>>>>>>>> +1 for a video call. I think it should be pretty straight
>>>>>>> forward
>>>>>>>>> for
>>>>>>>>>>> the
>>>>>>>>>>>>> Spark runner after the work on read from UnboundedSource and
>>>>>>>> after
>>>>>>>>>>>>> GroupAlsoByWindow, but from my experience such a call could
>>>>>>> move
>>>>>>>> us
>>>>>>>>>>>>> forward
>>>>>>>>>>>>> fast enough.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Mar 13, 2017, 20:37 Eugene Kirpichov <
>>>>>>>> kirpic...@google.com
>>>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let us continue working on this. I am back from various
>>>>>>> travels
>>>>>>>>> and
>>>>>>>>>>> am
>>>>>>>>>>>>>> eager to help.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Amit, JB - would you like to perhaps have a videocall to
>>>>>> hash
>>>>>>>>> this
>>>>>>>>>>> out
>>>>>>>>>>>> for
>>>>>>>>>>>>>> the Spark runner?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Aljoscha - are the necessary Flink changes done / or is the
>>>>>>>> need
>>>>>>>>>> for
>>>>>>>>>>>> them
>>>>>>>>>>>>>> obviated by using the (existing) runner-facing state/timer
>>>>>>>> APIs?
>>>>>>>>>>>> Should we
>>>>>>>>>>>>>> have a videocall too?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thomas - what do you think about getting this into Apex
>>>>>>> runner?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> (I think videocalls will allow to make rapid progress, but
>>>>>>> it's
>>>>>>>>>>>> probably a
>>>>>>>>>>>>>> better idea to keep them separate since they'll involve a
>>>>>> lot
>>>>>>>> of
>>>>>>>>>>>>>> runner-specific details)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PS - The completion of this in Dataflow streaming runner is
>>>>>>>>>> currently
>>>>>>>>>>>>>> waiting only on having a small service-side change
>>>>>>> implemented
>>>>>>>>> and
>>>>>>>>>>>> rolled
>>>>>>>>>>>>>> out for termination of streaming jobs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:55 AM Kenneth Knowles <
>>>>>>>> k...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I recommend proceeding with the runner-facing state & timer
>>>>>>>> APIs;
>>>>>>>>>>> they
>>>>>>>>>>>> are
>>>>>>>>>>>>>> lower-level and more appropriate for this. All runners
>>>>>>> provide
>>>>>>>>> them
>>>>>>>>>>> or
>>>>>>>>>>>> use
>>>>>>>>>>>>>> runners/core implementations, as they are needed for
>>>>>>>> triggering.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:34 AM, Eugene Kirpichov <
>>>>>>>>>>>> kirpic...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Aljoscha!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Minor note: I'm not familiar with what level of support for
>>>>>>>>> timers
>>>>>>>>>>>> Flink
>>>>>>>>>>>>>> currently has - however SDF in Direct and Dataflow runner
>>>>>>>>> currently
>>>>>>>>>>>> does
>>>>>>>>>>>>>> not use the user-facing state/timer APIs - rather, it uses
>>>>>>> the
>>>>>>>>>>>>>> runner-facing APIs (StateInternals and TimerInternals) -
>>>>>>>> perhaps
>>>>>>>>>>> Flink
>>>>>>>>>>>>>> already implements these. We may want to change this, but
>>>>>> for
>>>>>>>> now
>>>>>>>>>>> it's
>>>>>>>>>>>> good
>>>>>>>>>>>>>> enough (besides, SDF uses watermark holds, which are not
>>>>>>>>> supported
>>>>>>>>>> by
>>>>>>>>>>>> the
>>>>>>>>>>>>>> user-facing state API yet).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 10:19 AM Aljoscha Krettek <
>>>>>>>>>>>>>> aljos...@data-artisans.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the motivation, Eugene! :-)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've wanted to do this for a while now but was waiting for
>>>>>>> the
>>>>>>>>>> Flink
>>>>>>>>>>>> 1.2
>>>>>>>>>>>>>> release (which happened this week)! There's some
>>>>>> prerequisite
>>>>>>>>> work
>>>>>>>>>> to
>>>>>>>>>>>> be
>>>>>>>>>>>>>> done on the Flink runner: we'll move to the new timer
>>>>>>>> interfaces
>>>>>>>>>>>> introduced
>>>>>>>>>>>>>> in Flink 1.2 and implement support for both the user facing
>>>>>>>> state
>>>>>>>>>> and
>>>>>>>>>>>> timer
>>>>>>>>>>>>>> APIs. This should make implementation of SDF easier.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 7:06 PM, Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com
>>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks! Looking forward to this work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Feb 8, 2017 at 3:50 AM Jean-Baptiste Onofré <
>>>>>>>>>> j...@nanthrax.net
>>>>>>>>>>>>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the update Eugene.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I will work on the spark runner with Amit.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Feb 7, 2017, 19:12, at 19:12, Eugene Kirpichov
>>>>>>>>>>>>>> <kirpic...@google.com.INVALID> wrote:
>>>>>>>>>>>>>>> Hello,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm almost done adding support for Splittable DoFn
>>>>>>>>>>>>>>> http://s.apache.org/splittable-do-fn to Dataflow
>>>>>> streaming
>>>>>>>>>> runner*,
>>>>>>>>>>>> and
>>>>>>>>>>>>>>> very excited about that. There's only 1 PR
>>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1898> remaining,
>>>>>> plus
>>>>>>>>>> enabling
>>>>>>>>>>>>>>> some
>>>>>>>>>>>>>>> tests.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * (batch runner is much harder because it's not yet quite
>>>>>>>> clear
>>>>>>>>> to
>>>>>>>>>>> me
>>>>>>>>>>>>>>> how
>>>>>>>>>>>>>>> to properly implement liquid sharding
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> https://cloud.google.com/blog/big-data/2016/05/no-shard-
>>>>>>>>> left-behind-dynamic-work-rebalancing-in-google-cloud-dataflow
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> with
>>>>>>>>>>>>>>> SDF - and the current API is not ready for that yet)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> After implementing all the runner-agnostic parts of
>>>>>>> Splittable
>>>>>>>>>>> DoFn, I
>>>>>>>>>>>>>>> found them quite easy to integrate into Dataflow streaming
>>>>>>>>> runner,
>>>>>>>>>>> and
>>>>>>>>>>>>>>> I
>>>>>>>>>>>>>>> think this means it should be easy to integrate into other
>>>>>>>>> runners
>>>>>>>>>>>> too.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ====== Why it'd be cool ======
>>>>>>>>>>>>>>> The general benefits of SDF are well-described in the
>>>>>> design
>>>>>>>> doc
>>>>>>>>>>>>>>> (linked
>>>>>>>>>>>>>>> above).
>>>>>>>>>>>>>>> As for right now - if we integrated SDF with all runners,
>>>>>>> it'd
>>>>>>>>>>> already
>>>>>>>>>>>>>>> enable us to start greatly simplifying the code of
>>>>>> existing
>>>>>>>>>>> streaming
>>>>>>>>>>>>>>> connectors (CountingInput, Kafka, Pubsub, JMS) and writing
>>>>>>> new
>>>>>>>>>>>>>>> connectors
>>>>>>>>>>>>>>> (e.g. a really nice one to implement would be "directory
>>>>>>>>> watcher",
>>>>>>>>>>>> that
>>>>>>>>>>>>>>> continuously returns new files in a directory).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As a teaser, here's the complete implementation of an
>>>>>>>> "unbounded
>>>>>>>>>>>>>>> counter" I
>>>>>>>>>>>>>>> used for my test of Dataflow runner integration:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>  class CountFn extends DoFn<String, String> {
>>>>>>>>>>>>>>>    @ProcessElement
>>>>>>>>>>>>>>> public ProcessContinuation process(ProcessContext c,
>>>>>>>>>>>> OffsetRangeTracker
>>>>>>>>>>>>>>> tracker) {
>>>>>>>>>>>>>>>      for (int i = tracker.currentRestriction().getFrom();
>>>>>>>>>>>>>>> tracker.tryClaim(i); ++i) c.output(i);
>>>>>>>>>>>>>>>      return resume();
>>>>>>>>>>>>>>>    }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    @GetInitialRestriction
>>>>>>>>>>>>>>>    public OffsetRange getInitialRange(String element) {
>>>>>>>> return
>>>>>>>>>> new
>>>>>>>>>>>>>>> OffsetRange(0, Integer.MAX_VALUE); }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    @NewTracker
>>>>>>>>>>>>>>>   public OffsetRangeTracker newTracker(OffsetRange
>>>>>> range) {
>>>>>>>>>> return
>>>>>>>>>>>> new
>>>>>>>>>>>>>>> OffsetRangeTracker(range); }
>>>>>>>>>>>>>>>  }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ====== What I'm asking ======
>>>>>>>>>>>>>>> So, I'd like to ask for help integrating SDF into Spark,
>>>>>>> Flink
>>>>>>>>> and
>>>>>>>>>>>> Apex
>>>>>>>>>>>>>>> runners from people who are intimately familiar with them
>>>>>> -
>>>>>>>>>>>>>>> specifically, I
>>>>>>>>>>>>>>> was hoping best-case I could nerd-snipe some of you into
>>>>>>>> taking
>>>>>>>>>> over
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> integration of SDF with your favorite runner ;)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The proper set of people seems to be +Aljoscha Krettek
>>>>>>>>>>>>>>> <aljos...@data-artisans.com> +Maximilian Michels
>>>>>>>>>>>>>>> <m...@data-artisans.com>
>>>>>>>>>>>>>>> +ieme...@gmail.com <ieme...@gmail.com> +Amit Sela
>>>>>>>>>>>>>>> <amitsel...@gmail.com> +Thomas
>>>>>>>>>>>>>>> Weise unless I forgot somebody.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Average-case, I was looking for runner-specific guidance
>>>>>> on
>>>>>>>> how
>>>>>>>>> to
>>>>>>>>>>> do
>>>>>>>>>>>>>>> it
>>>>>>>>>>>>>>> myself.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ====== If you want to help ======
>>>>>>>>>>>>>>> If somebody decides to take this over, in my absence (I'll
>>>>>>> be
>>>>>>>>>> mostly
>>>>>>>>>>>>>>> gone
>>>>>>>>>>>>>>> for ~the next month)., the best people to ask for
>>>>>>>> implementation
>>>>>>>>>>>>>>> advice are +Kenn
>>>>>>>>>>>>>>> Knowles <k...@google.com> and +Daniel Mills <
>>>>>>> mil...@google.com
>>>>>>>>>
>>>>>>>>> .
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For reference, here's how SDF is implemented in the direct
>>>>>>>>> runner:
>>>>>>>>>>>>>>> - Direct runner overrides
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/0616245e654c60ae94cc2c188f857b
>>>>>>>>> 74a62d9b24/runners/direct-java/src/main/java/org/apache/
>>>>>>>>> beam/runners/direct/ParDoMultiOverrideFactory.java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> ParDo.of() for a splittable DoFn and replaces it with
>>>>>>>>>>> SplittableParDo
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-
>>>>>>>>> java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> (common
>>>>>>>>>>>>>>> transform expansion)
>>>>>>>>>>>>>>> - SplittableParDo uses two runner-specific primitive
>>>>>>>> transforms:
>>>>>>>>>>>>>>> "GBKIntoKeyedWorkItems" and "SplittableProcessElements".
>>>>>>>> Direct
>>>>>>>>>>> runner
>>>>>>>>>>>>>>> overrides the first one like this
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
>>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
>>>>>>>>> beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
>>>>>>>>>>>>>>> ,
>>>>>>>>>>>>>>> and directly implements evaluation of the second one like
>>>>>>> this
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
>>>>>>>>> 99024d3a1f/runners/direct-java/src/main/java/org/apache/
>>>>>>>>> beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
>>>>>>>>>>>>>>> ,
>>>>>>>>>>>>>>> using runner hooks introduced in this PR
>>>>>>>>>>>>>>> <https://github.com/apache/beam/pull/1824>. At the core
>>>>>> of
>>>>>>>> the
>>>>>>>>>>> hooks
>>>>>>>>>>>> is
>>>>>>>>>>>>>>> "ProcessFn" which is like a regular DoFn but has to be
>>>>>>>> prepared
>>>>>>>>> at
>>>>>>>>>>>>>>> runtime
>>>>>>>>>>>>>>> with some hooks (state, timers, and runner access to
>>>>>>>>>>>>>>> RestrictionTracker)
>>>>>>>>>>>>>>> before you invoke it. I added a convenience implementation
>>>>>>> of
>>>>>>>>> the
>>>>>>>>>>> hook
>>>>>>>>>>>>>>> mimicking behavior of UnboundedSource.
>>>>>>>>>>>>>>> - The relevant runner-agnostic tests are in
>>>>>>> SplittableDoFnTest
>>>>>>>>>>>>>>> <
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/cc28f0cb4c44169f933475ae29a325
>>>>>>>>> 99024d3a1f/sdks/java/core/src/test/java/org/apache/beam/sdk/
>>>>>>>>> transforms/SplittableDoFnTest.java
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> That's all it takes, really - the runner has to implement
>>>>>>>> these
>>>>>>>>>> two
>>>>>>>>>>>>>>> transforms. When I looked at Spark and Flink runners, it
>>>>>> was
>>>>>>>> not
>>>>>>>>>>> quite
>>>>>>>>>>>>>>> clear to me how to implement the GBKIntoKeyedWorkItems
>>>>>>>>> transform,
>>>>>>>>>>> e.g.
>>>>>>>>>>>>>>> Spark runner currently doesn't use KeyedWorkItem at all -
>>>>>>> but
>>>>>>>> it
>>>>>>>>>>> seems
>>>>>>>>>>>>>>> definitely possible.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Data Artisans GmbH | Stresemannstr. 121A | 10963 Berlin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> i...@data-artisans.com
>>>>>>>>>>>>>> +49-(0)30-55599146 <+49%2030%2055599146> <+49%2030%2055599146>
>>>>>> <+49%2030%2055599146>
>>>>>>>> <+49%2030%2055599146> <+49%2030%2055599146>
>>>>>>>>>> <+49%2030%2055599146>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Registered at Amtsgericht Charlottenburg - HRB 158244 B
>>>>>>>>>>>>>> Managing Directors: Kostas Tzoumas, Stephan Ewen
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> jbono...@apache.org
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>>
>>>>
>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com

-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to