Thanks all,
@Luke - I imagine that would be an improvement to the Java API as well, but
it may be harder there, since the current API is already available to users
and some of them have implemented SDFs against it. Would it be possible to
make a backwards-compatible change to the API here?

For the Python changes, I've proposed a pull request:
https://github.com/apache/beam/pull/8430 - it was smaller than I thought : )
All comments are welcome.
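
For anyone who wants the idea without reading the PR, here is a rough,
self-contained sketch of the placeholder pattern discussed further down the
thread. It does not use Beam and is not the code in the PR: ToyProvider,
ToyTracker, invoke_process and this RestrictionParam are hypothetical
stand-ins I made up for illustration. The only point is that the default
value in the signature is a marker that wraps the provider, and whatever
invokes process() swaps it for a tracker.

```python
import inspect


class ToyTracker:
    """Hypothetical tracker that hands out offsets in [start, stop)."""

    def __init__(self, restriction):
        self.position, self.stop = restriction

    def try_claim(self, position):
        # Claiming fails once we reach the end of the restriction.
        if position >= self.stop:
            return False
        self.position = position
        return True


class ToyProvider:
    """Hypothetical provider: knows the restriction and builds trackers."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return ToyTracker(restriction)


class RestrictionParam:
    """Placeholder default (assumed shape): it only carries the provider."""

    def __init__(self, provider):
        self.provider = provider


def invoke_process(dofn, element):
    """Toy invoker: replace each RestrictionParam default with a tracker."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, RestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))


class MyOwnLittleSDF:
    def process(self, element,
                restriction_tracker=RestrictionParam(ToyProvider())):
        # At runtime this is a tracker, not the placeholder.
        assert isinstance(restriction_tracker, ToyTracker)
        position = restriction_tracker.position
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1
```

Calling invoke_process(MyOwnLittleSDF(), "abc") runs process() with a
ToyTracker over the range 0 to 3, so the body never sees the placeholder
itself.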

+Boyuan Zhang <boyu...@google.com> I am happy to wait for your
SyntheticSource PR to be merged and make the appropriate changes if you'd
like.
Best
-P.

On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <lc...@google.com> wrote:

> Would it make sense to also do this in the Java SDK?
>
> That would make the restriction provider also mirror the TimerSpec and
> StateSpec, which use annotations similar to how it's done in Python.
>
> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> +1 to introducing this Param for consistency (and making the
>> substitution more obvious), and I think SDF is still new/experimental
>> enough we can do this. I don't know if we need Spec in addition to
>> Param and Provider.
>>
>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>> >
>> >
>> >
>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <pabl...@google.com>
>> wrote:
>> >>
>> >> Hi all,
>> >> Sorry about the wall of text.
>> >> So, first of all, I thought about this while reviewing a PR by Boyuan
>> with an example of an SDF[1]. This is very exciting btw : ).
>> >>
>> >> Anyway... I certainly have a limited view of the whole SDF effort, but
>> I think it's worth discussing this particular point about the API before
>> finalizing SDF and making it widely available. So here I go:
>> >>
>> >> The Python API for SDF asks users to provide a restriction provider in
>> their process function signature. More or less the following:
>> >>
>> >> class MyOwnLittleSDF(beam.DoFn):
>> >>   def process(self, element,
>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>> >>     # My DoFn logic...
>> >>
>> >> This is all fine, but something that I found a little odd is that the
>> restriction provider gets replaced at runtime with a restriction tracker:
>> >>
>> >> class MyOwnLittleSDF(beam.DoFn):
>> >>   def process(self, element,
>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>> >>     # This assert succeeds : )
>> >>     assert not isinstance(restriction_tracker,
>> >>                           MyOwnLittleRestrictionProvider)
>> >>
>> >> After thinking a little bit about it, I realized that the default
>> argument simply allows us to inform the runner where to find the
>> restriction provider; but that the thing that we need at runtime is NOT the
>> restriction provider - but rather, the restriction tracker.
>> >>
>> >> A similar pattern occurs with state and timers, where the runner needs
>> to know the sort of state, the coder for the values in that state (or the
>> time domain for timers); but the runtime parameter is different[2]. For
>> state and timers (and window, timestamp, pane, etc.) we provide a pattern
>> where users give a default value that is clearly a placeholder:
>> beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>> >
>> >
>> > This is the way (new) DoFns work in the Python SDK. The SDK (harness)
>> identifies the meanings of different (potential) arguments to a DoFn using
>> pre-defined default values.
>> >
>> >>
>> >>
>> >> In this case, the API is fairly similar, but (at least in my
>> imagination), it is much more clear about how the DoFnParam will be
>> replaced with something else at runtime. A similar change could be done for
>> SDF:
>> >>
>> >> class MyOwnLittleSDF(beam.DoFn):
>> >>   MY_RESTRICTION = \
>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>> >>
>> >>   def process(
>> >>       self, element,
>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>> >>     # My DoFn logic..
>> >
>> >
>> >
>> > If I understood correctly, what you propose is similar to the existing
>> solution, but we add an XXXParam parameter for consistency?
>> > I think this is fine and should be a relatively small change. The main
>> point is that the SDK should be able to find the RestrictionProvider class
>> and use its methods before passing elements to the DoFn.process() method:
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>> >
>> >
>> >>
>> >>
>> >> Perhaps it is a good opportunity to consider this, since SDF is still
>> in progress.
>> >>
>> >> Some pros:
>> >> - Consistent with other parameters that we pass to DoFn methods
>> >> - A bit more clear about what will happen at runtime
>> >>
>> >> Some cons:
>> >> - SDF developers are "power users", and will have gone through the SDF
>> documentation. This point will be clear to them.
>> >> - This may create unnecessary work, and perhaps unintended
>> consequences.
>> >> - I bet there's more
>> >>
>> >> Thoughts?
>> >>
>> >> -P.
>> >>
>> >> [1] https://github.com/apache/beam/pull/8338
>> >> [2]
>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
>> >>
>> >>
>> >>
>>
>
