I have no objections. +Ismaël Mejía <ieme...@gmail.com>, who is familiar with and interested in Java SDF.

On Wed, Jun 5, 2019 at 11:31 AM Brian Hulette <bhule...@google.com> wrote:

> Just wanted to resurrect this to say that it seems appropriate to make the
> same change in Java. All the same arguments apply there, and now there's
> the additional argument for maintaining symmetry with Python.
>
> I think BEAM-7250 should be changed to a ticket to actually implement this
> in Java unless someone has an objection.
>
> Brian
>
> On Wed, May 8, 2019 at 2:20 PM Pablo Estrada <pabl...@google.com> wrote:
>
>> Hello all,
>> The API has been updated for Python (See
>> https://github.com/apache/beam/pull/8430). Please, if you catch any
>> documentation that needs updating, flag to me or just propose the change : )
>>
>> As for Java - we didn't end up determining whether it makes sense to
>> update the API as well. Thoughts from others?
>>
>> In any case, I've filed https://jira.apache.org/jira/browse/BEAM-7250 to
>> track this for Java.
>>
>> Best
>> -P.
>>
>> On Mon, Apr 29, 2019 at 2:41 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Pablo, all the SplittableDoFn stuff is marked as @Experimental so one is
>>> able to change it. There really is only one complicated one to change in
>>> Watch.java, the rest are quite straightforward.
>>>
>>> On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <pabl...@google.com>
>>> wrote:
>>>
>>>> Thanks all,
>>>> @Luke - I imagine that would be an improvement to the API, but this
>>>> may be harder as this is already available to users, and there are those
>>>> who have implemented SDFs under the current API. Would it be possible to
>>>> make a backwards-compatible change to the API here?
>>>>
>>>> For the Python changes, I've proposed a pull request:
>>>> https://github.com/apache/beam/pull/8430 - it was smaller than I
>>>> thought : ) - All comments welcome please.
>>>>
>>>> +Boyuan Zhang <boyu...@google.com> I am happy to wait for your
>>>> SyntheticSource PR to be merged and make the appropriate changes if you'd
>>>> like.
>>>> Best
>>>> -P.
>>>>
>>>> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Would it make sense to also do this in the Java SDK?
>>>>>
>>>>> This would make the restriction provider also mirror the TimerSpec and
>>>>> StateSpec which use annotations similar to how it's done in Python.
>>>>>
>>>>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> +1 to introducing this Param for consistency (and making the
>>>>>> substitution more obvious), and I think SDF is still new/experimental
>>>>>> enough we can do this. I don't know if we need Spec in addition to
>>>>>> Param and Provider.
>>>>>>
>>>>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath
>>>>>> <chamik...@google.com> wrote:
>>>>>> >
>>>>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <pabl...@google.com>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >> Hi all,
>>>>>> >> Sorry about the wall of text.
>>>>>> >> So, first of all, I thought about this while reviewing a PR by
>>>>>> >> Boyuan with an example of an SDF[1]. This is very exciting btw : ).
>>>>>> >>
>>>>>> >> Anyway... I certainly have a limited view of the whole SDF effort,
>>>>>> >> but I think it's worth discussing this particular point about the
>>>>>> >> API before finalizing SDF and making it widely available. So here
>>>>>> >> I go:
>>>>>> >>
>>>>>> >> The Python API for SDF asks users to provide a restriction
>>>>>> >> provider in their process function signature. More or less the
>>>>>> >> following:
>>>>>> >>
>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>> >>   def process(self, element,
>>>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>>>> >>     # My DoFn logic...
>>>>>> >>
>>>>>> >> This is all fine, but something that I found a little odd is that
>>>>>> >> the restriction provider gets replaced at runtime with a
>>>>>> >> restriction tracker:
>>>>>> >>
>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>> >>   def process(self, element,
>>>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>>>> >>     # This assert succeeds : )
>>>>>> >>     assert not isinstance(restriction_tracker,
>>>>>> >>                           MyOwnLittleRestrictionProvider)
>>>>>> >>
>>>>>> >> After thinking a little bit about it, I realized that the default
>>>>>> >> argument simply allows us to inform the runner where to find the
>>>>>> >> restriction provider; but that the thing that we need at runtime
>>>>>> >> is NOT the restriction provider - but rather, the restriction
>>>>>> >> tracker.
>>>>>> >>
>>>>>> >> A similar pattern occurs with state and timers, where the runner
>>>>>> >> needs to know the sort of state, the coder for the values in that
>>>>>> >> state (or the time domain for timers); but the runtime parameter
>>>>>> >> is different[2]. For state and timers (and window, timestamp,
>>>>>> >> pane, etc.) we provide a pattern where users give a default value
>>>>>> >> that is clearly a placeholder: beam.DoFn.TimerParam, or
>>>>>> >> beam.DoFn.StateParam.
>>>>>> >
>>>>>> > This is the way (new) DoFn work for Python SDK. SDK (harness)
>>>>>> > identifies meanings of different (potential) arguments to a DoFn
>>>>>> > using pre-defined default values.
>>>>>> >
>>>>>> >> In this case, the API is fairly similar, but (at least in my
>>>>>> >> imagination), it is much more clear about how the DoFnParam will
>>>>>> >> be replaced with something else at runtime.
>>>>>> >> A similar change could be done for SDF:
>>>>>> >>
>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>> >>   MY_RESTRICTION = \
>>>>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>>>>> >>
>>>>>> >>   def process(
>>>>>> >>       self, element,
>>>>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>>>>> >>     # My DoFn logic..
>>>>>> >
>>>>>> > If I understood correctly, what you propose is similar to the
>>>>>> > existing solution but we add a XXXParam parameter for consistency?
>>>>>> > I think this is fine and should be a relatively small change. Main
>>>>>> > point is, SDK should be able to find out the RestrictionProvider
>>>>>> > class to utilize its methods before passing elements to
>>>>>> > DoFn.process() method:
>>>>>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>>>>>> >
>>>>>> >> Perhaps it is a good opportunity to consider this, since SDF is
>>>>>> >> still in progress.
>>>>>> >>
>>>>>> >> Some pros:
>>>>>> >> - Consistent with other parameters that we pass to DoFn methods
>>>>>> >> - A bit more clear about what will happen at runtime
>>>>>> >>
>>>>>> >> Some cons:
>>>>>> >> - SDF developers are "power users", and will have gone through the
>>>>>> >>   SDF documentation. This point will be clear to them.
>>>>>> >> - This may create unnecessary work, and perhaps unintended
>>>>>> >>   consequences.
>>>>>> >> - I bet there's more
>>>>>> >>
>>>>>> >> Thoughts?
>>>>>> >>
>>>>>> >> -P.
>>>>>> >>
>>>>>> >> [1] https://github.com/apache/beam/pull/8338
>>>>>> >> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
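
The placeholder-default pattern discussed in this thread can be sketched in plain Python, without importing Beam. This is only a toy model of the idea, not Beam's implementation: RestrictionParam here merely mirrors the name proposed above, while ExampleRestrictionProvider, OffsetTracker, and the invoke() harness are hypothetical stand-ins invented for illustration. It shows a harness reading the default value from the process() signature to locate the provider, then passing a tracker (not the provider) at call time.

```python
import inspect


class OffsetTracker:
    """Hypothetical tracker over the half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def try_claim(self, position):
        # Claim succeeds only while the position lies inside the restriction.
        return self.start <= position < self.stop


class ExampleRestrictionProvider:
    """Hypothetical provider: knows how to build a tracker for an element."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return OffsetTracker(*restriction)


class RestrictionParam:
    """Placeholder default; it only tells the harness which provider to use."""

    def __init__(self, provider):
        self.provider = provider


class MyOwnLittleSDF:
    def process(self, element,
                restriction_tracker=RestrictionParam(
                    ExampleRestrictionProvider())):
        # At call time the placeholder has been replaced by a tracker.
        assert isinstance(restriction_tracker, OffsetTracker)
        position = restriction_tracker.start
        while restriction_tracker.try_claim(position):
            yield element[position]
            position += 1


def invoke(dofn, element):
    """Toy harness: swap each RestrictionParam default for a live tracker."""
    kwargs = {}
    signature = inspect.signature(dofn.process)
    for name, param in signature.parameters.items():
        if isinstance(param.default, RestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return list(dofn.process(element, **kwargs))
```

Calling invoke(MyOwnLittleSDF(), "abc") walks the element one offset at a time through the tracker. Because the default is an explicit placeholder type, a reader of process() can tell that the argument will be substituted at runtime, which is the clarity argument made above.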