Hi,

AFAIR the timer per function is on the "roadmap" (I remember a discussion we had with Kenn).
I will take a deeper look at your branch next week.

Regards
JB

On Feb 8, 2017, at 13:28, Etienne Chauchot <echauc...@gmail.com> wrote:
> Hi Kenn,
>
> I have started using the state and timer APIs; they seem awesome!
>
> Please take a look at
> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
>
> It contains a PTransform that does batching across bundles while respecting the windows (although the tests are not finished yet; see @Ignore and TODOs).
>
> I have some questions:
>
> - I use the timer to detect the end of the window, as you suggested. But the timer can only be set in @ProcessElement and @OnTimer. The Javadoc says that timers are implicitly scoped to a key/window and that a timer can only be set for a single time per scope. I noticed that if I call timer.setForNowPlus in the @ProcessElement method, the timer seems to be set n times for n elements. So I created a boolean state to prevent setting the timer more than once per key/window.
> => Would it be good to have an end-user way of indicating that the timer will be set only once per key/window, something analogous to @Setup, so that the user does not have to use a boolean state?
>
> - I understand that state and timers need to be per key, but what if the end user does not need a key (let's say he just needs a PCollection<String>)? Do we tell him to use a PCollection<KV> anyway, as I wrote in the Javadoc of BatchingParDo?
>
> WDYT?
>
> Thanks,
>
> Etienne
>
>
> On 26/01/2017 at 17:28, Etienne Chauchot wrote:
>> Wonderful!
>>
>> Thanks Kenn!
>>
>> Etienne
>>
>>
>> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
>>> Hi Etienne,
>>>
>>> I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist.
>>> You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely: when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window).
>>>
>>> @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is really why it needs a design discussion at all).
>>>
>>> Caveat: these APIs are new and not supported in every runner and windowing configuration.
>>>
>>> Kenn
>>>
>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn), and all the processing related to batching is done in the DoFn.
>>>>
>>>> I'm starting to deal with windows and bundles (starting to take a look at the State API to process across bundles; more questions about this to come).
>>>> My comments/questions are inline:
>>>>
>>>>
>>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>>>>
>>>>> We should start by understanding the goals. If elements are in different windows, can they be put in the same batch? If they have different timestamps, what timestamp should the batch have?
>>>>>
>>>> Regarding timestamps: the current design is as follows. The transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user-defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as a parameter.
>>>> So elements keep their original timestamps.
>>>>
>>>>
>>>> Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch.
>>>> I'm just starting to work on these subjects, so I might lack a bit of information. What I am currently thinking is that I need a way to know in the DoFn that the window has expired, so that I can call the perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles.
>>>> Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5 elements, then for each window we would expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
>>>>
>>>>> As a composite transform this will likely require a group by key, which may affect performance. Maybe within a DoFn is better.
>>>>>
>>>> Yes, the processing is indeed done with a DoFn.
>>>>
>>>>> Then it could be some annotation or API that informs the runner. Should batch sizes be fixed in the annotation (element count or size), or should the user have some method that lets them decide when to process a batch based on the contents?
>>>>>
>>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements. But batching based on content might be useful for the user. Giving a hint to the runner might be more flexible for the runner.
>>>> Thanks.
>>>>
>>>>> Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.
>>>>>
>>>> Yes!
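To make the recipe above concrete, here is a stdlib-only Java simulation (not Beam code, and not the BatchingParDo implementation) of Kenn's suggestion applied to Etienne's example. Each window gets one expiry timer at maxTimestamp() + allowedLateness, set only once thanks to a per-window boolean flag, which is the workaround Etienne describes. When that timer fires, the window's leftover partial batch is flushed. The class and method names are hypothetical. The model also assumes a single key, input that arrives in timestamp order, and a watermark equal to the latest element timestamp. Treating maxTimestamp() as the window end minus 1 ms mirrors Beam's IntervalWindow.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation (not Beam code) of per-window batching with an expiry timer.
// Assumptions: one key, in-order input, watermark = latest element timestamp.
final class WindowedBatchingModel {
  private final long windowMillis;
  private final long allowedLatenessMillis;
  private final int batchSize;
  // Per-window buffer (stand-in for a bag state), keyed by window start.
  private final Map<Long, List<Long>> buffers = new HashMap<>();
  // Per-window "timer already set" flag (stand-in for a boolean value state).
  private final Map<Long, Boolean> timerSet = new HashMap<>();
  // Pending expiry timers: fire timestamp -> window start.
  private final TreeMap<Long, Long> timers = new TreeMap<>();
  final List<List<Long>> emitted = new ArrayList<>();
  int timerSetCalls = 0;

  WindowedBatchingModel(long windowMillis, long allowedLatenessMillis, int batchSize) {
    if (windowMillis <= 0 || allowedLatenessMillis < 0 || batchSize <= 0) {
      throw new IllegalArgumentException("invalid window size, lateness or batch size");
    }
    this.windowMillis = windowMillis;
    this.allowedLatenessMillis = allowedLatenessMillis;
    this.batchSize = batchSize;
  }

  void processElement(long timestampMillis) {
    advanceWatermark(timestampMillis);
    long start = Math.floorDiv(timestampMillis, windowMillis) * windowMillis;
    if (!timerSet.getOrDefault(start, false)) {
      // maxTimestamp() of [start, start + size) is end - 1 ms; then add allowed lateness.
      long fireAt = start + windowMillis - 1 + allowedLatenessMillis;
      timers.put(fireAt, start);
      timerSet.put(start, true);
      timerSetCalls++;
    }
    List<Long> buffer = buffers.computeIfAbsent(start, w -> new ArrayList<>());
    buffer.add(timestampMillis);
    if (buffer.size() == batchSize) {
      emitted.add(new ArrayList<>(buffer)); // perBatchFn would be called here
      buffer.clear();
    }
  }

  // Fires every timer whose timestamp is strictly below the new watermark and
  // flushes that window's leftover (partial) batch, then clears its state.
  void advanceWatermark(long watermarkMillis) {
    while (!timers.isEmpty() && timers.firstKey() < watermarkMillis) {
      long start = timers.pollFirstEntry().getValue();
      List<Long> leftover = buffers.remove(start);
      if (leftover != null && !leftover.isEmpty()) {
        emitted.add(leftover);
      }
      timerSet.remove(start);
    }
  }
}
```

Feeding it 12 elements timestamped 0 s, 10 s, ..., 110 s with 1-minute windows, zero allowed lateness and a batch size of 5, then advancing the watermark to Long.MAX_VALUE at end of input, yields batches of 5, 1, 5 and 1 elements. That is two batches per window, as predicted above, with exactly one timer set per window.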
>>>>
>>>>> Maybe each batch is an RPC and you just want to start an async RPC for each batch. Then, in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.
>>>>>
>>>> Actually, at the moment each batch's processing is whatever the user wants (the perBatchFn user-defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he needs it, but he can also do a send-and-forget, depending on his use case.
>>>>
>>>> Besides, I have also included a perElementFn user function to allow the user to do some processing on the elements before adding them to the batch (example use case: converting a String to a DTO object to invoke an external service).
>>>>
>>>> Etienne
>>>>
>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>>
>>>>> Hi JB,
>>>>>
>>>>> I meant a JIRA vote, but discussion on the ML works too :)
>>>>>
>>>>> As I understand the need (see the Stack Overflow links in the JIRA ticket), the aim is to avoid the user having to code the batching logic in his own DoFn.processElement() and DoFn.finishBundle() regardless of the bundles. For example, a possible use case is to batch calls to an external service (for performance).
>>>>>
>>>>> I was thinking about providing a PTransform that implements the batching in its own DoFn and that takes user-defined functions for customization.
>>>>>
>>>>> Etienne
>>>>>
>>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> I guess you mean a discussion on the mailing list about that, right?
>>>>>>
>>>>>> AFAIR the idea is to provide a utility class to deal with pooling/batching.
>>>>>> However, I'm not sure it's required, given @StartBundle etc. in DoFn, and since batching depends on the end user's "logic".
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I have started to work on this ticket:
>>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>
>>>>>>> As there have been no votes since March 18th, is the issue still relevant/needed?
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Etienne
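For comparison with the state-based approach, the bundle-scoped design discussed in this thread can be sketched as a stdlib-only Java model (not Beam code). It uses the user hooks perElementFn and perBatchFn plus batchSize, with methods mirroring @StartBundle, @ProcessElement and @FinishBundle. The class name BundleBatcher is hypothetical. Having perBatchFn return a CompletableFuture is also an assumption, not the BatchingParDo signature. It lets finishBundle wait for every asynchronous call, following Ben's suggestion, while a synchronous or send-and-forget perBatchFn can simply return an already-completed future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, stdlib-only model of a bundle-scoped batching DoFn (not Beam code).
// perElementFn and perBatchFn are the hooks named in the thread; returning a
// CompletableFuture from perBatchFn is an assumption so finishBundle can wait.
final class BundleBatcher<InputT, T> {
  private final int batchSize;
  private final Function<InputT, T> perElementFn;
  private final Function<List<T>, CompletableFuture<Void>> perBatchFn;
  private List<T> buffer = new ArrayList<>();
  private final List<CompletableFuture<Void>> pending = new ArrayList<>();

  BundleBatcher(int batchSize, Function<InputT, T> perElementFn,
      Function<List<T>, CompletableFuture<Void>> perBatchFn) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.perElementFn = perElementFn;
    this.perBatchFn = perBatchFn;
  }

  // Analogous to @StartBundle: each bundle starts with empty buffers.
  void startBundle() {
    buffer = new ArrayList<>();
    pending.clear();
  }

  // Analogous to @ProcessElement: convert, buffer, and hand off full batches.
  void processElement(InputT input) {
    buffer.add(perElementFn.apply(input));
    if (buffer.size() == batchSize) {
      flush();
    }
  }

  // Analogous to @FinishBundle: hand off the final partial batch, then block
  // until every batch handed off during this bundle has completed.
  void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    pending.clear();
  }

  private void flush() {
    pending.add(perBatchFn.apply(buffer));
    buffer = new ArrayList<>(); // perBatchFn keeps ownership of the old list
  }
}
```

Because the buffer only lives for the duration of a bundle, this shape flushes at every bundle boundary and cannot batch across bundles or react to window expiration. That limitation is why the thread moves toward per-key/window state and timers.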