Hello all,
The API has been updated for Python (See
https://github.com/apache/beam/pull/8430). If you spot any documentation
that needs updating, please flag it to me or just propose the change : )

As for Java - we didn't end up determining whether it makes sense to update
the API as well. Thoughts from others?

In any case, I've filed https://jira.apache.org/jira/browse/BEAM-7250 to
track this for Java.

Best
-P.

On Mon, Apr 29, 2019 at 2:41 PM Lukasz Cwik <[email protected]> wrote:

> Pablo, all the SplittableDoFn stuff is marked as @Experimental so one is
> able to change it. There is really only one complicated SDF to change, in
> Watch.java; the rest are quite straightforward.
>
> On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <[email protected]> wrote:
>
>> Thanks all,
>>  @Luke - I imagine that would be an improvement to the API, but this may
>> be harder as this is already available to users, and there are those who
>> have implemented SDFs under the current API. Would it be possible to make a
>> backwards-compatible change to the API here?
>>
>> For the Python changes, I've proposed a pull request:
>> https://github.com/apache/beam/pull/8430 - it was smaller than I thought
>> : ) - All comments welcome please.
>>
>> +Boyuan Zhang <[email protected]> I am happy to wait for your
>> SyntheticSource PR to be merged and make the appropriate changes if you'd
>> like.
>> Best
>> -P.
>>
>> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Would it make sense to also do this in the Java SDK?
>>>
>>> This would make the restriction provider mirror TimerSpec and
>>> StateSpec, which use annotations, similar to how it's done in Python.
>>>
>>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <[email protected]>
>>> wrote:
>>>
>>>> +1 to introducing this Param for consistency (and making the
>>>> substitution more obvious), and I think SDF is still new/experimental
>>>> enough that we can do this. I don't know if we need Spec in addition to
>>>> Param and Provider.
>>>>
>>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>> >
>>>> >
>>>> >
>>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <[email protected]>
>>>> > wrote:
>>>> >>
>>>> >> Hi all,
>>>> >> Sorry about the wall of text.
>>>> >> So, first of all, I thought about this while reviewing a PR by Boyuan
>>>> >> with an example of an SDF [1]. This is very exciting, btw : ).
>>>> >>
>>>> >> Anyway... I certainly have a limited view of the whole SDF effort, but I
>>>> >> think it's worth discussing this particular point about the API before
>>>> >> finalizing SDF and making it widely available. So here I go:
>>>> >>
>>>> >> The Python API for SDF asks users to provide a restriction provider in
>>>> >> their process function signature, more or less as follows:
>>>> >>
>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>> >>   def process(self, element,
>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>> >>     # My DoFn logic...
>>>> >>
>>>> >> This is all fine, but something I found a little odd is that the
>>>> >> restriction provider gets replaced at runtime with a restriction tracker:
>>>> >>
>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>> >>   def process(self, element,
>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>> >>     # This assert succeeds : )
>>>> >>     assert not isinstance(restriction_tracker,
>>>> >>                           MyOwnLittleRestrictionProvider)
>>>> >>
>>>> >> After thinking a little about it, I realized that the default argument
>>>> >> simply tells the runner where to find the restriction provider, but the
>>>> >> thing we need at runtime is NOT the restriction provider; it is the
>>>> >> restriction tracker.
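To make the substitution concrete, here is a minimal, self-contained sketch in plain Python (no Beam import). Every class and function in it is a hypothetical stand-in, not Beam's actual API or harness code: `invoke_process` plays the role of the SDK harness, scanning the `process()` defaults for a `RestrictionProvider` instance, asking it for an initial restriction and a tracker, and passing the tracker in the provider's place. This is why the `isinstance` assert in the snippet above succeeds.

```python
import inspect


class RestrictionProvider:
    """Hypothetical stand-in for an SDF restriction provider."""

    def initial_restriction(self, element):
        raise NotImplementedError

    def create_tracker(self, restriction):
        raise NotImplementedError


class OffsetTracker:
    """Hypothetical tracker over a half-open range [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.claimed = []

    def try_claim(self, position):
        # A position can be claimed only if it lies inside the restriction.
        if self.start <= position < self.stop:
            self.claimed.append(position)
            return True
        return False


class MyOwnLittleRestrictionProvider(RestrictionProvider):
    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetTracker(*restriction)


class MyOwnLittleSDF:  # plain class here; a real SDF would subclass beam.DoFn
    def process(self, element,
                restriction_tracker=MyOwnLittleRestrictionProvider()):
        # At call time this is a tracker, not the provider from the default.
        assert not isinstance(restriction_tracker, RestrictionProvider)
        position = 0
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1


def invoke_process(dofn, element):
    """Hypothetical harness: swap each provider default for a tracker."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, RestrictionProvider):
            provider = param.default
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))


print(invoke_process(MyOwnLittleSDF(), "abc"))  # ['a', 'b', 'c']
```

Note that the default value only serves as a signal to the harness; the object the user code actually receives has a different type, which is the source of the confusion described above.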
>>>> >>
>>>> >> A similar pattern occurs with state and timers, where the runner needs to
>>>> >> know the kind of state and the coder for the values in that state (or the
>>>> >> time domain for timers), but the runtime parameter is different [2]. For
>>>> >> state and timers (and window, timestamp, pane, etc.) we provide a pattern
>>>> >> where users give a default value that is clearly a placeholder:
>>>> >> beam.DoFn.TimerParam or beam.DoFn.StateParam.
>>>> >
>>>> >
>>>> > This is how (new) DoFns work in the Python SDK. The SDK (harness)
>>>> > identifies the meaning of each (potential) argument to a DoFn using
>>>> > predefined default values.
>>>> >
>>>> >>
>>>> >>
>>>> >> In this case, the API is fairly similar, but (at least in my imagination)
>>>> >> it is much clearer about how the DoFnParam will be replaced with something
>>>> >> else at runtime. A similar change could be made for SDF:
>>>> >>
>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>> >>   MY_RESTRICTION = \
>>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>>> >>
>>>> >>   def process(
>>>> >>       self, element,
>>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>>> >>     # My DoFn logic...
>>>> >
>>>> >
>>>> >
>>>> > If I understood correctly, what you propose is similar to the existing
>>>> > solution, but we add an XXXParam parameter for consistency?
>>>> > I think this is fine and should be a relatively small change. The main
>>>> > point is that the SDK should be able to find the RestrictionProvider
>>>> > class to utilize its methods before passing elements to the
>>>> > DoFn.process() method:
>>>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
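A minimal sketch of the proposed placeholder style, assuming a hypothetical `RestrictionParam` wrapper that simply carries the provider. This is plain Python written for illustration, not Beam's implementation, and `RangeProvider`, `RangeTracker`, `CountUpSDF`, `find_restriction_provider` and `run` are all made-up names. It shows the requirement stated above: the harness can locate the provider from the `process()` signature before any element arrives, then substitute a tracker at call time.

```python
import inspect


class RestrictionParam:
    """Hypothetical placeholder: marks the argument and carries the provider."""

    def __init__(self, provider):
        self.provider = provider


class RangeTracker:
    """Hypothetical tracker over a half-open range [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def try_claim(self, position):
        return self.start <= position < self.stop


class RangeProvider:
    """Hypothetical provider: element n gets the restriction [0, n)."""

    def initial_restriction(self, element):
        return (0, element)

    def create_tracker(self, restriction):
        return RangeTracker(*restriction)


class CountUpSDF:  # plain class; a real SDF would subclass beam.DoFn
    def process(self, element,
                restriction_tracker=RestrictionParam(RangeProvider())):
        position = 0
        while restriction_tracker.try_claim(position):
            yield position
            position += 1


def find_restriction_provider(method):
    """Locate the provider from the signature, before any element is seen."""
    for name, param in inspect.signature(method).parameters.items():
        if isinstance(param.default, RestrictionParam):
            return name, param.default.provider
    return None, None


def run(dofn, element):
    """Hypothetical harness: build a tracker and pass it in the placeholder's slot."""
    name, provider = find_restriction_provider(dofn.process)
    if provider is None:
        raise ValueError("process() has no RestrictionParam default")
    tracker = provider.create_tracker(provider.initial_restriction(element))
    return list(dofn.process(element, **{name: tracker}))


print(find_restriction_provider(CountUpSDF().process)[0])  # restriction_tracker
print(run(CountUpSDF(), 3))  # [0, 1, 2]
```

Because the default is an explicit marker object, the harness never has to guess whether a default value is meant literally: anything wrapped in `RestrictionParam` is known to be replaced, just as with `StateParam` and `TimerParam`.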
>>>> >
>>>> >
>>>> >>
>>>> >>
>>>> >> Perhaps it is a good opportunity to consider this, since SDF is still in
>>>> >> progress.
>>>> >>
>>>> >> Some pros:
>>>> >> - Consistent with other parameters that we pass to DoFn methods
>>>> >> - A bit more clear about what will happen at runtime
>>>> >>
>>>> >> Some cons:
>>>> >> - SDF developers are "power users" and will have gone through the SDF
>>>> >>   documentation, so this point will be clear to them.
>>>> >> - This may create unnecessary work, and perhaps unintended consequences.
>>>> >> - I bet there's more
>>>> >>
>>>> >> Thoughts?
>>>> >>
>>>> >> -P.
>>>> >>
>>>> >> [1] https://github.com/apache/beam/pull/8338
>>>> >> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
>>>> >>
>>>> >>
>>>> >>
>>>>
>>>
