+1 to introducing this Param for consistency (and to make the
substitution more obvious), and I think SDF is still new/experimental
enough that we can do this. I don't know if we need Spec in addition to
Param and Provider.
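
To make the substitution concrete, here is a rough sketch in plain Python
with no Beam imports. Every name in it (RestrictionParam, RestrictionProvider,
OffsetTracker, invoke_process) is a made-up stand-in for illustration, not the
actual SDK API or harness code. It only shows the general idea: the default
value is a placeholder that tells the harness which provider to use, and the
harness passes a tracker in its place at call time.

```python
import inspect


class OffsetTracker:
    """Hypothetical tracker over a half-open [start, stop) range."""

    def __init__(self, restriction):
        self.start, self.stop = restriction

    def try_claim(self, position):
        return self.start <= position < self.stop


class RestrictionProvider:
    """Hypothetical provider: knows how to build restrictions and trackers."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetTracker(restriction)


class RestrictionParam:
    """Hypothetical placeholder default that points at a provider."""

    def __init__(self, provider):
        self.provider = provider


class MyOwnLittleSDF:
    def process(self, element,
                restriction_tracker=RestrictionParam(RestrictionProvider())):
        # At call time this is a tracker, never the placeholder itself.
        assert not isinstance(restriction_tracker, RestrictionParam)
        position = restriction_tracker.start
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1


def invoke_process(dofn, element):
    """Toy harness: swap each RestrictionParam default for a live tracker."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, RestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))
```

Calling invoke_process(MyOwnLittleSDF(), "abc") runs process() with an
OffsetTracker in place of the placeholder. Whether a separate Spec object
adds anything on top of this is the open question.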

On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <[email protected]> wrote:
>
>
>
> On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <[email protected]> wrote:
>>
>> Hi all,
>> Sorry about the wall of text.
>> So, first of all, I thought about this while reviewing a PR by Boyuan with 
>> an example of an SDF[1]. This is very exciting btw : ).
>>
>> Anyway... I certainly have a limited view of the whole SDF effort, but I 
>> think it's worth discussing this particular point about the API before 
>> finalizing SDF and making it widely available. So here I go:
>>
>> The Python API for SDF asks users to provide a restriction provider in their 
>> process function signature. More or less the following:
>>
>> class MyOwnLittleSDF(beam.DoFn):
>>   def process(self, element,
>>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>     # My DoFn logic...
>>
>> This is all fine, but something that I found a little odd is that the 
>> restriction provider gets replaced at runtime with a restriction tracker:
>>
>> class MyOwnLittleSDF(beam.DoFn):
>>   def process(self, element,
>>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>     # This assert succeeds : )
>>     assert not isinstance(restriction_tracker,
>>                           MyOwnLittleRestrictionProvider)
>>
>> After thinking a little bit about it, I realized that the default argument 
>> simply allows us to inform the runner where to find the restriction 
>> provider; but that the thing that we need at runtime is NOT the restriction 
>> provider - but rather, the restriction tracker.
>>
>> A similar pattern occurs with state and timers, where the runner needs to 
>> know the sort of state, the coder for the values in that state (or the time 
>> domain for timers); but the runtime parameter is different[2]. For state and 
>> timers (and window, timestamp, pane, etc.) we provide a pattern where users 
>> give a default value that is clearly a placeholder: beam.DoFn.TimerParam, or 
>> beam.DoFn.StateParam.
>
>
> This is how the (new) DoFn works in the Python SDK. The SDK (harness) 
> identifies the meaning of each (potential) argument to a DoFn using 
> pre-defined default values.
>
>>
>>
>> In this case, the API is fairly similar, but (at least in my imagination), 
>> it is much clearer that the DoFnParam will be replaced with 
>> something else at runtime. A similar change could be done for SDF:
>>
>> class MyOwnLittleSDF(beam.DoFn):
>>   MY_RESTRICTION = \
>>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>
>>   def process(
>>       self, element,
>>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>     # My DoFn logic..
>
>
>
> If I understood correctly, what you propose is similar to the existing 
> solution, but we add an XXXParam parameter for consistency?
> I think this is fine and should be a relatively small change. The main point 
> is that the SDK should be able to find the RestrictionProvider class and use 
> its methods before passing elements to the DoFn.process() method: 
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>
>
>>
>>
>> Perhaps it is a good opportunity to consider this, since SDF is still in 
>> progress.
>>
>> Some pros:
>> - Consistent with other parameters that we pass to DoFn methods
>> - A bit more clear about what will happen at runtime
>>
>> Some cons:
>> - SDF developers are "power users", and will have gone through the SDF 
>> documentation. This point will be clear to them.
>> - This may create unnecessary work, and perhaps unintended consequences.
>> - I bet there's more
>>
>> Thoughts?
>>
>> -P.
>>
>> [1] https://github.com/apache/beam/pull/8338
>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
>>
>>
>>
