+1 for making this change in Java too, both for consistency with Python and with the way State/Timers work.
It would be really nice, if possible, to see a concrete proposed example (or,
even better, a design doc). Thanks for bringing up this idea, Pablo, and sorry
for the delayed answer.

On Wed, Jun 5, 2019 at 8:44 PM Pablo Estrada <[email protected]> wrote:
>
> I have no objections.
>
> +Ismaël Mejía who has familiarity and interest in Java SDF.
>
> On Wed, Jun 5, 2019 at 11:31 AM Brian Hulette <[email protected]> wrote:
>>
>> Just wanted to resurrect this to say that it seems appropriate to make the
>> same change in Java. All the same arguments apply there, and now there's the
>> additional argument for maintaining symmetry with Python.
>>
>> I think BEAM-7250 should be changed to a ticket to actually implement this
>> in Java unless someone has an objection.
>>
>> Brian
>>
>> On Wed, May 8, 2019 at 2:20 PM Pablo Estrada <[email protected]> wrote:
>>>
>>> Hello all,
>>> The API has been updated for Python (see
>>> https://github.com/apache/beam/pull/8430). Please, if you catch any
>>> documentation that needs updating, flag it to me or just propose the change : )
>>>
>>> As for Java - we didn't end up determining whether it makes sense to update
>>> the API as well. Thoughts from others?
>>>
>>> In any case, I've filed https://jira.apache.org/jira/browse/BEAM-7250 to
>>> track this for Java.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Apr 29, 2019 at 2:41 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>> Pablo, all the SplittableDoFn stuff is marked as @Experimental so one is
>>>> able to change it. There is really only one complicated one to change, in
>>>> Watch.java; the rest are quite straightforward.
>>>>
>>>> On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <[email protected]> wrote:
>>>>>
>>>>> Thanks all,
>>>>> @Luke - I imagine that would be an improvement to the API, but this may
>>>>> be harder as this is already available to users, and there are those who
>>>>> have implemented SDFs under the current API.
>>>>> Would it be possible to make a backwards-compatible change to the API
>>>>> here?
>>>>>
>>>>> For the Python changes, I've proposed a pull request:
>>>>> https://github.com/apache/beam/pull/8430 - it was smaller than I thought
>>>>> : ) - all comments welcome.
>>>>>
>>>>> +Boyuan Zhang I am happy to wait for your SyntheticSource PR to be merged
>>>>> and make the appropriate changes if you'd like.
>>>>> Best
>>>>> -P.
>>>>>
>>>>> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>> Would it make sense to also do this in the Java SDK?
>>>>>>
>>>>>> This would make the restriction provider also mirror the TimerSpec and
>>>>>> StateSpec, which use annotations, similar to how it's done in Python.
>>>>>>
>>>>>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <[email protected]>
>>>>>> wrote:
>>>>>>>
>>>>>>> +1 to introducing this Param for consistency (and making the
>>>>>>> substitution more obvious), and I think SDF is still new/experimental
>>>>>>> enough that we can do this. I don't know if we need Spec in addition to
>>>>>>> Param and Provider.
>>>>>>>
>>>>>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath
>>>>>>> <[email protected]> wrote:
>>>>>>> >
>>>>>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <[email protected]>
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> Hi all,
>>>>>>> >> Sorry about the wall of text.
>>>>>>> >> So, first of all, I thought about this while reviewing a PR by
>>>>>>> >> Boyuan with an example of an SDF[1]. This is very exciting btw : ).
>>>>>>> >>
>>>>>>> >> Anyway... I certainly have a limited view of the whole SDF effort,
>>>>>>> >> but I think it's worth discussing this particular point about the
>>>>>>> >> API before finalizing SDF and making it widely available. So here I
>>>>>>> >> go:
>>>>>>> >>
>>>>>>> >> The Python API for SDF asks users to provide a restriction provider
>>>>>>> >> in their process function signature.
>>>>>>> >> More or less the following:
>>>>>>> >>
>>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>>> >>   def process(self, element,
>>>>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>>>>> >>     # My DoFn logic...
>>>>>>> >>
>>>>>>> >> This is all fine, but something that I found a little odd is that
>>>>>>> >> the restriction provider gets replaced at runtime with a restriction
>>>>>>> >> tracker:
>>>>>>> >>
>>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>>> >>   def process(self, element,
>>>>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>>>>> >>     # This assert succeeds : )
>>>>>>> >>     assert not isinstance(restriction_tracker,
>>>>>>> >>                           MyOwnLittleRestrictionProvider)
>>>>>>> >>
>>>>>>> >> After thinking a little bit about it, I realized that the default
>>>>>>> >> argument simply allows us to inform the runner where to find the
>>>>>>> >> restriction provider, but that the thing we need at runtime is
>>>>>>> >> NOT the restriction provider - but rather, the restriction tracker.
>>>>>>> >>
>>>>>>> >> A similar pattern occurs with state and timers, where the runner
>>>>>>> >> needs to know the kind of state and the coder for the values in that
>>>>>>> >> state (or the time domain for timers), but the runtime parameter is
>>>>>>> >> different[2]. For state and timers (and window, timestamp, pane,
>>>>>>> >> etc.) we provide a pattern where users give a default value that is
>>>>>>> >> clearly a placeholder: beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>>>>>>> >
>>>>>>> >
>>>>>>> > This is the way (new) DoFns work in the Python SDK. The SDK (harness)
>>>>>>> > identifies the meanings of different (potential) arguments to a DoFn
>>>>>>> > using pre-defined default values.
>>>>>>> >
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> In this case, the API is fairly similar, but (at least in my
>>>>>>> >> imagination), it is much clearer about how the DoFnParam will be
>>>>>>> >> replaced with something else at runtime.
>>>>>>> >> A similar change could be done for SDF:
>>>>>>> >>
>>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>>> >>   MY_RESTRICTION = \
>>>>>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>>>>>> >>
>>>>>>> >>   def process(
>>>>>>> >>       self, element,
>>>>>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>>>>>> >>     # My DoFn logic...
>>>>>>> >
>>>>>>> >
>>>>>>> > If I understood correctly, what you propose is similar to the
>>>>>>> > existing solution, but with an added XXXParam parameter for
>>>>>>> > consistency? I think this is fine and should be a relatively small
>>>>>>> > change. The main point is that the SDK should be able to find the
>>>>>>> > RestrictionProvider class to use its methods before passing elements
>>>>>>> > to the DoFn.process() method:
>>>>>>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>>>>>>> >
>>>>>>> >
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> Perhaps it is a good opportunity to consider this, since SDF is
>>>>>>> >> still in progress.
>>>>>>> >>
>>>>>>> >> Some pros:
>>>>>>> >> - Consistent with other parameters that we pass to DoFn methods
>>>>>>> >> - A bit clearer about what will happen at runtime
>>>>>>> >>
>>>>>>> >> Some cons:
>>>>>>> >> - SDF developers are "power users", and will have gone through the
>>>>>>> >> SDF documentation. This point will be clear to them.
>>>>>>> >> - This may create unnecessary work, and perhaps unintended
>>>>>>> >> consequences.
>>>>>>> >> - I bet there's more
>>>>>>> >>
>>>>>>> >> Thoughts?
>>>>>>> >>
>>>>>>> >> -P.
>>>>>>> >>
>>>>>>> >> [1] https://github.com/apache/beam/pull/8338
>>>>>>> >> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
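To make the placeholder idea in this thread concrete, here is a minimal, self-contained Python sketch that does not depend on Apache Beam. It models how a harness might inspect a process method's default arguments, recognize a placeholder, and substitute a runtime tracker built by the provider. All names here (RestrictionParam, OffsetRangeProvider, OffsetRangeTracker, CharSDF, invoke_process) are hypothetical stand-ins for illustration only; they are not Beam's actual classes, and invoke_process is not Beam's dispatch logic in runners/common.py. The sketch also passes the provider straight to the placeholder and omits a separate RestrictionSpec layer, since Robert questioned whether a Spec is needed.

```python
import inspect


class RestrictionParam:
    """Hypothetical placeholder: marks a parameter and records its provider."""

    def __init__(self, provider):
        self.provider = provider


class OffsetRangeTracker:
    """Hypothetical tracker over the half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start, self.stop = start, stop
        self.claimed = []

    def try_claim(self, position):
        # A position can be claimed only if it lies inside the restriction.
        if self.start <= position < self.stop:
            self.claimed.append(position)
            return True
        return False


class OffsetRangeProvider:
    """Hypothetical provider: computes restrictions and builds trackers."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetRangeTracker(*restriction)


class CharSDF:
    """Hypothetical SDF-like class that emits one character per claimed offset."""

    def process(self, element,
                restriction_tracker=RestrictionParam(OffsetRangeProvider())):
        # At runtime this is a tracker, never the RestrictionParam placeholder.
        assert isinstance(restriction_tracker, OffsetRangeTracker)
        position = restriction_tracker.start
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1


def invoke_process(dofn, element):
    """Toy harness: replace RestrictionParam defaults with fresh trackers."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, RestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))


print(invoke_process(CharSDF(), "beam"))  # ['b', 'e', 'a', 'm']
```

The design point the sketch tries to capture: because the default is an instance of a dedicated placeholder type, the harness can detect it with a plain isinstance check and it is obvious to a reader that the value will be swapped out, whereas a bare provider default silently turns into an object of a different type at runtime.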
