Hello all,
The API has been updated for Python (see https://github.com/apache/beam/pull/8430). If you catch any documentation that needs updating, please flag it to me or just propose the change : )
As for Java - we didn't end up determining whether it makes sense to update the API as well. Thoughts from others?
In any case, I've filed https://jira.apache.org/jira/browse/BEAM-7250 to track this for Java.
Best
-P.

On Mon, Apr 29, 2019 at 2:41 PM Lukasz Cwik <[email protected]> wrote:

> Pablo, all the SplittableDoFn stuff is marked as @Experimental so one is
> able to change it. There really is only one complicated one to change in
> Watch.java, the rest are quite straightforward.
>
> On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <[email protected]> wrote:
>
>> Thanks all,
>> @Luke - I imagine that would be an improvement to the API, but this may
>> be harder as this is already available to users, and there are those who
>> have implemented SDFs under the current API. Would it be possible to make a
>> backwards-compatible change to the API here?
>>
>> For the Python changes, I've proposed a pull request:
>> https://github.com/apache/beam/pull/8430 - it was smaller than I thought
>> : ) - All comments welcome please.
>>
>> +Boyuan Zhang <[email protected]> I am happy to wait for your
>> SyntheticSource PR to be merged and make the appropriate changes if you'd
>> like.
>> Best
>> -P.
>>
>> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Would it make sense to also do this in the Java SDK?
>>>
>>> This would make the restriction provider also mirror the TimerSpec and
>>> StateSpec which use annotations similar to how it's done in Python.
>>>
>>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> +1 to introducing this Param for consistency (and making the
>>>> substitution more obvious), and I think SDF is still new/experimental
>>>> enough we can do this. I don't know if we need Spec in addition to
>>>> Param and Provider.
>>>>
>>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath <[email protected]> wrote:
>>>> >
>>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <[email protected]> wrote:
>>>> >>
>>>> >> Hi all,
>>>> >> Sorry about the wall of text.
>>>> >> So, first of all, I thought about this while reviewing a PR by
>>>> >> Boyuan with an example of an SDF[1]. This is very exciting btw : ).
>>>> >>
>>>> >> Anyway... I certainly have a limited view of the whole SDF effort,
>>>> >> but I think it's worth discussing this particular point about the API
>>>> >> before finalizing SDF and making it widely available. So here I go:
>>>> >>
>>>> >> The Python API for SDF asks users to provide a restriction provider
>>>> >> in their process function signature. More or less the following:
>>>> >>
>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>> >>   def process(self, element,
>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>> >>     # My DoFn logic...
>>>> >>
>>>> >> This is all fine, but something that I found a little odd is that
>>>> >> the restriction provider gets replaced at runtime with a restriction
>>>> >> tracker:
>>>> >>
>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>> >>   def process(self, element,
>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>> >>     # This assert succeeds : )
>>>> >>     assert not isinstance(restriction_tracker,
>>>> >>                           MyOwnLittleRestrictionProvider)
>>>> >>
>>>> >> After thinking a little bit about it, I realized that the default
>>>> >> argument simply allows us to inform the runner where to find the
>>>> >> restriction provider; but that the thing that we need at runtime is
>>>> >> NOT the restriction provider - but rather, the restriction tracker.
>>>> >>
>>>> >> A similar pattern occurs with state and timers, where the runner
>>>> >> needs to know the sort of state, the coder for the values in that
>>>> >> state (or the time domain for timers); but the runtime parameter is
>>>> >> different[2]. For state and timers (and window, timestamp, pane,
>>>> >> etc.) we provide a pattern where users give a default value that is
>>>> >> clearly a placeholder: beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>>>> >
>>>> > This is the way (new) DoFns work in the Python SDK. The SDK (harness)
>>>> > identifies the meanings of different (potential) arguments to a DoFn
>>>> > using pre-defined default values.
>>>> >
>>>> >> In this case, the API is fairly similar, but (at least in my
>>>> >> imagination), it is much more clear about how the DoFnParam will be
>>>> >> replaced with something else at runtime. A similar change could be
>>>> >> done for SDF:
>>>> >>
>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>> >>   MY_RESTRICTION = \
>>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>>> >>
>>>> >>   def process(
>>>> >>       self, element,
>>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>>> >>     # My DoFn logic..
>>>> >
>>>> > If I understood correctly, what you propose is similar to the
>>>> > existing solution but we add an XXXParam parameter for consistency?
>>>> > I think this is fine and should be a relatively small change. The main
>>>> > point is, the SDK should be able to find out the RestrictionProvider
>>>> > class to utilize its methods before passing elements to the
>>>> > DoFn.process() method:
>>>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>>>> >
>>>> >> Perhaps it is a good opportunity to consider this, since SDF is
>>>> >> still in progress.
>>>> >>
>>>> >> Some pros:
>>>> >> - Consistent with other parameters that we pass to DoFn methods
>>>> >> - A bit more clear about what will happen at runtime
>>>> >>
>>>> >> Some cons:
>>>> >> - SDF developers are "power users", and will have gone through the
>>>> >>   SDF documentation. This point will be clear to them.
>>>> >> - This may create unnecessary work, and perhaps unintended
>>>> >>   consequences.
>>>> >> - I bet there's more
>>>> >>
>>>> >> Thoughts?
>>>> >>
>>>> >> -P.
>>>> >>
>>>> >> [1] https://github.com/apache/beam/pull/8338
>>>> >> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
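
For anyone updating documentation, the substitution discussed in this thread (a placeholder default in the process signature that the harness swaps for a tracker at call time) can be sketched in plain Python. This is only an illustration under stated assumptions: it does not import apache_beam and is not Beam's implementation. RestrictionParam, OffsetProvider, OffsetTracker, CharsDoFn and invoke_process are hypothetical stand-ins written for this example.

```python
import inspect


class OffsetTracker:
    """Hypothetical tracker: claims positions inside [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def try_claim(self, position):
        # A claim succeeds only while the position is inside the restriction.
        return self.start <= position < self.stop


class OffsetProvider:
    """Hypothetical provider: builds a restriction and a tracker for it."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetTracker(*restriction)


class RestrictionParam:
    """Hypothetical placeholder default; it only carries the provider."""

    def __init__(self, provider):
        self.provider = provider


class CharsDoFn:
    def process(self, element, tracker=RestrictionParam(OffsetProvider())):
        # When called through invoke_process, `tracker` is a real tracker,
        # not the placeholder that appears in the signature.
        assert not isinstance(tracker, RestrictionParam)
        position = tracker.start
        while tracker.try_claim(position):
            yield element[position]
            position += 1


def invoke_process(dofn, element):
    """Toy stand-in for a harness: replace placeholders with trackers."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, RestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))
```

Calling invoke_process(CharsDoFn(), "abc") returns ['a', 'b', 'c']. The point of the sketch is that the default value is never used as a value: it only tells the caller which provider to consult, which is why a clearly named placeholder reads better than passing the provider itself.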
