Hi

AFAIR, the timer per function is on the "roadmap" (I remember a discussion we had
with Kenn).

I will take a deeper look at your branch next week.

Regards
JB

On Feb 8, 2017, at 13:28, Etienne Chauchot <echauc...@gmail.com> wrote:
>Hi Kenn,
>
>I have started using state and timer APIs, they seem awesome!
>
>Please take a look at
>https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
>
>It contains a PTransform that does the batching across bundles while
>respecting the windows (even though the tests are not finished yet; see
>the @Ignore annotations and TODOs).
>
>I have some questions:
>
>- I use the timer to detect the end of the window, as you suggested.
>But the timer can only be set in @ProcessElement and @OnTimer. The Javadoc
>says that timers are implicitly scoped to a key/window and that a timer
>can be set only for a single time per scope. I noticed that if I call
>timer.setForNowPlus in the @ProcessElement method, the timer seems to be
>set n times for n elements. So I just created a state holding a boolean to
>prevent setting the timer more than once per key/window.
>=> Would it be good to have an end-user way of indicating that the
>timer will be set only once per key/window? Something analogous to
>@Setup, to avoid the user having to use a boolean state?
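The boolean guard Etienne describes can be modelled in plain Java. This is not Beam's State/Timer API. A `HashMap` keyed by key and window stands in for a per-key/window boolean state cell, and a counter records how many times a timer would actually be set. `TimerOnceGuard` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (NOT Beam code) of "set the timer only once per key/window".
// The map stands in for a per-key/window boolean state cell; timerSets counts
// how often a timer would really be set by a DoFn using this guard.
class TimerOnceGuard {
    // Stand-in for a boolean state scoped to (key, window).
    private final Map<String, Boolean> timerSetState = new HashMap<>();
    private int timerSets = 0;

    // Called once per element, like an @ProcessElement method.
    void processElement(String key, long windowStart) {
        String scope = key + "@" + windowStart;
        if (!timerSetState.getOrDefault(scope, false)) {
            timerSets++;                    // in a real DoFn: set the timer here
            timerSetState.put(scope, true); // remember this scope already has a timer
        }
        // ... buffer the element here ...
    }

    int timerSets() {
        return timerSets;
    }
}
```

Consider three elements for key "a" and two for key "b" in the same window, plus one element for "a" in the next window. The timer is set three times, once per key/window scope, rather than six times.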
>
>- I understand that state and timers need to be per-key, but what if the
>end user does not need a key (let's say he just needs a
>PCollection<String>)? Do we then tell him to use a PCollection<KV> anyway,
>as I wrote in the javadoc of BatchingParDo?
>
>WDYT?
>
>Thanks,
>
>Etienne
>
>
>On 26/01/2017 at 17:28, Etienne Chauchot wrote:
>> Wonderful!
>>
>> Thanks, Kenn!
>>
>> Etienne
>>
>>
>> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
>>> Hi Etienne,
>>>
>>> I was drafting a proposal about @OnWindowExpiration when this email
>>> arrived. I thought I would try to quickly unblock you by responding
>>> with a TL;DR: you can achieve your goals with state & timers as they
>>> currently exist. You'll set a timer for
>>> window.maxTimestamp().plus(allowedLateness) precisely; when this timer
>>> fires, you are guaranteed that the input watermark has exceeded this
>>> point (so all new data is droppable) while the output timestamp is held
>>> to this point (so you can safely output into the window).
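Kenn's timer target can be worked through with plain `java.time` arithmetic. This only illustrates the arithmetic and is not Beam's API; Beam's own types are Joda-based. It relies on `IntervalWindow.maxTimestamp()` being the window end minus 1 ms, because the end is exclusive. `WindowExpiry` is a hypothetical helper.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java arithmetic (NOT Beam code) for the timer described above:
//   window.maxTimestamp().plus(allowedLateness)
// For a fixed window [start, start + size), maxTimestamp is end - 1 ms.
class WindowExpiry {
    // Start of the fixed window that contains ts (floorDiv also handles pre-epoch).
    static Instant fixedWindowStart(Instant ts, Duration size) {
        long sizeMs = size.toMillis();
        return Instant.ofEpochMilli(Math.floorDiv(ts.toEpochMilli(), sizeMs) * sizeMs);
    }

    // Largest timestamp that still belongs to [start, start + size).
    static Instant maxTimestamp(Instant start, Duration size) {
        return start.plus(size).minusMillis(1);
    }

    // Event time at which the expiration timer would be set.
    static Instant expiryTimer(Instant start, Duration size, Duration allowedLateness) {
        return maxTimestamp(start, size).plus(allowedLateness);
    }
}
```

Take an element at 00:00:42 with 1-minute fixed windows and 30 s of allowed lateness. Its window is [00:00:00, 00:01:00), the max timestamp is 00:00:59.999, and the timer lands at 00:01:29.999 (89,999 ms after the epoch).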
>>>
>>> @OnWindowExpiration is (1) a convenience to save you from needing a
>>> handle on the allowed lateness (not a problem in your case) and (2)
>>> actually meaningful and potentially less expensive to implement in the
>>> absence of state (this is why it needs a design discussion at all,
>>> really).
>>>
>>> Caveat: these APIs are new and not supported in every runner and
>>> windowing configuration.
>>>
>>> Kenn
>>>
>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have started to implement this ticket. For now, it is implemented as a
>>>> PTransform that simply does ParDo.of(new DoFn), and all the processing
>>>> related to batching is done in the DoFn.
>>>>
>>>> I'm starting to deal with windows and bundles (I'm starting to look at
>>>> the State API to process elements across bundles; more questions about
>>>> this to come).
>>>> My comments/questions are inline:
>>>>
>>>>
>>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>>>>
>>>>> We should start by understanding the goals. If elements are in
>>>>> different windows, can they be put in the same batch? If they have
>>>>> different timestamps, what timestamp should the batch have?
>>>>>
>>>> Regarding timestamps: the current design is as follows. The transform
>>>> does not group elements in the PCollection, so the "batch" does not
>>>> exist as an element in the PCollection. There is only a user-defined
>>>> function (perBatchFn) that gets called when batchSize elements have been
>>>> processed. This function takes an ArrayList as a parameter, so elements
>>>> keep their original timestamps.
>>>>
>>>>
>>>> Regarding windowing: I guess that if elements are not in the same
>>>> window, they are not expected to be in the same batch.
>>>> I'm just starting to work on these subjects, so I might lack a bit of
>>>> information; what I am currently thinking is that I need a way to know
>>>> in the DoFn that the window has expired, so that I can call perBatchFn
>>>> even if batchSize has not been reached. This is the @OnWindowExpiration
>>>> callback that Kenneth mentioned in an email about bundles.
>>>> Let's imagine that we have a collection of elements artificially
>>>> timestamped every 10 seconds (for simplicity of the example) and fixed
>>>> windowing of 1 minute. Then each window contains 6 elements. If we were
>>>> to buffer the elements in batches of 5 elements, then for each window we
>>>> would expect 2 batches (one of 5 elements, one of 1 element). For that
>>>> to happen, we need an @OnWindowExpiration on the DoFn where we call
>>>> perBatchFn.
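The 10-second / 1-minute / batch-of-5 example can be checked with a small plain-Java simulation. This is not Beam code. `WindowedBatcher` and `advanceWatermark` are hypothetical names, elements are reduced to their event timestamps, and allowed lateness is ignored. `advanceWatermark` plays the role of the end-of-window timer, or of a future `@OnWindowExpiration` callback.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Plain-Java simulation (NOT Beam code) of batching that respects fixed windows.
// Elements are represented only by their event timestamps in milliseconds.
// Allowed lateness is ignored: a window counts as expired once the watermark
// reaches its end.
class WindowedBatcher {
    private final long windowSizeMs;
    private final int batchSize;
    private final Consumer<List<Long>> perBatchFn;
    // One buffer per window, keyed by window start; TreeMap keeps windows ordered.
    private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();

    WindowedBatcher(long windowSizeMs, int batchSize, Consumer<List<Long>> perBatchFn) {
        this.windowSizeMs = windowSizeMs;
        this.batchSize = batchSize;
        this.perBatchFn = perBatchFn;
    }

    void processElement(long timestampMs) {
        long windowStart = Math.floorDiv(timestampMs, windowSizeMs) * windowSizeMs;
        List<Long> buffer = buffers.computeIfAbsent(windowStart, w -> new ArrayList<>());
        buffer.add(timestampMs);
        if (buffer.size() == batchSize) {
            perBatchFn.accept(new ArrayList<>(buffer)); // full batch: emit a copy
            buffer.clear();
        }
    }

    // Every window [start, start + size) with start + size <= watermarkMs has
    // expired, so its partial batch is flushed even though batchSize was not reached.
    void advanceWatermark(long watermarkMs) {
        Map<Long, List<Long>> expired = buffers.headMap(watermarkMs - windowSizeMs, true);
        for (List<Long> buffer : expired.values()) {
            if (!buffer.isEmpty()) {
                perBatchFn.accept(new ArrayList<>(buffer));
            }
        }
        expired.clear(); // headMap is a view, so this also drops the windows from buffers
    }
}
```

Feed timestamps 0 s, 10 s, ..., 50 s and then advance the watermark to 60 s. The result is one batch of 5 followed by one batch of 1, as in the example. Repeating this for the next minute yields the same pair again, and no batch ever mixes two windows.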
>>>>
>>>>> As a composite transform this will likely require a group by key which
>>>>> may affect performance. Maybe within a DoFn is better.
>>>>>
>>>> Yes, the processing is done with a DoFn indeed.
>>>>
>>>>> Then it could be some annotation or API that informs the runner. Should
>>>>> batch sizes be fixed in the annotation (element count or size), or
>>>>> should the user have some method that lets them decide when to process
>>>>> a batch based on the contents?
>>>>>
>>>> For now, the user passes batchSize as an argument to BatchParDo.via();
>>>> it is a number of elements. But batching based on content might be
>>>> useful for the user, and giving a hint to the runner might be more
>>>> flexible for the runner.
>>>> Thanks.
>>>>
>>>>> Another thing to think about is whether this should be connected to
>>>>> the ability to run parts of the bundle in parallel.
>>>>>
>>>> Yes!
>>>>
>>>>> Maybe each batch is an RPC and you just want to start an async RPC for
>>>>> each batch. Then, in addition to starting the final RPC in
>>>>> finishBundle, you also need to wait for all the RPCs to complete.
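Ben's suggestion can be sketched in plain Java with `CompletableFuture`. Each full batch starts one asynchronous call. In the finish-bundle step, the final partial batch is sent and every outstanding call is awaited. The `rpc` function is a hypothetical stand-in for a real client call, and `AsyncBatchSender` is not a Beam class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Plain-Java sketch (NOT Beam code): one async "RPC" per batch, and at the end
// of the bundle the last partial batch is sent and all calls are awaited.
class AsyncBatchSender<T> {
    private final int batchSize;
    private final Function<List<T>, Integer> rpc; // hypothetical blocking call returning a status
    private final ExecutorService executor;       // owned and shut down by the caller
    private final List<T> buffer = new ArrayList<>();
    private final List<CompletableFuture<Integer>> pending = new ArrayList<>();

    AsyncBatchSender(int batchSize, Function<List<T>, Integer> rpc, ExecutorService executor) {
        this.batchSize = batchSize;
        this.rpc = rpc;
        this.executor = executor;
    }

    void processElement(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            sendAsync();
        }
    }

    private void sendAsync() {
        List<T> batch = new ArrayList<>(buffer); // copy, because the buffer is reused
        buffer.clear();
        pending.add(CompletableFuture.supplyAsync(() -> rpc.apply(batch), executor));
    }

    // Like finishBundle: start the final RPC, then block until every RPC is done.
    List<Integer> finishBundle() {
        if (!buffer.isEmpty()) {
            sendAsync();
        }
        List<Integer> results = new ArrayList<>();
        for (CompletableFuture<Integer> f : pending) {
            results.add(f.join()); // waits for this call to complete
        }
        pending.clear();
        return results;
    }
}
```

With a batch size of 3, seven elements produce calls for batches of 3, 3 and 1. `finishBundle()` returns only after all three calls have completed.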
>>>>>
>>>> Actually, the processing of each batch is currently whatever the user
>>>> wants (the user-defined perBatchFn function). If the user decides to
>>>> issue an async RPC in that function (called with the ArrayList of input
>>>> elements), IMHO he is responsible for waiting for the response in that
>>>> method if he needs it, but he can also fire and forget, depending on his
>>>> use case.
>>>>
>>>> Besides, I have also included a perElementFn user function to allow the
>>>> user to do some processing on the elements before adding them to the
>>>> batch (example use case: convert a String to a DTO object to invoke an
>>>> external service).
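Here is a minimal sketch of how the two user hooks could fit together. It assumes `perElementFn` is a `Function` applied before buffering and `perBatchFn` is a `Consumer` that receives an `ArrayList`. `UserFnBatcher` and `NameDto` are hypothetical, and the actual BatchingParDo signatures on the branch may differ.

```java
import java.util.ArrayList;
import java.util.function.Consumer;
import java.util.function.Function;

// Plain-Java sketch (NOT the BatchingParDo code): perElementFn converts each
// input before buffering; perBatchFn receives an ArrayList of converted elements.
class UserFnBatcher<InputT, ElemT> {
    private final int batchSize;
    private final Function<InputT, ElemT> perElementFn;
    private final Consumer<ArrayList<ElemT>> perBatchFn;
    private ArrayList<ElemT> buffer = new ArrayList<>();

    UserFnBatcher(int batchSize, Function<InputT, ElemT> perElementFn,
                  Consumer<ArrayList<ElemT>> perBatchFn) {
        this.batchSize = batchSize;
        this.perElementFn = perElementFn;
        this.perBatchFn = perBatchFn;
    }

    void processElement(InputT input) {
        buffer.add(perElementFn.apply(input)); // e.g. String -> DTO
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Emits any buffered elements, e.g. at window expiration or end of input.
    void flush() {
        if (!buffer.isEmpty()) {
            ArrayList<ElemT> batch = buffer;
            buffer = new ArrayList<>(); // hand the full list off, start a fresh one
            perBatchFn.accept(batch);
        }
    }
}

// Hypothetical DTO built from a raw String.
class NameDto {
    final String name;

    NameDto(String raw) {
        this.name = raw.trim();
    }
}
```

With a batch size of 2, the inputs " alice", "bob " and "carol" become DTOs. They reach `perBatchFn` as one batch of two, and the third DTO is emitted as a batch of one on the final `flush()`.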
>>>>
>>>> Etienne
>>>>
>>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>> Hi JB,
>>>>>
>>>>> I meant a JIRA vote, but a discussion on the ML works too :)
>>>>>
>>>>> As I understand the need (see the Stack Overflow links in the JIRA
>>>>> ticket), the aim is to avoid the user having to code the batching logic
>>>>> in his own DoFn.processElement() and DoFn.finishBundle(), regardless of
>>>>> the bundles. For example, a possible use case is to batch calls to an
>>>>> external service (for performance).
>>>>>
>>>>> I was thinking about providing a PTransform that implements the
>>>>> batching in its own DoFn and that takes user-defined functions for
>>>>> customization.
>>>>>
>>>>> Etienne
>>>>>
>>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I guess you mean a discussion on the mailing list about that, right?
>>>>>>
>>>>>> AFAIR the idea is to provide a utility class to deal with
>>>>>> pooling/batching. However, I'm not sure it's required, as we have
>>>>>> @StartBundle etc. in DoFn, and batching depends on the end user's
>>>>>> "logic".
>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com>
>>>>>> wrote:
>>>>>>> Hi all,
>>>>>>> I have started to work on this ticket
>>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>
>>>>>>> As there have been no votes since March 18th, is the issue still
>>>>>>> relevant/needed?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>
