Hi all,
Heads up: https://github.com/apache/beam/pull/3360 reintroduces
ProcessContinuation. The associated JIRA,
https://issues.apache.org/jira/browse/BEAM-2447, explains why, and how to
make it work without the issues described in the proposal document below.
Thanks.

On Sat, Apr 8, 2017 at 6:56 AM Aljoscha Krettek <[email protected]> wrote:

> +1
>
> I was too busy traveling and preparing for Flink Forward, but I
> wanted to retroactively confirm that these are good changes. :-)
> > On 7. Apr 2017, at 22:43, Jean-Baptiste Onofré <[email protected]> wrote:
> >
> > Hi Eugene,
> >
> > thanks for the update and nice example.
> >
> > I plan to start refactoring/experimenting with some IOs.
> >
> > Regards
> > JB
> >
> > On 04/08/2017 02:44 AM, Eugene Kirpichov wrote:
> >> The changes are in.
> >>
> >> Also included is a handy change that lets you skip implementing the
> >> @NewTracker method if the restriction type implements HasDefaultTracker,
> >> leaving @ProcessElement and @GetInitialRestriction as the only two
> >> required methods.
> >>
> >> E.g. here's what a minimal SDF example looks like now: splittably pairing
> >> a string with every number in [0, 100):
> >>
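> >>  import org.apache.beam.sdk.io.range.OffsetRange;
> >>  import org.apache.beam.sdk.transforms.DoFn;
> >>  import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
> >>  import org.apache.beam.sdk.values.KV;
> >>
> >>  // Pairs each input string with every offset in [0, 100); the runner may
> >>  // split the range, so process() only emits offsets it successfully claims.
> >>  class CountFn extends DoFn<String, KV<String, Long>> {
> >>    @ProcessElement
> >>    public void process(ProcessContext c, OffsetRangeTracker tracker) {
> >>      for (long i = tracker.currentRestriction().getFrom();
> >>           tracker.tryClaim(i); ++i) {
> >>        c.output(KV.of(c.element(), i));
> >>      }
> >>    }
> >>
> >>    @GetInitialRestriction
> >>    public OffsetRange getInitialRange(String element) {
> >>      return new OffsetRange(0, 100);
> >>    }
> >>  }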
> >>
> >>
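To see what the claim loop in CountFn does without a runner, here is a minimal plain-Java model of it. It does not use the Beam SDK: `Range`, `RangeTracker` and `DefaultTracker` are hypothetical stand-ins for `OffsetRange`, `OffsetRangeTracker` and `HasDefaultTracker`, and `checkpoint()` is an assumed simplification of how a runner splits off the unclaimed remainder of a range. It shows that a split in the middle of the loop leaves a residual range that can be processed later, without losing or duplicating offsets.

```java
import java.util.ArrayList;
import java.util.List;

public class SdfClaimLoopSketch {
  // Hypothetical stand-in for HasDefaultTracker: the restriction builds its own tracker,
  // so a DoFn using it would not need a NewTracker method.
  interface DefaultTracker<T> {
    T newTracker();
  }

  // Hypothetical half-open offset range [from, to).
  static final class Range implements DefaultTracker<RangeTracker> {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public RangeTracker newTracker() {
      return new RangeTracker(this);
    }
  }

  // Hypothetical tracker: grants claims inside the current range; checkpoint() splits it.
  static final class RangeTracker {
    private Range current;
    private long lastClaimed;

    RangeTracker(Range range) {
      this.current = range;
      this.lastClaimed = range.from - 1;
    }

    Range currentRestriction() {
      return current;
    }

    boolean tryClaim(long i) {
      if (i >= current.to) {
        return false; // past the (possibly shrunk) end: the loop must stop
      }
      lastClaimed = i;
      return true;
    }

    // Keeps [from, lastClaimed + 1) as the current range and returns the unclaimed rest.
    Range checkpoint() {
      Range residual = new Range(lastClaimed + 1, current.to);
      current = new Range(current.from, lastClaimed + 1);
      return residual;
    }
  }

  // Mirrors CountFn.process; a checkpoint is simulated right after offset checkpointAfter.
  static List<String> process(
      String element, RangeTracker tracker, long checkpointAfter, List<Range> residuals) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      out.add(element + ":" + i);
      if (i == checkpointAfter) {
        residuals.add(tracker.checkpoint()); // a runner would schedule this for later
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Range> residuals = new ArrayList<>();
    Range initial = new Range(0, 100); // like getInitialRange: offsets 0..99

    List<String> primary = process("a", initial.newTracker(), 9, residuals);
    List<String> rest = process("a", residuals.get(0).newTracker(), -1, residuals);
    System.out.println(primary.size() + " + " + rest.size()); // 10 + 90
  }
}
```

Because the loop condition is `tryClaim(i)` rather than a fixed bound such as `i < 100`, shrinking the current range is enough to stop the loop at a consistent point; that property is what lets the runner split the work.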
> >> On Thu, Apr 6, 2017 at 3:16 PM Eugene Kirpichov <[email protected]> wrote:
> >>
> >>> FWIW, here's a pull request implementing these changes:
> >>> https://github.com/apache/beam/pull/2455
> >>>
> >>> On Wed, Apr 5, 2017 at 4:55 PM Eugene Kirpichov <[email protected]> wrote:
> >>>
> >>> Hey all,
> >>>
> >>> Based on recent experience continuing the implementation of Splittable
> >>> DoFn, I would like to propose a few changes to its API. They fix a bug,
> >>> make parts of its semantics better defined and easier for users to get
> >>> right, and reduce the assumptions made about the runner implementation.
> >>>
> >>> In short:
> >>> - Add c.updateWatermark() and report the watermark continuously via this
> >>>   method.
> >>> - Make SDF.@ProcessElement return void. This is simpler for users, though
> >>>   it no longer allows resuming after a specified time.
> >>> - Declare that SDF.@ProcessElement must guarantee that, after it returns,
> >>>   the entire tracker.currentRestriction() has been processed.
> >>> - Add a boolean RestrictionTracker.done() method to enforce the previous
> >>>   bullet.
> >>> - For resuming after a specified time, use a regular DoFn with the state
> >>>   and timers API.
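The done() check in the list above can be illustrated with a small plain-Java model of what a runner might do after @ProcessElement returns. `Tracker`, `done()` and `invoke` here are hypothetical and only approximate the proposed RestrictionTracker.done(); the assumption is that a restriction counts as fully processed once every offset in [from, to) has been attempted, whether the claim succeeded or was rejected.

```java
import java.util.function.ToIntFunction;

public class DoneCheckSketch {
  // Hypothetical tracker over [from, to) that remembers the last offset it was asked to claim.
  static final class Tracker {
    final long from;
    final long to;
    long lastAttempted;

    Tracker(long from, long to) {
      this.from = from;
      this.to = to;
      this.lastAttempted = from - 1;
    }

    boolean tryClaim(long i) {
      lastAttempted = i;
      return i < to;
    }

    // Hypothetical done(): true only if every offset in [from, to) was attempted.
    boolean done() {
      return lastAttempted >= to - 1;
    }
  }

  // Hypothetical runner step: run the user's process method, then enforce the contract.
  static int invoke(Tracker tracker, ToIntFunction<Tracker> processFn) {
    int outputs = processFn.applyAsInt(tracker);
    if (!tracker.done()) {
      throw new IllegalStateException(
          "process returned with [" + (tracker.lastAttempted + 1) + ", " + tracker.to
              + ") unprocessed");
    }
    return outputs;
  }

  // Correct: keeps claiming until tryClaim fails.
  static int fullLoop(Tracker t) {
    int n = 0;
    for (long i = t.from; t.tryClaim(i); ++i) {
      n++;
    }
    return n;
  }

  // Buggy: gives up after 5 offsets, which would silently drop the rest of the range.
  static int earlyExit(Tracker t) {
    int n = 0;
    for (long i = t.from; i < t.from + 5 && t.tryClaim(i); ++i) {
      n++;
    }
    return n;
  }

  public static void main(String[] args) {
    System.out.println(invoke(new Tracker(0, 100), DoneCheckSketch::fullLoop)); // 100
    try {
      invoke(new Tracker(0, 100), DoneCheckSketch::earlyExit);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // process returned with [5, 100) unprocessed
    }
  }
}
```

The point of such a check is that an early return becomes a loud error instead of silent data loss.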
> >>> - For resuming after specified time, use regular DoFn with state and
> >>> timers API.
> >>>
> >>> The only downside is that SDF loses the ability to suspend the call for
> >>> a certain amount of time. If you need that, the suggestion is to use a
> >>> regular DoFn and the timers API.
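As a rough illustration of "resume after a specified time" with timers instead of SDF, the following plain-Java simulation re-arms a timer each time a poll finds nothing ready. It is not the Beam state and timers API: `Timer`, `poll`, `run`, the 10-unit delay and the `readyAt` time are all hypothetical, and the map only stands in for per-key state.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

public class TimerResumeSketch {
  // Hypothetical timer: the time it fires at and the key whose state it belongs to.
  static final class Timer {
    final long fireAt;
    final String key;

    Timer(long fireAt, String key) {
      this.fireAt = fireAt;
      this.key = key;
    }
  }

  static final long POLL_DELAY = 10; // hypothetical resume delay

  final long readyAt; // hypothetical time at which the polled data becomes available
  final PriorityQueue<Timer> timers =
      new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.fireAt));
  final Map<String, Integer> pollsPerKey = new HashMap<>(); // stands in for per-key state
  final List<String> outputs = new ArrayList<>();

  TimerResumeSketch(long readyAt) {
    this.readyAt = readyAt;
  }

  // Body shared by the element call and the timer callback: poll once, re-arm if not ready.
  void poll(String key, long now) {
    pollsPerKey.merge(key, 1, Integer::sum);
    if (now >= readyAt) {
      outputs.add(key + "@" + now);
    } else {
      timers.add(new Timer(now + POLL_DELAY, key)); // resume after a specified time
    }
  }

  // Simulated runner: process the element at time 0, then fire timers in time order.
  void run(String key) {
    poll(key, 0);
    while (!timers.isEmpty()) {
      Timer t = timers.poll();
      poll(t.key, t.fireAt);
    }
  }

  public static void main(String[] args) {
    TimerResumeSketch fn = new TimerResumeSketch(25);
    fn.run("file");
    System.out.println(fn.outputs + " after " + fn.pollsPerKey.get("file") + " polls");
    // [file@30] after 4 polls
  }
}
```

Here the delay lives in the timer rather than in the return value of @ProcessElement, which is the trade-off the proposal accepts.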
> >>>
> >>> Please see the full proposal in the following doc, comment there, and
> >>> vote on this thread:
> >>>
> >>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit?usp=sharing
> >>>
> >>>
> >>> I am going to start prototyping parts of this proposal concurrently,
> >>> because the current implementation is simply wrong and this proposal is
> >>> the only way I can think of to fix it, but I will adjust my
> >>> implementation based on the discussion. I believe this proposal should
> >>> not affect runner authors; I can make all the necessary changes myself.
> >>>
> >>> Thanks!
> >>>
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > [email protected]
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>
>
