+1 for making the change in Java too, both for consistency with Python and
with the way State/Timers work.

It would be really nice, if possible, to see a concrete proposed example
(or, even better, a design doc).

Thanks for bringing up this idea, Pablo, and sorry for the delayed answer.

On Wed, Jun 5, 2019 at 8:44 PM Pablo Estrada <[email protected]> wrote:
>
> I have no objections.
>
> +Ismaël Mejía who has familiarity and interest in Java SDF.
>
> On Wed, Jun 5, 2019 at 11:31 AM Brian Hulette <[email protected]> wrote:
>>
>> Just wanted to resurrect this to say that it seems appropriate to make the 
>> same change in Java. All the same arguments apply there, and now there's the 
>> additional argument for maintaining symmetry with Python.
>>
>> I think BEAM-7250 should be changed to a ticket to actually implement this 
>> in Java unless someone has an objection.
>>
>> Brian
>>
>> On Wed, May 8, 2019 at 2:20 PM Pablo Estrada <[email protected]> wrote:
>>>
>>> Hello all,
>>> The API has been updated for Python (see 
>>> https://github.com/apache/beam/pull/8430). If you catch any 
>>> documentation that needs updating, please flag it to me or just propose 
>>> the change : )
>>>
>>> As for Java - we didn't end up determining whether it makes sense to update 
>>> the API as well. Thoughts from others?
>>>
>>> In any case, I've filed https://jira.apache.org/jira/browse/BEAM-7250 to 
>>> track this for Java.
>>>
>>> Best
>>> -P.
>>>
>>> On Mon, Apr 29, 2019 at 2:41 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>> Pablo, all the SplittableDoFn stuff is marked as @Experimental, so we are 
>>>> able to change it. There is really only one complicated case to change, in 
>>>> Watch.java; the rest are quite straightforward.
>>>>
>>>> On Mon, Apr 29, 2019 at 2:23 PM Pablo Estrada <[email protected]> wrote:
>>>>>
>>>>> Thanks all,
>>>>>  @Luke - I imagine that would be an improvement to the API, but this may 
>>>>> be harder as this is already available to users, and there are those who 
>>>>> have implemented SDFs under the current API. Would it be possible to make 
>>>>> a backwards-compatible change to the API here?
>>>>>
>>>>> For the Python changes, I've proposed a pull request: 
>>>>> https://github.com/apache/beam/pull/8430 - it was smaller than I thought 
>>>>> : ) - All comments welcome please.
>>>>>
>>>>> +Boyuan Zhang I am happy to wait for your SyntheticSource PR to be merged 
>>>>> and make the appropriate changes if you'd like.
>>>>> Best
>>>>> -P.
>>>>>
>>>>> On Mon, Apr 29, 2019 at 8:23 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>> Would it make sense to also do this in the Java SDK?
>>>>>>
>>>>>> This would make the restriction provider also mirror the TimerSpec and 
>>>>>> StateSpec, which use annotations, similar to how it's done in Python.
>>>>>>
>>>>>> On Mon, Apr 29, 2019 at 3:42 AM Robert Bradshaw <[email protected]> 
>>>>>> wrote:
>>>>>>>
>>>>>>> +1 to introducing this Param for consistency (and making the
>>>>>>> substitution more obvious), and I think SDF is still new/experimental
>>>>>>> enough we can do this. I don't know if we need Spec in addition to
>>>>>>> Param and Provider.
>>>>>>>
>>>>>>> On Sat, Apr 27, 2019 at 1:07 AM Chamikara Jayalath 
>>>>>>> <[email protected]> wrote:
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > On Fri, Apr 26, 2019 at 3:43 PM Pablo Estrada <[email protected]> 
>>>>>>> > wrote:
>>>>>>> >>
>>>>>>> >> Hi all,
>>>>>>> >> Sorry about the wall of text.
>>>>>>> >> So, first of all, I thought about this while reviewing a PR by 
>>>>>>> >> Boyuan with an example of an SDF[1]. This is very exciting btw : ).
>>>>>>> >>
>>>>>>> >> Anyway... I certainly have a limited view of the whole SDF effort, 
>>>>>>> >> but I think it's worth discussing this particular point about the 
>>>>>>> >> API before finalizing SDF and making it widely available. So here I 
>>>>>>> >> go:
>>>>>>> >>
>>>>>>> >> The Python API for SDF asks users to provide a restriction provider 
>>>>>>> >> in their process function signature. More or less the following:
>>>>>>> >>
>>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>>> >>   def process(self, element,
>>>>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>>>>> >>     # My DoFn logic...
>>>>>>> >>
>>>>>>> >> This is all fine, but something that I found a little odd is that 
>>>>>>> >> the restriction provider gets replaced at runtime with a restriction 
>>>>>>> >> tracker:
>>>>>>> >>
>>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>>> >>   def process(self, element,
>>>>>>> >>               restriction_tracker=MyOwnLittleRestrictionProvider()):
>>>>>>> >>     # This assert succeeds : )
>>>>>>> >>     assert not isinstance(restriction_tracker,
>>>>>>> >>                           MyOwnLittleRestrictionProvider)
>>>>>>> >>
>>>>>>> >> After thinking a little bit about it, I realized that the default 
>>>>>>> >> argument simply allows us to inform the runner where to find the 
>>>>>>> >> restriction provider; but that the thing that we need at runtime is 
>>>>>>> >> NOT the restriction provider - but rather, the restriction tracker.
>>>>>>> >>
>>>>>>> >> A similar pattern occurs with state and timers, where the runner 
>>>>>>> >> needs to know the sort of state, the coder for the values in that 
>>>>>>> >> state (or the time domain for timers); but the runtime parameter is 
>>>>>>> >> different[2]. For state and timers (and window, timestamp, pane, 
>>>>>>> >> etc.) we provide a pattern where users give a default value that is 
>>>>>>> >> clearly a placeholder: beam.DoFn.TimerParam, or beam.DoFn.StateParam.
>>>>>>> >
>>>>>>> >
>>>>>>> > This is the way (new) DoFns work in the Python SDK. The SDK (harness) 
>>>>>>> > identifies the meanings of different (potential) arguments to a DoFn 
>>>>>>> > using pre-defined default values.
>>>>>>> >
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> In this case, the API is fairly similar, but (at least in my 
>>>>>>> >> imagination), it is much more clear about how the DoFnParam will be 
>>>>>>> >> replaced with something else at runtime. A similar change could be 
>>>>>>> >> done for SDF:
>>>>>>> >>
>>>>>>> >> class MyOwnLittleSDF(beam.DoFn):
>>>>>>> >>   MY_RESTRICTION = \
>>>>>>> >>       RestrictionSpec(provider=MyOwnLittleRestrictionProvider())
>>>>>>> >>
>>>>>>> >>   def process(
>>>>>>> >>       self, element,
>>>>>>> >>       restriction_tracker=beam.DoFn.RestrictionParam(MY_RESTRICTION)):
>>>>>>> >>     # My DoFn logic..
>>>>>>> >
>>>>>>> >
>>>>>>> >
>>>>>>> > If I understood correctly, what you propose is similar to the 
>>>>>>> > existing solution, but we add an XXXParam parameter for consistency?
>>>>>>> > I think this is fine and should be a relatively small change. The main 
>>>>>>> > point is that the SDK should be able to find the RestrictionProvider 
>>>>>>> > class and use its methods before passing elements to the 
>>>>>>> > DoFn.process() method: 
>>>>>>> > https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/common.py#L241
>>>>>>> >
>>>>>>> >
>>>>>>> >>
>>>>>>> >>
>>>>>>> >> Perhaps it is a good opportunity to consider this, since SDF is 
>>>>>>> >> still in progress.
>>>>>>> >>
>>>>>>> >> Some pros:
>>>>>>> >> - Consistent with other parameters that we pass to DoFn methods
>>>>>>> >> - A bit more clear about what will happen at runtime
>>>>>>> >>
>>>>>>> >> Some cons:
>>>>>>> >> - SDF developers are "power users", and will have gone through the 
>>>>>>> >> SDF documentation. This point will be clear to them.
>>>>>>> >> - This may create unnecessary work, and perhaps unintended 
>>>>>>> >> consequences.
>>>>>>> >> - I bet there's more
>>>>>>> >>
>>>>>>> >> Thoughts?
>>>>>>> >>
>>>>>>> >> -P.
>>>>>>> >>
>>>>>>> >> [1] https://github.com/apache/beam/pull/8338
>>>>>>> >> [2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/userstate_test.py#L560-L586
>>>>>>> >>
>>>>>>> >>
>>>>>>> >>
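
For readers following the thread, the substitution discussed above (a placeholder default in the process() signature that is swapped for a restriction tracker at runtime) can be sketched in plain Python roughly as follows. This is only an illustration under stated assumptions and not Beam's implementation: ToyRestrictionProvider, ToyOffsetTracker, ToyRestrictionParam and invoke_process are hypothetical names made up for this sketch, and nothing here imports apache_beam.

```python
import inspect


class ToyOffsetTracker:
    """Hypothetical tracker over a half-open offset range [start, stop)."""

    def __init__(self, start, stop):
        self.start = start
        self.stop = stop

    def try_claim(self, position):
        # A position can be claimed only if it lies inside the restriction.
        return self.start <= position < self.stop


class ToyRestrictionProvider:
    """Hypothetical provider: knows how to build a restriction and a tracker."""

    def initial_restriction(self, element):
        return (0, len(element))

    def create_tracker(self, restriction):
        return ToyOffsetTracker(*restriction)


class ToyRestrictionParam:
    """Hypothetical placeholder default: only tells the harness which provider to use."""

    def __init__(self, provider):
        self.provider = provider


class MyOwnLittleSDF:
    def process(self, element,
                restriction_tracker=ToyRestrictionParam(ToyRestrictionProvider())):
        # By the time this runs, the placeholder has been replaced by a tracker.
        return [item for index, item in enumerate(element)
                if restriction_tracker.try_claim(index)]


def invoke_process(dofn, element):
    """Toy harness: replace each ToyRestrictionParam default with a live tracker."""
    kwargs = {}
    for name, param in inspect.signature(dofn.process).parameters.items():
        if isinstance(param.default, ToyRestrictionParam):
            provider = param.default.provider
            restriction = provider.initial_restriction(element)
            kwargs[name] = provider.create_tracker(restriction)
    return dofn.process(element, **kwargs)
```

In this sketch the default value is never used as a tracker. It is only metadata that the toy harness reads from the signature, which is the distinction the proposed Param type is meant to make obvious.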
