On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <pabl...@google.com> wrote:

> Hi all,
> Sorry about the wall of text.
> So, first of all, I thought about this while reviewing a PR by Boyuan with
> an example of an SDF[1]. This is very exciting btw : ).
>
> Anyway... I certainly have a limited view of the whole SDF effort, but I
> think it's worth discussing this particular point about the API before
> finalizing SDF and making it widely available. So here I go:
>
> The Python API for SDF asks users to provide a restriction provider in
> their process function signature. More or less the following:
>
> class MyOwnLittleSDF(beam.DoFn):
>   def process(self, element,
>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>     # My DoFn logic...
>
> This is all fine, but something that I found a little odd is that the
> restriction provider gets replaced at runtime with a restriction tracker:
>
> class MyOwnLittleSDF(beam.DoFn):
>   def process(self, element,
>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>     # This assert succeeds : )
>     assert not isinstance(restriction_tracker,
>                           MyOwnLittleRestrictionProvider)
>
> After thinking a little bit about it, I realized that the default argument
> simply lets us tell the runner where to find the restriction provider,
> but that what we need at runtime is NOT the restriction provider but
> rather the restriction tracker.
>
> A similar pattern occurs with state and timers, where the runner needs to
> know the kind of state and the coder for the values in that state (or the
> time domain for timers), but the runtime parameter is different[2]. For
> state and timers (and window, timestamp, pane, etc.) we provide a pattern
> where users give a default value that is clearly a placeholder:
> beam.DoFn.TimerParam or beam.DoFn.StateParam.
>

This is how the (new) DoFn works in the Python SDK: the SDK (harness)
identifies the meaning of each (potential) DoFn argument from its
pre-defined default value.
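A minimal, self-contained sketch of that mechanism follows. It is not the code in apache_beam/runners/common.py. The sentinels `TimestampParam` and `WindowParam`, the `RestrictionProvider`/`RestrictionTracker` classes, and the `invoke_process` and `MySDF` names are hypothetical stand-ins. The sketch shows how a harness can read `process()` defaults with `inspect.signature` and swap in runtime values. It also shows why, under the current SDF style, `process()` receives a tracker rather than the provider that was written as the default.

```python
import inspect


class _Sentinel:
    """Placeholder default value; the harness only checks its identity."""

    def __init__(self, name):
        self.name = name

    def __repr__(self):
        return self.name


# Hypothetical stand-ins for beam.DoFn.TimestampParam / beam.DoFn.WindowParam.
TimestampParam = _Sentinel("TimestampParam")
WindowParam = _Sentinel("WindowParam")


class RestrictionTracker:
    """Hypothetical tracker that claims offsets in [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def try_claim(self, position):
        return self.start <= position < self.stop


class RestrictionProvider:
    """Hypothetical provider that builds restrictions and trackers."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return RestrictionTracker(*restriction)


def invoke_process(process_fn, element, timestamp, window):
    """Call process_fn, replacing each placeholder default with a real value."""
    kwargs = {}
    # process_fn is a bound method, so its first parameter is the element.
    params = list(inspect.signature(process_fn).parameters.values())[1:]
    for param in params:
        default = param.default
        if default is TimestampParam:
            kwargs[param.name] = timestamp
        elif default is WindowParam:
            kwargs[param.name] = window
        elif isinstance(default, RestrictionProvider):
            # Current SDF style: the default says where the provider is,
            # but process() receives a tracker built from it.
            restriction = default.initial_restriction(element)
            kwargs[param.name] = default.create_tracker(restriction)
    return process_fn(element, **kwargs)


class MySDF:
    def process(self, element, timestamp=TimestampParam,
                tracker=RestrictionProvider()):
        # Mirrors the assert quoted above: the runtime value is a tracker.
        assert isinstance(tracker, RestrictionTracker)
        claimed = [c for i, c in enumerate(element) if tracker.try_claim(i)]
        return timestamp, claimed


result = invoke_process(MySDF().process, "abc", timestamp=10, window=None)
# result == (10, ['a', 'b', 'c'])
```

In this sketch the sentinels are matched by identity. The restriction provider is matched by type because each SDF supplies its own provider instance.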


>
> In this case, the API is fairly similar, but (at least in my imagination)
> it is much clearer about how the DoFnParam will be replaced with
> something else at runtime. A similar change could be made for SDF:
>
> class MyOwnLittleSDF(beam.DoFn):
>   MY_RESTRICTION = \
>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>
>   def process(
>       self, element,
>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>     # My DoFn logic...
>


If I understood correctly, what you propose is similar to the existing
solution, but we add an XXXParam parameter for consistency?
I think this is fine and should be a relatively small change. The main point
is that the SDK should be able to find the RestrictionProvider so it can use
its methods before passing elements to the DoFn.process() method:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
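A rough sketch of how the SDK could still locate the provider up front under the proposed API follows. `RestrictionSpec` and `RestrictionParam` follow the names in the proposal and are not an existing Beam API here. `find_restriction_provider`, `CharRangeProvider`, and `OffsetTracker` are hypothetical helpers made up for illustration. The lookup accepts both the proposed placeholder and the current style, where the default is the provider itself.

```python
import inspect


class RestrictionProvider:
    """Hypothetical base class; real providers define more methods."""

    def initial_restriction(self, element):
        raise NotImplementedError

    def create_tracker(self, restriction):
        raise NotImplementedError


class RestrictionSpec:
    """Proposed: declares which provider the DoFn uses."""

    def __init__(self, provider):
        self.provider = provider


class RestrictionParam:
    """Proposed placeholder default; replaced by a tracker at runtime."""

    def __init__(self, spec):
        self.spec = spec


def find_restriction_provider(process_fn):
    """Return (param_name, provider) for the restriction argument, or None."""
    for param in inspect.signature(process_fn).parameters.values():
        default = param.default
        if isinstance(default, RestrictionParam):  # proposed style
            return param.name, default.spec.provider
        if isinstance(default, RestrictionProvider):  # current style
            return param.name, default
    return None


class OffsetTracker:
    """Hypothetical tracker over the half-open range [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def try_claim(self, position):
        return self.start <= position < self.stop


class CharRangeProvider(RestrictionProvider):
    """Hypothetical provider: one offset per character of a string element."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetTracker(*restriction)


class MyOwnLittleSDF:
    MY_RESTRICTION = RestrictionSpec(provider=CharRangeProvider())

    # Defaults are evaluated in the class body, so MY_RESTRICTION resolves.
    def process(self, element,
                restriction_tracker=RestrictionParam(MY_RESTRICTION)):
        for i, char in enumerate(element):
            if not restriction_tracker.try_claim(i):
                return
            yield char


# The SDK can find the provider before any element reaches process().
dofn = MyOwnLittleSDF()
name, provider = find_restriction_provider(dofn.process)
element = "beam"
tracker = provider.create_tracker(provider.initial_restriction(element))
output = list(dofn.process(element, **{name: tracker}))
# output == ['b', 'e', 'a', 'm']
```

Wrapping the provider in a spec keeps the declaration ("which provider") separate from the runtime value ("a tracker"). This matches how StateParam and TimerParam wrap their specs.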



>
> Perhaps it is a good opportunity to consider this, since SDF is still in
> progress.
>
> Some pros:
> - Consistent with other parameters that we pass to DoFn methods
> - A bit more clear about what will happen at runtime
>
> Some cons:
> - SDF developers are "power users", and will have gone through the SDF
> documentation. This point will be clear to them.
> - This may create unnecessary work, and perhaps unintended consequences.
> - I bet there's more
>
> Thoughts?
>
> -P.
>
> [1] https://github.com/apache/beam/pull/8338
> [2]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586