+1 to introducing this Param for consistency (and making the substitution more obvious), and I think SDF is still new/experimental enough that we can do this. I don't know if we need a Spec in addition to the Param and the Provider.
On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath wrote:
>
> On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada wrote:
>>
>> Hi all,
>> Sorry about the wall of text.
>> So, first of all, I thought about this while reviewing a PR by Boyuan with
>> an example of an SDF[1]. This is very exciting btw : ).
>>
>> Anyway... I certainly have a limited view of the whole SDF effort, but I
>> think it's worth discussing this particular point about the API before
>> finalizing SDF and making it widely available. So here I go:
>>
>> The Python API for SDF asks users to provide a restriction provider in their
>> process function signature. More or less the following:
>>
>> class MyOwnLittleSDF(beam.DoFn):
>>   def process(self, element,
>>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>     # My DoFn logic...
>>
>> This is all fine, but something that I found a little odd is that the
>> restriction provider gets replaced at runtime with a restriction tracker:
>>
>> class MyOwnLittleSDF(beam.DoFn):
>>   def process(self, element,
>>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>     # This assert succeeds : )
>>     assert not isinstance(restriction_tracker,
>>                           MyOwnLittleRestrictionProvider)
>>
>> After thinking a little bit about it, I realized that the default argument
>> simply allows us to inform the runner where to find the restriction
>> provider; but that the thing that we need at runtime is NOT the restriction
>> provider, but rather the restriction tracker.
>>
>> A similar pattern occurs with state and timers, where the runner needs to
>> know the sort of state, the coder for the values in that state (or the time
>> domain for timers); but the runtime parameter is different[2]. For state and
>> timers (and window, timestamp, pane, etc.) we provide a pattern where users
>> give a default value that is clearly a placeholder: beam.DoFn.TimerParam, or
>> beam.DoFn.StateParam.
>
> This is the way (new) DoFns work in the Python SDK. The SDK (harness)
> identifies the meanings of different (potential) arguments to a DoFn using
> pre-defined default values.
>
>>
>> In this case, the API is fairly similar, but (at least in my imagination)
>> it is much clearer about how the DoFnParam will be replaced with
>> something else at runtime. A similar change could be done for SDF:
>>
>> class MyOwnLittleSDF(beam.DoFn):
>>   MY_RESTRICTION = \
>>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>
>>   def process(
>>       self, element,
>>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>     # My DoFn logic...
>
> If I understood correctly, what you propose is similar to the existing
> solution, but we add an XXXParam parameter for consistency?
> I think this is fine and should be a relatively small change. The main point
> is that the SDK should be able to find the RestrictionProvider class so it
> can use its methods before passing elements to the DoFn.process() method:
> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>
>>
>> Perhaps it is a good opportunity to consider this, since SDF is still in
>> progress.
>>
>> Some pros:
>> - Consistent with other parameters that we pass to DoFn methods
>> - A bit clearer about what will happen at runtime
>>
>> Some cons:
>> - SDF developers are "power users", and will have gone through the SDF
>>   documentation. This point will be clear to them.
>> - This may create unnecessary work, and perhaps unintended consequences.
>> - I bet there's more
>>
>> Thoughts?
>>
>> -P.
>>
>> [1] https://github.com/apache/beam/pull/8338
>> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
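To make the substitution discussed in this thread concrete, here is a minimal, self-contained Python sketch of how an SDK could find a restriction provider by inspecting default values and then pass a tracker, not the provider, into process(). All names (RestrictionProvider, RangeTracker, RestrictionParam, find_restriction_provider, invoke_process) are hypothetical stand-ins, not the Beam SDK's actual classes or the logic in runners/common.py. The sketch accepts both the current style (a bare provider as the default) and the proposed Param wrapper, and it omits the RestrictionSpec layer entirely.

```python
import inspect


class RestrictionProvider:
    """Hypothetical stand-in for a restriction provider."""

    def initial_restriction(self, element):
        # Toy choice: an integer element n maps to the range [0, n).
        return (0, element)

    def create_tracker(self, restriction):
        return RangeTracker(restriction)


class RangeTracker:
    """Hypothetical tracker that allows claiming integer positions in [start, stop)."""

    def __init__(self, restriction):
        self.start, self.stop = restriction

    def try_claim(self, position):
        return self.start <= position < self.stop


class RestrictionParam:
    """Hypothetical placeholder in the spirit of DoFn.StateParam/TimerParam."""

    def __init__(self, provider):
        self.provider = provider


def find_restriction_provider(process_fn):
    """Return (argument name, provider) found among the default values."""
    for name, param in inspect.signature(process_fn).parameters.items():
        default = param.default
        if isinstance(default, RestrictionParam):  # proposed style
            return name, default.provider
        if isinstance(default, RestrictionProvider):  # current style
            return name, default
    return None, None


def invoke_process(dofn, element):
    """Call dofn.process, swapping the placeholder default for a tracker."""
    name, provider = find_restriction_provider(dofn.process)
    if provider is None:
        return list(dofn.process(element))
    tracker = provider.create_tracker(provider.initial_restriction(element))
    return list(dofn.process(element, **{name: tracker}))


def emit_claimed(tracker):
    # Shared body: yield every position the tracker lets us claim.
    position = tracker.start
    while tracker.try_claim(position):
        yield position
        position += 1


class CurrentStyleSDF:
    def process(self, element, restriction_tracker=RestrictionProvider()):
        # Despite the default, a tracker arrives here at runtime.
        assert isinstance(restriction_tracker, RangeTracker)
        yield from emit_claimed(restriction_tracker)


class ProposedStyleSDF:
    def process(self, element,
                restriction_tracker=RestrictionParam(RestrictionProvider())):
        # The placeholder makes it explicit that something else is injected.
        assert isinstance(restriction_tracker, RangeTracker)
        yield from emit_claimed(restriction_tracker)


print(invoke_process(CurrentStyleSDF(), 3))   # [0, 1, 2]
print(invoke_process(ProposedStyleSDF(), 2))  # [0, 1]
```

In this toy version the lookup code barely changes between the two styles: the harness still gets the provider before the first call, and only the signature the user reads becomes more explicit about what gets replaced.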
