+1
I was too busy with travel and preparations for Flink Forward, but I wanted
to retroactively confirm that these are good changes. :-)
> On 7. Apr 2017, at 22:43, Jean-Baptiste Onofré <[email protected]> wrote:
>
> Hi Eugene,
>
> thanks for the update and nice example.
>
> I plan to start refactoring/experimenting with some IOs.
>
> Regards
> JB
>
> On 04/08/2017 02:44 AM, Eugene Kirpichov wrote:
>> The changes are in.
>>
>> Also included is a handy change that allows one to skip implementing the
>> NewTracker method if the restriction type implements HasDefaultTracker,
>> leaving ProcessElement and GetInitialRestriction as the only two required
>> methods.
>>
>> E.g. here's what a minimal SDF example looks like now: splittably pairing
>> a string with every number in [0, 100):
>>
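>> import org.apache.beam.sdk.io.range.OffsetRange;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
>> import org.apache.beam.sdk.values.KV;
>>
>> class CountFn extends DoFn<String, KV<String, Long>> {
>>   @ProcessElement
>>   public void process(ProcessContext c, OffsetRangeTracker tracker) {
>>     // Claim offsets one by one; tryClaim fails once the (possibly split)
>>     // restriction is exhausted, which ends the loop.
>>     for (long i = tracker.currentRestriction().getFrom();
>>          tracker.tryClaim(i); ++i) {
>>       c.output(KV.of(c.element(), i));
>>     }
>>   }
>>
>>   @GetInitialRestriction
>>   public OffsetRange getInitialRange(String element) {
>>     return new OffsetRange(0, 100);
>>   }
>> }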
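A toy, dependency-free model of the example above may help show how the claim loop terminates. All names here (`SdfClaimLoopSketch`, `ToyOffsetRange`, `ToyOffsetRangeTracker`, `DefaultTrackerSource`) are hypothetical stand-ins, not Beam's `OffsetRange`, `OffsetRangeTracker`, or `HasDefaultTracker`. The sketch only models two points from the message: the restriction creates its own tracker, so no NewTracker method is needed, and `tryClaim` fails at the first offset outside the half-open range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, dependency-free stand-ins for Beam's OffsetRange,
// OffsetRangeTracker and HasDefaultTracker; not the real Beam API.
final class SdfClaimLoopSketch {

  /** A restriction that can create its own tracker (the HasDefaultTracker idea). */
  interface DefaultTrackerSource<T> {
    T newTracker();
  }

  /** Half-open range of offsets [from, to). */
  static final class ToyOffsetRange implements DefaultTrackerSource<ToyOffsetRangeTracker> {
    final long from;
    final long to;

    ToyOffsetRange(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public ToyOffsetRangeTracker newTracker() {
      return new ToyOffsetRangeTracker(this);
    }
  }

  static final class ToyOffsetRangeTracker {
    private final ToyOffsetRange range;

    ToyOffsetRangeTracker(ToyOffsetRange range) {
      this.range = range;
    }

    ToyOffsetRange currentRestriction() {
      return range;
    }

    /** Succeeds only for offsets inside [from, to). */
    boolean tryClaim(long i) {
      return i >= range.from && i < range.to;
    }
  }

  /** Same loop shape as CountFn.process, collecting "element:offset" strings. */
  static List<String> process(String element, ToyOffsetRangeTracker tracker) {
    List<String> out = new ArrayList<>();
    for (long i = tracker.currentRestriction().from; tracker.tryClaim(i); ++i) {
      out.add(element + ":" + i);
    }
    return out;
  }

  /** Simulates a runner: there is no NewTracker method, so ask the restriction. */
  static List<String> runCountFn(String element) {
    ToyOffsetRange initial = new ToyOffsetRange(0, 100); // like getInitialRange
    return process(element, initial.newTracker());
  }

  public static void main(String[] args) {
    List<String> out = runCountFn("a");
    // Prints: 100 a:0 a:99
    System.out.println(out.size() + " " + out.get(0) + " " + out.get(out.size() - 1));
  }
}
```

Because `to` is exclusive, `tryClaim(100)` is the first call to fail, so the loop emits exactly 100 pairs, for offsets 0 through 99.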
>>
>>
>> On Thu, Apr 6, 2017 at 3:16 PM Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> FWIW, here's a pull request implementing these changes:
>>> https://github.com/apache/beam/pull/2455
>>>
>>> On Wed, Apr 5, 2017 at 4:55 PM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>> Hey all,
>>>
>>> Based on recent experience continuing the implementation of Splittable
>>> DoFn, I would like to propose a few changes to its API. They get rid of a
>>> bug, make parts of its semantics better defined and easier for a user to
>>> get right, and reduce the assumptions about the runner implementation.
>>>
>>> In short:
>>> - Add c.updateWatermark() and report the watermark continuously via this
>>> method.
>>> - Make SDF.@ProcessElement return void, which is simpler for users, though
>>> it doesn't allow resuming after a specified time.
>>> - Declare that SDF.@ProcessElement must guarantee that, after it returns,
>>> the entire tracker.currentRestriction() has been processed.
>>> - Add a boolean RestrictionTracker.done() method to enforce the bullet above.
>>> - For resuming after a specified time, use a regular DoFn with the state and
>>> timers API.
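A similarly hypothetical sketch of the proposed contract for a void `@ProcessElement` plus `done()` may make the bullets above more concrete. `DoneContractSketch`, its toy tracker, `checkpoint()`, and `runAndCheck` are illustrative assumptions, not the API in the proposal doc or the pull request. The sketch assumes the runner calls `done()` after the call returns: a loop that claims until `tryClaim` fails passes; a call that stops early but first shrinks its restriction to what it actually claimed (modeled here by the assumed `checkpoint()`) passes; a call that simply returns early is rejected.

```java
import java.util.function.Consumer;

// Hypothetical model of the proposed contract; names and checkpoint()
// are illustrative assumptions, not Beam's RestrictionTracker API.
final class DoneContractSketch {

  static final class ToyOffsetRangeTracker {
    private final long from;
    private long to; // exclusive; shrinks when checkpoint() is called
    private long lastClaimed;

    ToyOffsetRangeTracker(long from, long to) {
      this.from = from;
      this.to = to;
      this.lastClaimed = from - 1; // nothing claimed yet
    }

    boolean tryClaim(long i) {
      if (i < from || i >= to) {
        return false;
      }
      lastClaimed = i;
      return true;
    }

    /** Ends the restriction after the last claimed offset; returns the residual {start, end}. */
    long[] checkpoint() {
      long[] residual = {lastClaimed + 1, to};
      to = lastClaimed + 1;
      return residual;
    }

    /** The proposed done(): true only if the whole current restriction was claimed. */
    boolean done() {
      return lastClaimed == to - 1;
    }
  }

  /** Runner side: invoke a void process call, then enforce the done() contract. */
  static void runAndCheck(ToyOffsetRangeTracker tracker, Consumer<ToyOffsetRangeTracker> process) {
    process.accept(tracker);
    if (!tracker.done()) {
      throw new IllegalStateException("process returned before finishing its restriction");
    }
  }

  public static void main(String[] args) {
    // OK: claims until tryClaim fails.
    runAndCheck(new ToyOffsetRangeTracker(0, 100), t -> {
      for (long i = 0; t.tryClaim(i); ++i) { }
    });

    // OK: stops at 50 but checkpoints first, leaving [50, 100) as a residual.
    long[][] residual = new long[1][];
    runAndCheck(new ToyOffsetRangeTracker(0, 100), t -> {
      for (long i = 0; i < 50 && t.tryClaim(i); ++i) { }
      residual[0] = t.checkpoint();
    });
    System.out.println("residual: [" + residual[0][0] + ", " + residual[0][1] + ")");

    // Rejected: stops at 50 without checkpointing.
    try {
      runAndCheck(new ToyOffsetRangeTracker(0, 100), t -> {
        for (long i = 0; i < 50 && t.tryClaim(i); ++i) { }
      });
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Running `main` prints the residual `[50, 100)` and then the rejection message for the third call.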
>>>
>>> The only downside is the removal (from SDF) of the ability to suspend the
>>> call for a certain amount of time; the suggestion is that, if you need
>>> that, you should use a regular DoFn and the timers API.
>>>
>>> Please see the full proposal in the following doc, comment there, and
>>> vote on this thread.
>>>
>>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit?usp=sharing
>>>
>>>
>>> I am going to start prototyping some parts of this proposal in parallel,
>>> because the current implementation is simply wrong and this proposal is the
>>> only way I can think of to fix it, but I will adjust my implementation
>>> based on the discussion. I believe this proposal should not affect runner
>>> authors - I can make all the necessary changes myself.
>>>
>>> Thanks!
>>>
>>>
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com