On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <pabl...@google.com> wrote:
> Hi all,
> Sorry about the wall of text.
> So, first of all, I thought about this while reviewing a PR by Boyuan with
> an example of an SDF[1]. This is very exciting btw : ).
>
> Anyway... I certainly have a limited view of the whole SDF effort, but I
> think it's worth discussing this particular point about the API before
> finalizing SDF and making it widely available. So here I go:
>
> The Python API for SDF asks users to provide a restriction provider in
> their process function signature. More or less the following:
>
> class MyOwnLittleSDF(beam.DoFn):
>   def process(self, element,
>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>     # My DoFn logic...
>
> This is all fine, but something that I found a little odd is that the
> restriction provider gets replaced at runtime with a restriction tracker:
>
> class MyOwnLittleSDF(beam.DoFn):
>   def process(self, element,
>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>     # This assert succeeds : )
>     assert not isinstance(restriction_tracker,
>                           MyOwnLittleRestrictionProvider)
>
> After thinking a little bit about it, I realized that the default argument
> simply allows us to inform the runner where to find the restriction
> provider; but that the thing that we need at runtime is NOT the restriction
> provider - but rather, the restriction tracker.
>
> A similar pattern occurs with state and timers, where the runner needs to
> know the sort of state, the coder for the values in that state (or the time
> domain for timers); but the runtime parameter is different[2]. For state
> and timers (and window, timestamp, pane, etc.) we provide a pattern where
> users give a default value that is clearly a placeholder:
> beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>

This is how the (new) DoFn works in the Python SDK: the SDK harness
identifies the meaning of each (potential) argument to a DoFn from its
pre-defined default value.

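To make the default-value mechanism concrete, here is a minimal sketch in
plain Python. It is not the Beam harness code: TimestampPlaceholder,
WindowPlaceholder, find_placeholders and invoke are hypothetical names made
up for illustration, and only the standard inspect module is used.

```python
import inspect


class TimestampPlaceholder:
    """Hypothetical marker meaning 'pass the element timestamp here'."""


class WindowPlaceholder:
    """Hypothetical marker meaning 'pass the element window here'."""


class MyDoFn:
    # The defaults are never used as values; they only tell the harness
    # what kind of argument each parameter expects.
    def process(self, element, ts=TimestampPlaceholder, win=WindowPlaceholder):
        return (element, ts, win)


def find_placeholders(fn, known):
    """Return {argument name: placeholder} for defaults found in `known`."""
    found = {}
    for name, param in inspect.signature(fn).parameters.items():
        # Compare by identity so unhashable or odd defaults cannot break us.
        if any(param.default is marker for marker in known):
            found[name] = param.default
    return found


def invoke(dofn, element, runtime_values):
    """Call dofn.process, swapping each placeholder for its runtime value.

    `runtime_values` maps placeholder -> value (an assumption of this sketch).
    """
    placeholders = find_placeholders(dofn.process, tuple(runtime_values))
    kwargs = {name: runtime_values[p] for name, p in placeholders.items()}
    return dofn.process(element, **kwargs)
```

With this, invoke(MyDoFn(), "a", {TimestampPlaceholder: 123,
WindowPlaceholder: "w0"}) calls process with ts=123 and win="w0", so the user
code never sees the placeholder objects themselves.
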
>
> In this case, the API is fairly similar, but (at least in my imagination),
> it is much more clear about how the DoFnParam will be replaced with
> something else at runtime. A similar change could be done for SDF:
>
> class MyOwnLittleSDF(beam.DoFn):
>   MY_RESTRICTION = \
>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>
>   def process(
>       self, element,
>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>     # My DoFn logic..
>

If I understood correctly, what you propose is similar to the existing
solution, except that we add an XXXParam parameter for consistency? I think
this is fine and should be a relatively small change. The main point is that
the SDK should be able to find the RestrictionProvider class so that it can
use its methods before passing elements to the DoFn.process() method:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241

>
> Perhaps it is a good opportunity to consider this, since SDF is still in
> progress.
>
> Some pros:
> - Consistent with other parameters that we pass to DoFn methods
> - A bit more clear about what will happen at runtime
>
> Some cons:
> - SDF developers are "power users", and will have gone through the SDF
> documentation. This point will be clear to them.
> - This may create unnecessary work, and perhaps unintended consequences.
> - I bet there's more
>
> Thoughts?
>
> -P.
>
> [1] https://github.com/apache/beam/pull/8338
> [2]
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
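
As a rough illustration of the requirement stated in the reply (the SDK must
be able to reach the provider from the process() signature and hand a tracker
to the user code), here is a self-contained sketch. Every class in it is a
hypothetical stand-in written for this example, not the Beam implementation,
and for brevity RestrictionParam wraps the provider directly rather than a
RestrictionSpec.

```python
import inspect


class OffsetRangeTracker:
    """Hypothetical tracker: lets process() claim positions in [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop

    def try_claim(self, position):
        return self.start <= position < self.stop


class OffsetRangeProvider:
    """Hypothetical provider: builds restrictions and trackers for an element."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetRangeTracker(*restriction)


class RestrictionParam:
    """Hypothetical placeholder that tells the harness where the provider is."""

    def __init__(self, provider):
        self.provider = provider


class MyOwnLittleSDF:
    def process(self, element,
                restriction_tracker=RestrictionParam(OffsetRangeProvider())):
        # At runtime this argument is a tracker, not the placeholder.
        position = restriction_tracker.start
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1


def invoke_sdf(dofn, element):
    """Find the provider via the default value, then pass a tracker instead."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, RestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))
```

Calling invoke_sdf(MyOwnLittleSDF(), "abc") walks the whole restriction and
collects one output per claimed position. The placeholder type makes the
substitution explicit, which is the consistency argument made in the thread.
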