Pablo, all the SplittableDoFn stuff is marked as @Experimental, so we are free to change it. There is really only one complicated case to change, in Watch.java; the rest are quite straightforward.

On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <pabl...@google.com> wrote:

> Thanks all,
> @Luke - I imagine that would be an improvement to the API, but this may
> be harder as this is already available to users, and there are those who
> have implemented SDFs under the current API. Would it be possible to make a
> backwards-compatible change to the API here?
>
> For the Python changes, I've proposed a pull request:
> https://github.com/apache/beam/pull/8430 - it was smaller than I thought
> : ) - All comments welcome please.
>
> +Boyuan Zhang <boyu...@google.com> I am happy to wait for your
> SyntheticSource PR to be merged and make the appropriate changes if you'd
> like.
> Best
> -P.
>
> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <lc...@google.com> wrote:
>
>> Would it make sense to also do this in the Java SDK?
>>
>> This would make the restriction provider also mirror the TimerSpec and
>> StateSpec, which use annotations similar to how it's done in Python.
>>
>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> +1 to introducing this Param for consistency (and making the
>>> substitution more obvious), and I think SDF is still new/experimental
>>> enough we can do this. I don't know if we need Spec in addition to
>>> Param and Provider.
>>>
>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>> >
>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <pabl...@google.com>
>>> > wrote:
>>> >>
>>> >> Hi all,
>>> >> Sorry about the wall of text.
>>> >> So, first of all, I thought about this while reviewing a PR by Boyuan
>>> >> with an example of an SDF[1]. This is very exciting btw : ).
>>> >>
>>> >> Anyway... I certainly have a limited view of the whole SDF effort,
>>> >> but I think it's worth discussing this particular point about the API
>>> >> before finalizing SDF and making it widely available. So here I go:
>>> >>
>>> >> The Python API for SDF asks users to provide a restriction provider
>>> >> in their process function signature. More or less the following:
>>> >>
>>> >> class MyOwnLittleSDF(beam.DoFn):
>>> >>   def process(self, element,
>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>> >>     # My DoFn logic...
>>> >>
>>> >> This is all fine, but something that I found a little odd is that the
>>> >> restriction provider gets replaced at runtime with a restriction tracker:
>>> >>
>>> >> class MyOwnLittleSDF(beam.DoFn):
>>> >>   def process(self, element,
>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>> >>     # This assert succeeds : )
>>> >>     assert not isinstance(restriction_tracker,
>>> >>                           MyOwnLittleRestrictionProvider)
>>> >>
>>> >> After thinking a little bit about it, I realized that the default
>>> >> argument simply allows us to inform the runner where to find the
>>> >> restriction provider; but that the thing that we need at runtime is NOT the
>>> >> restriction provider - but rather, the restriction tracker.
>>> >>
>>> >> A similar pattern occurs with state and timers, where the runner
>>> >> needs to know the sort of state, the coder for the values in that state (or
>>> >> the time domain for timers); but the runtime parameter is different[2]. For
>>> >> state and timers (and window, timestamp, pane, etc.) we provide a pattern
>>> >> where users give a default value that is clearly a placeholder:
>>> >> beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>>> >
>>> > This is the way (new) DoFns work in the Python SDK. The SDK (harness)
>>> > identifies the meanings of different (potential) arguments to a DoFn using
>>> > pre-defined default values.
>>> >
>>> >> In this case, the API is fairly similar, but (at least in my
>>> >> imagination), it is much more clear about how the DoFnParam will be
>>> >> replaced with something else at runtime. A similar change could be done for
>>> >> SDF:
>>> >>
>>> >> class MyOwnLittleSDF(beam.DoFn):
>>> >>   MY_RESTRICTION = \
>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>> >>
>>> >>   def process(
>>> >>       self, element,
>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>> >>     # My DoFn logic..
>>> >
>>> > If I understood correctly, what you propose is similar to the existing
>>> > solution, but we add an XXXParam parameter for consistency?
>>> > I think this is fine and should be a relatively small change. The main
>>> > point is, the SDK should be able to find out the RestrictionProvider class to
>>> > utilize its methods before passing elements to the DoFn.process() method:
>>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>>> >
>>> >> Perhaps it is a good opportunity to consider this, since SDF is still
>>> >> in progress.
>>> >>
>>> >> Some pros:
>>> >> - Consistent with other parameters that we pass to DoFn methods
>>> >> - A bit more clear about what will happen at runtime
>>> >>
>>> >> Some cons:
>>> >> - SDF developers are "power users", and will have gone through the
>>> >>   SDF documentation. This point will be clear to them.
>>> >> - This may create unnecessary work, and perhaps unintended
>>> >>   consequences.
>>> >> - I bet there's more
>>> >>
>>> >> Thoughts?
>>> >>
>>> >> -P.
>>> >>
>>> >> [1] https://github.com/apache/beam/pull/8338
>>> >> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
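
For readers following the thread, here is a minimal, Beam-free sketch of the placeholder pattern being discussed: a default argument that only tells the caller which provider to use, and that is swapped for a tracker at call time. Every name in it (RestrictionParam, RestrictionProvider, RestrictionTracker, invoke_process) is a hypothetical stand-in written for illustration; it is an assumption about the general shape of the idea, not how the Beam SDK harness actually implements it.

```python
import inspect


class RestrictionTracker:
    """Hypothetical tracker: claims positions inside a [start, stop) range."""

    def __init__(self, restriction):
        self.start, self.stop = restriction

    def try_claim(self, position):
        return self.start <= position < self.stop


class RestrictionProvider:
    """Hypothetical provider: knows how to build a tracker for a restriction."""

    def create_tracker(self, restriction):
        return RestrictionTracker(restriction)


class RestrictionParam:
    """Placeholder default value; it only records which provider to use."""

    def __init__(self, provider):
        self.provider = provider


def invoke_process(fn, element, restriction):
    """Toy invoker: replace each RestrictionParam default with a real tracker."""
    kwargs = {}
    for name, param in inspect.signature(fn).parameters.items():
        if isinstance(param.default, RestrictionParam):
            kwargs[name] = param.default.provider.create_tracker(restriction)
    return fn(element, **kwargs)


def process(element,
            restriction_tracker=RestrictionParam(RestrictionProvider())):
    # At call time the argument is a tracker, not the placeholder.
    assert isinstance(restriction_tracker, RestrictionTracker)
    return [element * i for i in range(5) if restriction_tracker.try_claim(i)]


result = invoke_process(process, 10, (1, 3))
print(result)
```

In this sketch the signature still advertises a RestrictionParam, which makes the substitution visible to anyone reading the function, while the body only ever sees the tracker.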