Hi Eugene,

Thanks for the update, and nice example.

I plan to start refactoring/experimenting with some IOs.

Regards
JB

On 04/08/2017 02:44 AM, Eugene Kirpichov wrote:
The changes are in.

Also included is a handy change that allows one to skip implementing the
NewTracker method if the restriction type implements HasDefaultTracker,
leaving ProcessElement and GetInitialRestriction as the only two required
methods.
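A rough plain-Java model of the fallback described above may help. It is a simplified sketch under stated assumptions, not Beam's code: the HasDefaultTracker interface here is a hypothetical single-type-parameter analog of Beam's, and Range, trackerFor and the String "trackers" are made up for illustration. It prefers a NewTracker-style method when the DoFn supplies one and otherwise asks the restriction for its default tracker.

```java
import java.util.Optional;
import java.util.function.Function;

public class DefaultTrackerSketch {
  /** Hypothetical, simplified analog of Beam's HasDefaultTracker. */
  interface HasDefaultTracker<TrackerT> {
    TrackerT newTracker();
  }

  /** Hypothetical restriction over [from, to); the "tracker" is just a String here. */
  static final class Range implements HasDefaultTracker<String> {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String newTracker() {
      return "default tracker for [" + from + ", " + to + ")";
    }
  }

  /**
   * Uses the DoFn's own NewTracker-style method if there is one; otherwise
   * falls back to the restriction's default tracker.
   */
  static <R, T> T trackerFor(R restriction, Optional<Function<R, T>> newTrackerMethod) {
    if (newTrackerMethod.isPresent()) {
      return newTrackerMethod.get().apply(restriction);
    }
    if (restriction instanceof HasDefaultTracker) {
      @SuppressWarnings("unchecked")
      T tracker = ((HasDefaultTracker<T>) restriction).newTracker();
      return tracker;
    }
    throw new IllegalStateException("No NewTracker method and no default tracker");
  }

  public static void main(String[] args) {
    Range range = new Range(0, 100);
    // No NewTracker method: the restriction supplies the tracker.
    String fallback = trackerFor(range, Optional.empty());
    // An explicit NewTracker method still takes precedence.
    Function<Range, String> custom = r -> "custom tracker";
    String explicit = trackerFor(range, Optional.of(custom));
    System.out.println(fallback);
    System.out.println(explicit);
  }
}
```

The precedence order (explicit method first, default second) is an assumption chosen so that existing DoFns with a NewTracker method keep their behavior.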

E.g., here's what a minimal SDF example looks like now: splittably pairing
a string with every number in the half-open range [0, 100):

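  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
  import org.apache.beam.sdk.values.KV;

  // Splittably pairs each input string with every offset in [0, 100).
  class CountFn extends DoFn<String, KV<String, Long>> {
    @ProcessElement
    public void process(ProcessContext c, OffsetRangeTracker tracker) {
      // Claim offsets one at a time; tryClaim() returns false once the offset
      // falls outside the current restriction (e.g. after a runner split).
      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
        c.output(KV.of(c.element(), i));
      }
    }

    // OffsetRange implements HasDefaultTracker, so no @NewTracker method is needed.
    @GetInitialRestriction
    public OffsetRange getInitialRange(String element) {
      return new OffsetRange(0, 100);
    }
  }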
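For readers who want to see why the loop terminates, here is a self-contained plain-Java model of the claim loop above. SimpleOffsetTracker is a hypothetical stand-in, not Beam's OffsetRangeTracker; it only assumes what the example relies on, namely that tryClaim(i) succeeds for positions inside the half-open range [from, to) and returns false once i reaches the end, so the loop emits offsets 0 through 99.

```java
import java.util.ArrayList;
import java.util.List;

public class ClaimLoopSketch {
  /** Hypothetical simplified tracker over the half-open range [from, to). */
  static final class SimpleOffsetTracker {
    private final long from;
    private final long to;
    private long lastClaimed;

    SimpleOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
      this.lastClaimed = from - 1;
    }

    long getFrom() {
      return from;
    }

    /** Claims position i; returns false once i is at or past the end of the range. */
    boolean tryClaim(long i) {
      if (i <= lastClaimed) {
        throw new IllegalArgumentException("Positions must be claimed in increasing order");
      }
      if (i >= to) {
        return false;
      }
      lastClaimed = i;
      return true;
    }
  }

  /** Same loop shape as CountFn.process, collecting outputs instead of calling c.output. */
  static List<String> process(String element, SimpleOffsetTracker tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.getFrom(); tracker.tryClaim(i); ++i) {
      out.add(element + ":" + i);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> out = process("a", new SimpleOffsetTracker(0, 100));
    // Prints: 100 a:0 a:99
    System.out.println(out.size() + " " + out.get(0) + " " + out.get(out.size() - 1));
  }
}
```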


On Thu, Apr 6, 2017 at 3:16 PM Eugene Kirpichov wrote:

FWIW, here's a pull request implementing these changes:
https://github.com/apache/beam/pull/2455

On Wed, Apr 5, 2017 at 4:55 PM Eugene Kirpichov wrote:

Hey all,

Based on recent experience continuing the implementation of Splittable
DoFn, I would like to propose a few changes to its API. They fix a bug,
make parts of its semantics better defined and easier for users to get
right, and reduce the assumptions made about the runner implementation.

In short:
- Add c.updateWatermark() and report the watermark continuously via this
method.
- Make SDF.@ProcessElement return void. This is simpler for users, though
it no longer allows resuming after a specified time.
- Declare that SDF.@ProcessElement must guarantee that, after it returns,
the entire tracker.currentRestriction() has been processed.
- Add a boolean RestrictionTracker.done() method to enforce the guarantee
above.
- For resuming after a specified time, use a regular DoFn with the state
and timers API.
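A rough plain-Java model of how a done() check like the one proposed could catch a ProcessElement call that returns early. Everything here is hypothetical and simplified (SimpleOffsetTracker, its checkpoint() method, the helper names and the exact done() rule are assumptions for illustration, not the proposal's or Beam's definitions): a runner-initiated checkpoint shrinks the current restriction to what has been claimed, so a well-behaved loop stops and done() holds, while a call that simply stops claiming leaves done() false.

```java
import java.util.Arrays;

public class DoneCheckSketch {
  /** Hypothetical tracker over the half-open range [from, to); not Beam's class. */
  static final class SimpleOffsetTracker {
    private final long from;
    private long to;
    private long lastAttempted;
    private long lastClaimed;

    SimpleOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
      this.lastAttempted = from - 1;
      this.lastClaimed = from - 1;
    }

    long from() { return from; }

    long to() { return to; }

    /** Records every attempt; succeeds only inside the current restriction. */
    boolean tryClaim(long i) {
      lastAttempted = i;
      if (i >= to) {
        return false;
      }
      lastClaimed = i;
      return true;
    }

    /** Runner-initiated split: keeps [from, lastClaimed + 1) and returns the residual. */
    long[] checkpoint() {
      long[] residual = {lastClaimed + 1, to};
      to = lastClaimed + 1;
      return residual;
    }

    /** Assumed done() rule: claiming was attempted up to the end of the current restriction. */
    boolean done() {
      return lastAttempted >= to - 1;
    }
  }

  /** Well-behaved body: claims until tryClaim() fails; runner checkpoints after checkpointAt. */
  static long[] runWithCheckpoint(SimpleOffsetTracker tracker, long checkpointAt) {
    long[] residual = null;
    for (long i = tracker.from(); tracker.tryClaim(i); ++i) {
      // ... output for position i would go here ...
      if (i == checkpointAt) {
        residual = tracker.checkpoint();
      }
    }
    return residual;
  }

  /** Buggy body: stops after `limit` positions without a checkpoint. */
  static void runAndStopEarly(SimpleOffsetTracker tracker, long limit) {
    for (long i = tracker.from(); i < tracker.from() + limit && tracker.tryClaim(i); ++i) {
      // ... output for position i would go here ...
    }
  }

  public static void main(String[] args) {
    SimpleOffsetTracker good = new SimpleOffsetTracker(0, 100);
    long[] residual = runWithCheckpoint(good, 41);
    System.out.println(Arrays.toString(residual) + " done=" + good.done()); // [42, 100] done=true

    SimpleOffsetTracker bad = new SimpleOffsetTracker(0, 100);
    runAndStopEarly(bad, 10);
    System.out.println("done=" + bad.done()); // done=false
  }
}
```

Splitting at lastClaimed + 1 is just one possible checkpoint rule; the point of the sketch is only that done() is evaluated against the restriction as it stands when the call returns.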

The only downside is that SDF loses the ability to suspend the call for a
certain amount of time; if you need that, the suggestion is to use a
regular DoFn with the timers API.

Please see the full proposal in the following doc; comment there and vote
on this thread.

https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit?usp=sharing


I am going to start prototyping parts of this proposal concurrently,
because the current implementation is simply wrong and this proposal is the
only way I can think of to fix it, but I will adjust my implementation
based on the discussion. I believe this proposal should not affect runner
authors; I can make all the necessary changes myself.

Thanks!




--
Jean-Baptiste Onofré
http://blog.nanthrax.net
Talend - http://www.talend.com
