Hi all,

Heads up: https://github.com/apache/beam/pull/3360 reintroduces ProcessContinuation, and the associated JIRA https://issues.apache.org/jira/browse/BEAM-2447 explains why, and how to make it work without the issues described in this document.

Thanks.
On Sat, Apr 8, 2017 at 6:56 AM Aljoscha Krettek <[email protected]> wrote:

> +1
>
> I was too busy with traveling and preparations for Flink Forward but I
> wanted to retroactively confirm that these are good changes. :-)
>
> On 7. Apr 2017, at 22:43, Jean-Baptiste Onofré <[email protected]> wrote:
> >
> > Hi Eugene,
> >
> > thanks for the update and nice example.
> >
> > I plan to start to refactor/experiment on some IOs.
> >
> > Regards
> > JB
> >
> > On 04/08/2017 02:44 AM, Eugene Kirpichov wrote:
> >> The changes are in.
> >>
> >> Also included is a handy change that allows one to skip implementing the
> >> NewTracker method if the restriction type implements HasDefaultTracker,
> >> leaving ProcessElement and GetInitialRestriction as the only two
> >> required methods.
> >>
> >> E.g. here's what a minimal SDF example looks like now - splittably pairing
> >> a string with every number in 0..100:
> >>
> >> class CountFn extends DoFn<String, KV<String, Long>> {
> >>   @ProcessElement
> >>   public void process(ProcessContext c, OffsetRangeTracker tracker) {
> >>     for (long i = tracker.currentRestriction().getFrom();
> >>         tracker.tryClaim(i); ++i) {
> >>       c.output(KV.of(c.element(), i));
> >>     }
> >>   }
> >>
> >>   @GetInitialRestriction
> >>   public OffsetRange getInitialRange(String element) {
> >>     return new OffsetRange(0, 100);
> >>   }
> >> }
> >>
> >>
> >> On Thu, Apr 6, 2017 at 3:16 PM Eugene Kirpichov <[email protected]>
> >> wrote:
> >>
> >>> FWIW, here's a pull request implementing these changes:
> >>> https://github.com/apache/beam/pull/2455
> >>>
> >>> On Wed, Apr 5, 2017 at 4:55 PM Eugene Kirpichov <[email protected]>
> >>> wrote:
> >>>
> >>> Hey all,
> >>>
> >>> From the recent experience in continuing implementation of Splittable
> >>> DoFn, I would like to propose a few changes to its API. They get rid of a
> >>> bug, make parts of its semantics more well-defined and easier for a user to
> >>> get right, and reduce the assumptions about the runner implementation.
> >>>
> >>> In short:
> >>> - Add c.updateWatermark() and report watermark continuously via this
> >>> method.
> >>> - Make SDF.@ProcessElement return void, which is simpler for users though
> >>> it doesn't allow resuming after a specified time.
> >>> - Declare that SDF.@ProcessElement must guarantee that after it returns,
> >>> the entire tracker.currentRestriction() was processed.
> >>> - Add a bool RestrictionTracker.done() method to enforce the bullet above.
> >>> - For resuming after a specified time, use a regular DoFn with the state
> >>> and timers API.
> >>>
> >>> The only downside is the removal (from SDF) of the ability to suspend the
> >>> call for a certain amount of time - the suggestion is that, if you need
> >>> that, you should use a regular DoFn and the timers API.
> >>>
> >>> Please see the full proposal in the following doc and comment there & vote
> >>> on this thread.
> >>>
> >>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit?usp=sharing
> >>>
> >>>
> >>> I am going to concurrently start prototyping some parts of this proposal,
> >>> because the current implementation is simply wrong and this proposal is the
> >>> only way to fix it that I can think of, but I will adjust my implementation
> >>> based on the discussion. I believe this proposal should not affect runner
> >>> authors - I can make all the necessary changes myself.
> >>>
> >>> Thanks!
> >>>
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > [email protected]
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
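For readers following the thread, here is a rough, self-contained sketch of the contract behind the "must process the entire restriction" and "RestrictionTracker.done()" bullets in the proposal. It does not use Beam at all: Range, Tracker, process() and the rule used inside done() are hypothetical stand-ins made up for illustration, and only the method names tryClaim, currentRestriction and done are borrowed from the emails above. The actual Beam classes may well behave differently.

```java
import java.util.ArrayList;
import java.util.List;

public class SdfContractSketch {

  /** Hypothetical stand-in for a half-open offset restriction [from, to). */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }
  }

  /** Hypothetical stand-in for a restriction tracker; NOT Beam's OffsetRangeTracker. */
  static final class Tracker {
    private final Range range;
    private long lastAttempted; // last offset passed to tryClaim

    Tracker(Range range) {
      this.range = range;
      this.lastAttempted = range.from - 1; // nothing attempted yet
    }

    Range currentRestriction() {
      return range;
    }

    /** Records the attempt and returns false once i falls outside the restriction. */
    boolean tryClaim(long i) {
      lastAttempted = i;
      return i < range.to;
    }

    /**
     * Assumed rule for this sketch: the restriction counts as fully processed
     * once the caller has attempted the last offset in the range (or beyond).
     * An empty range is trivially done.
     */
    boolean done() {
      return lastAttempted >= range.to - 1;
    }
  }

  /** Mirrors the CountFn loop from the thread, collecting outputs in a list. */
  static List<String> process(String element, Tracker tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      out.add(element + ":" + i);
    }
    return out;
  }

  public static void main(String[] args) {
    Tracker tracker = new Tracker(new Range(0, 100));
    List<String> out = process("a", tracker);

    // What a runner could check after the call returns, under this sketch's rule.
    if (!tracker.done()) {
      throw new IllegalStateException("process() returned before finishing the restriction");
    }
    System.out.println(out.size() + " outputs, done=" + tracker.done());
  }
}
```

The point of the sketch is only that, once the method returns void, the runner has no return value to inspect, so a done()-style check on the tracker is how an early return could be detected.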
