Sorry, just saw https://github.com/apache/beam/pull/2211
On Mon, Jul 10, 2017 at 5:37 PM, Robert Bradshaw <rober...@google.com> wrote:
> Any progress on this?
>
> On Thu, Mar 9, 2017 at 1:43 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>> Hi all,
>>
>> We had a discussion with Kenn yesterday about point 1 below; I would like
>> to note it here on the ML:
>>
>> Using the new method timer.set() instead of timer.setForNowPlus() makes the
>> timer fire at the right time.
>>
>> Another thing, regarding point 2: if I inject the window in the @OnTimer
>> method and print it, I see that at the moment the timer fires (at the last
>> timestamp of the window), the window is the GlobalWindow. I guess that is
>> because the fixed window has just ended. Maybe the empty bagState that I get
>> here is due to the end of the window (passing to the GlobalWindow). I mean,
>> as the states are scoped per window, and the window is different, another
>> bagState instance gets injected. Hence the empty bagState.
>>
>> WDYT?
>>
>> I will open a PR even though this work is not finished yet; that way, we
>> will have a convenient environment for discussing this code.
>>
>> Etienne
>>
>>
>> On 03/03/2017 at 11:48, Etienne Chauchot wrote:
>>>
>>> Hi all,
>>>
>>> @Kenn: I have enhanced my streaming test in
>>> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO, in particular
>>> to check that BatchingParDo doesn't mess up windows. It seems that it
>>> actually does :)
>>>
>>> The input collection contains 10 elements timestamped at 1 s intervals, and
>>> it is divided into fixed windows of 5 s duration (so 2 windows). startTime is
>>> the epoch. The timer is used to detect the end of the window and then output
>>> the content of the batch (buffer).
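For the test setup just described (10 elements 1 s apart from the epoch, 5 s fixed windows), the expected end-of-window timer times can be worked out directly. Below is a minimal plain-Java sketch with no Beam dependency; the class and method names are hypothetical. It computes each timer as the window's max timestamp plus the allowed lateness (the window.maxTimestamp().plus(allowedLateness) rule Kenn gives further down the thread) and models one timer per window scope, where setting the timer again with an absolute time simply overwrites it:

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java sketch (no Beam dependency) of end-of-window timer arithmetic for
 * fixed windows. All names are hypothetical; in a real DoFn the window bounds
 * would come from the injected BoundedWindow.
 */
class EndOfWindowTimers {

  /** Start (inclusive) of the fixed window of size sizeMs containing tsMs. */
  static long windowStart(long tsMs, long sizeMs) {
    return tsMs - Math.floorMod(tsMs, sizeMs);
  }

  /** Last millisecond inside the window: the exclusive end minus 1 ms. */
  static long maxTimestamp(long tsMs, long sizeMs) {
    return windowStart(tsMs, sizeMs) + sizeMs - 1;
  }

  /** Absolute timer time: the window's max timestamp plus allowed lateness. */
  static long expiryTimer(long tsMs, long sizeMs, long allowedLatenessMs) {
    return maxTimestamp(tsMs, sizeMs) + allowedLatenessMs;
  }

  /**
   * Models one timer per window scope: every element (re)sets the timer of
   * its own window to an absolute time, and a later set overwrites an earlier
   * one, so each window ends up with exactly one timer.
   */
  static Map<Long, Long> timersPerWindow(long[] timestamps, long sizeMs, long allowedLatenessMs) {
    Map<Long, Long> timers = new TreeMap<>();
    for (long ts : timestamps) {
      timers.put(windowStart(ts, sizeMs), expiryTimer(ts, sizeMs, allowedLatenessMs));
    }
    return timers;
  }

  public static void main(String[] args) {
    // 10 elements 1 s apart starting at the epoch, 5 s fixed windows, no lateness.
    long[] timestamps = new long[10];
    for (int i = 0; i < timestamps.length; i++) {
      timestamps[i] = i * 1000L;
    }
    // Prints {0=4999, 5000=9999}: window start -> timer time, in epoch millis.
    System.out.println(timersPerWindow(timestamps, 5000L, 0L));
  }
}
```

With zero allowed lateness this gives one timer per window, at 1970-01-01T00:00:04.999Z and 1970-01-01T00:00:09.999Z, which are the firing times expected in point 1 below.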
>>>
>>> I added some logs and I noticed two strange things (that might be linked):
>>>
>>>
>>> 1- The timer is set twice, and it is set correctly:
>>>
>>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
>>> 1970-01-01T00:00:00.000Z set for window
>>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>>
>>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
>>> 1970-01-01T00:00:05.000Z set for window
>>> [1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)
>>>
>>> It correctly fires twice, but not at the right timestamp:
>>>
>>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>>> 1970-01-01T00:00:04.999Z
>>>
>>> => Correct
>>>
>>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>>> 1970-01-01T00:00:04.999Z
>>>
>>> => Incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)
>>>
>>> Do I need to call timer.cancel() after the timer has fired? But
>>> timer.cancel() is not supported by the DirectRunner yet.
>>>
>>>
>>> 2- In the @OnTimer method, the injected batch bagState parameter is empty,
>>> whereas some elements were added to it since the last batch.clear() while
>>> processing the same window:
>>>
>>> INFOS: ***** BATCH ***** clear
>>>
>>> INFOS: ***** BATCH ***** Add element for window
>>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>>
>>> INFOS: ***** BATCH ***** Add element for window
>>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>> ..
>>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>>> 1970-01-01T00:00:04.999Z
>>> INFOS: ***** IN ONTIMER ***** batch size 0
>>>
>>> Am I doing something wrong with timers, or is there something not totally
>>> finished with them (as you noticed, they are quite new)?
>>>
>>> WDYT?
>>>
>>>
>>> Thanks
>>>
>>> Etienne
>>>
>>>
>>> On 09/02/2017 at 09:55, Etienne Chauchot wrote:
>>>>
>>>> Hi,
>>>>
>>>> @JB: good to know for the roadmap! Thanks
>>>>
>>>> @Kenn: just to be clear: the timer fires fine.
>>>> What I noticed is that it seems to be SET more than once, because
>>>> timer.setForNowPlus is called in the @ProcessElement method. I am not 100%
>>>> sure of it; what I noticed is that it started to work fine when I made sure
>>>> to call timer.setForNowPlus only once. I'm not saying it's a bug; this is
>>>> just not what I understood when I read the javadoc. I understood that it
>>>> would be set only once per window; see the javadoc below:
>>>>
>>>> An implementation of Timer is implicitly scoped - it may be scoped to a
>>>> key and window, or a key, window, and trigger, etc.
>>>> A timer exists in one of two states: set or unset. A timer can be set
>>>> only for a single time per scope.
>>>>
>>>> I use the DirectRunner.
>>>>
>>>> For the key part: ok, makes sense.
>>>>
>>>> Thanks for your comments
>>>>
>>>> I'm leaving on vacation tonight for a little more than two weeks; I'll
>>>> resume work then, and maybe start a PR when it's ready.
>>>>
>>>> Etienne
>>>>
>>>>
>>>>
>>>> On 08/02/2017 at 19:48, Kenneth Knowles wrote:
>>>>>
>>>>> Hi Etienne,
>>>>>
>>>>> If the timer is firing n times for n elements, that's a bug in the
>>>>> runner / shared runner code. It should be deduped. Which runner? Can you
>>>>> file a JIRA against me to investigate? I'm still in the process of
>>>>> fleshing out more and more RunnableOnService (aka ValidatesRunner) tests,
>>>>> so I will surely add one (existing tests already OOMed without deduping,
>>>>> so it wasn't at the top of my priority list).
>>>>>
>>>>> If the end user doesn't have a natural key, I would just add one and
>>>>> remove it within your transform. Not sure how easy this will be; you
>>>>> might need user intervention. Of course, you still do need to shard, or
>>>>> you'll be processing the whole PCollection serially.
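Kenn's suggestion of adding a key and removing it inside the transform can be illustrated in plain Java. This is a hypothetical sketch, not Beam code: the class and methods are made up, and hashing modulo a shard count is only one possible way to pick the synthetic key. Spreading elements over several synthetic keys, rather than one constant key, is what keeps per-key state from serializing the whole collection:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java sketch (no Beam dependency) of "add a key, then remove it":
 * unkeyed elements get a synthetic shard key so per-key state could be used,
 * and the key is dropped again before output. All names are hypothetical.
 */
class SyntheticKeys {

  /** Synthetic key in [0, numShards), derived from the element's hash code. */
  static int shardKey(Object element, int numShards) {
    return Math.floorMod(element.hashCode(), numShards);
  }

  /** Groups elements by synthetic key; each group stands in for one key's state. */
  static <T> Map<Integer, List<T>> keyed(List<T> elements, int numShards) {
    Map<Integer, List<T>> byKey = new LinkedHashMap<>();
    for (T element : elements) {
      byKey.computeIfAbsent(shardKey(element, numShards), k -> new ArrayList<>()).add(element);
    }
    return byKey;
  }

  /** Drops the synthetic keys, keeping only the values (order is by shard). */
  static <T> List<T> unkeyed(Map<Integer, List<T>> byKey) {
    List<T> out = new ArrayList<>();
    for (List<T> values : byKey.values()) {
      out.addAll(values);
    }
    return out;
  }

  public static void main(String[] args) {
    List<String> input = Arrays.asList("a", "b", "c", "d", "e");
    Map<Integer, List<String>> byKey = keyed(input, 3);
    System.out.println(byKey);          // {1=[a, d], 2=[b, e], 0=[c]}
    System.out.println(unkeyed(byKey)); // [a, d, b, e, c]
  }
}
```

Note that removing the key does not restore the original element order, which is consistent with a PCollection being unordered anyway.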
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> AFAIR the timer per function is in the "roadmap" (remembering the
>>>>>> discussion we had with Kenn).
>>>>>>
>>>>>> I will take a deeper look at your branch next week.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On Feb 8, 2017, at 13:28, Etienne Chauchot <echauc...@gmail.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi Kenn,
>>>>>>>
>>>>>>> I have started using the state and timer APIs; they seem awesome!
>>>>>>>
>>>>>>> Please take a look at
>>>>>>> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
>>>>>>>
>>>>>>> It contains a PTransform that does the batching across bundles while
>>>>>>> respecting the windows (even if tests are not finished yet, see @Ignore
>>>>>>> and TODOs).
>>>>>>>
>>>>>>> I have some questions:
>>>>>>>
>>>>>>> - I use the timer to detect the end of the window like you suggested.
>>>>>>> But the timer can only be set in @ProcessElement and @OnTimer. The
>>>>>>> javadoc says that timers are implicitly scoped to a key/window and that
>>>>>>> a timer can be set only for a single time per scope. I noticed that if
>>>>>>> I call timer.setForNowPlus in the @ProcessElement method, it seems that
>>>>>>> the timer is set n times for n elements. So I just created a boolean
>>>>>>> state to prevent setting the timer more than once per key/window.
>>>>>>> => Would it be good to have an end-user way of indicating that the
>>>>>>> timer will be set only once per key/window? Something analogous to
>>>>>>> @Setup, to avoid the user having to use a boolean state?
>>>>>>>
>>>>>>> - I understand that state and timers need to be per-key, but what if
>>>>>>> the end user does not need a key (let's say he just needs a
>>>>>>> PCollection<String>)?
>>>>>>> Then, do we tell him to use a PCollection<KV> anyway, as I wrote in the
>>>>>>> javadoc of BatchingParDo?
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>>
>>>>>>> On 26/01/2017 at 17:28, Etienne Chauchot wrote:
>>>>>>>>
>>>>>>>> Wonderful!
>>>>>>>>
>>>>>>>> Thanks Kenn!
>>>>>>>>
>>>>>>>> Etienne
>>>>>>>>
>>>>>>>>
>>>>>>>> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
>>>>>>>>>
>>>>>>>>> Hi Etienne,
>>>>>>>>>
>>>>>>>>> I was drafting a proposal about @OnWindowExpiration when this email
>>>>>>>>> arrived. I thought I would try to quickly unblock you by responding
>>>>>>>>> with a TL;DR: you can achieve your goals with state & timers as they
>>>>>>>>> currently exist. You'll set a timer for
>>>>>>>>> window.maxTimestamp().plus(allowedLateness) precisely. When this
>>>>>>>>> timer fires, you are guaranteed that the input watermark has exceeded
>>>>>>>>> this point (so all new data is droppable) while the output timestamp
>>>>>>>>> is held to this point (so you can safely output into the window).
>>>>>>>>>
>>>>>>>>> @OnWindowExpiration is (1) a convenience to save you from needing a
>>>>>>>>> handle on the allowed lateness (not a problem in your case) and (2)
>>>>>>>>> actually meaningful and potentially less expensive to implement in
>>>>>>>>> the absence of state (this is why it needs a design discussion at
>>>>>>>>> all, really).
>>>>>>>>>
>>>>>>>>> Caveat: these APIs are new and not supported in every runner and
>>>>>>>>> windowing configuration.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I have started to implement this ticket.
>>>>>>>>>> For now it is implemented as a PTransform that simply does
>>>>>>>>>> ParDo.of(new DoFn), and all the processing related to batching is
>>>>>>>>>> done in the DoFn.
>>>>>>>>>>
>>>>>>>>>> I'm starting to deal with windows and bundles (starting to take a
>>>>>>>>>> look at the State API to process across bundles; more questions about
>>>>>>>>>> this to come).
>>>>>>>>>> My comments/questions are inline:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>>>>>>>>>>
>>>>>>>>>>> We should start by understanding the goals. If elements are in
>>>>>>>>>>> different windows, can they be put in the same batch? If they have
>>>>>>>>>>> different timestamps, what timestamp should the batch have?
>>>>>>>>>>>
>>>>>>>>>> Regarding timestamps: the current design is as follows: the
>>>>>>>>>> transform does not group elements in the PCollection, so the "batch"
>>>>>>>>>> does not exist as an element in the PCollection. There is only a
>>>>>>>>>> user-defined function (perBatchFn) that gets called when batchSize
>>>>>>>>>> elements have been processed. This function takes an ArrayList as a
>>>>>>>>>> parameter. So elements keep their original timestamps.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regarding windowing: I guess that if elements are not in the same
>>>>>>>>>> window, they are not expected to be in the same batch.
>>>>>>>>>> I'm just starting to work on these subjects, so I might lack a bit of
>>>>>>>>>> information; what I am currently thinking about is that I need a way
>>>>>>>>>> to know in the DoFn that the window has expired, so that I can call
>>>>>>>>>> the perBatchFn even if batchSize is not reached.
>>>>>>>>>> This is the @OnWindowExpiration callback that Kenneth mentioned in
>>>>>>>>>> an email about bundles.
>>>>>>>>>> Let's imagine that we have a collection of elements artificially
>>>>>>>>>> timestamped every 10 seconds (for simplicity of the example) and a
>>>>>>>>>> fixed windowing of 1 minute. Then each window contains 6 elements. If
>>>>>>>>>> we were to buffer the elements in batches of 5 elements, then for
>>>>>>>>>> each window we expect to get 2 batches (one of 5 elements, one of 1
>>>>>>>>>> element). For that to happen, we need an @OnWindowExpiration on the
>>>>>>>>>> DoFn where we call perBatchFn.
>>>>>>>>>>
>>>>>>>>>>> As a composite transform this will likely require a group by key,
>>>>>>>>>>> which may affect performance. Maybe within a DoFn is better.
>>>>>>>>>>>
>>>>>>>>>> Yes, the processing is done with a DoFn indeed.
>>>>>>>>>>
>>>>>>>>>>> Then it could be some annotation or API that informs the runner.
>>>>>>>>>>> Should batch sizes be fixed in the annotation (element count or
>>>>>>>>>>> size), or should the user have some method that lets them decide
>>>>>>>>>>> when to process a batch based on the contents?
>>>>>>>>>>>
>>>>>>>>>> For now, the user passes batchSize as an argument to
>>>>>>>>>> BatchParDo.via(); it is a number of elements. But batching based on
>>>>>>>>>> content might be useful for the user. Giving a hint to the runner
>>>>>>>>>> might be more flexible for the runner.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>>> Another thing to think about is whether this should be connected to
>>>>>>>>>>> the ability to run parts of the bundle in parallel.
>>>>>>>>>>>
>>>>>>>>>> Yes!
>>>>>>>>>>
>>>>>>>>>>> Maybe each batch is an RPC and you just want to start an async RPC
>>>>>>>>>>> for each batch.
>>>>>>>>>>> Then, in addition to starting the final RPC in finishBundle, you
>>>>>>>>>>> also need to wait for all the RPCs to complete.
>>>>>>>>>>>
>>>>>>>>>> Actually, currently each batch's processing is whatever the user
>>>>>>>>>> wants (the perBatchFn user-defined function). If the user decides to
>>>>>>>>>> issue an async RPC in that function (called with the ArrayList of
>>>>>>>>>> input elements), IMHO he is responsible for waiting for the response
>>>>>>>>>> in that method if he needs the response, but he can also do a
>>>>>>>>>> send-and-forget, depending on his use case.
>>>>>>>>>>
>>>>>>>>>> Besides, I have also included a perElementFn user function to allow
>>>>>>>>>> the user to do some processing on the elements before adding them to
>>>>>>>>>> the batch (example use case: convert a String to a DTO object to
>>>>>>>>>> invoke an external service).
>>>>>>>>>>
>>>>>>>>>> Etienne
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi JB,
>>>>>>>>>>>
>>>>>>>>>>> I meant a JIRA vote, but a discussion on the ML works too :)
>>>>>>>>>>>
>>>>>>>>>>> As I understand the need (see the Stack Overflow links in the JIRA
>>>>>>>>>>> ticket), the aim is to avoid the user having to code the batching
>>>>>>>>>>> logic in his own DoFn.processElement() and DoFn.finishBundle(),
>>>>>>>>>>> regardless of the bundles. For example, a possible use case is to
>>>>>>>>>>> batch calls to an external service (for performance).
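The batching behaviour discussed in this exchange (perElementFn applied per element, perBatchFn called every batchSize elements, and a final flush when the window expires) can be simulated in plain Java. This is not the BatchingParDo code from the branch: WindowedBatcher and its methods are hypothetical, there is no Beam dependency, and it assumes each element arrives with its window start already known and that the expiration callback runs once the watermark passes the end of that window. It replays the example above: one-minute fixed windows, one element every 10 s, and a batch size of 5:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical plain-Java simulation of window-aware batching (no Beam dependency). */
class WindowedBatcher<InputT, OutputT> {
  private final int batchSize;
  private final Function<InputT, OutputT> perElementFn;
  private final Consumer<List<OutputT>> perBatchFn;
  // One buffer per window (keyed by window start); stands in for per-window bag state.
  private final Map<Long, List<OutputT>> buffers = new HashMap<>();

  WindowedBatcher(int batchSize, Function<InputT, OutputT> perElementFn,
      Consumer<List<OutputT>> perBatchFn) {
    this.batchSize = batchSize;
    this.perElementFn = perElementFn;
    this.perBatchFn = perBatchFn;
  }

  /** Transforms and buffers one element in its window; flushes when the batch is full. */
  void processElement(long windowStart, InputT element) {
    List<OutputT> buffer = buffers.computeIfAbsent(windowStart, w -> new ArrayList<>());
    buffer.add(perElementFn.apply(element));
    if (buffer.size() >= batchSize) {
      flush(windowStart);
    }
  }

  /** Window-expiration callback: emits the remainder even if it is below batchSize. */
  void onWindowExpiration(long windowStart) {
    flush(windowStart);
  }

  private void flush(long windowStart) {
    List<OutputT> buffer = buffers.remove(windowStart);
    if (buffer != null && !buffer.isEmpty()) {
      perBatchFn.accept(buffer);
    }
  }

  public static void main(String[] args) {
    long windowMs = 60_000L;
    List<Integer> batchSizes = new ArrayList<>();
    WindowedBatcher<Long, String> batcher = new WindowedBatcher<>(
        5, value -> "element@" + value, batch -> batchSizes.add(batch.size()));
    // Two one-minute windows with one element every 10 s, i.e. 6 elements each.
    for (long start = 0; start < 2 * windowMs; start += windowMs) {
      for (long ts = start; ts < start + windowMs; ts += 10_000L) {
        batcher.processElement(start, ts);
      }
      // Simulates the watermark passing the end of this window.
      batcher.onWindowExpiration(start);
    }
    System.out.println(batchSizes); // [5, 1, 5, 1]
  }
}
```

Each window yields one full batch of 5 and one remainder batch of 1; without the expiration flush, the remainder of each window would never be emitted.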
>>>>>>>>>>>
>>>>>>>>>>> I was thinking about providing a PTransform that implements the
>>>>>>>>>>> batching in its own DoFn and that takes user-defined functions for
>>>>>>>>>>> customization.
>>>>>>>>>>>
>>>>>>>>>>> Etienne
>>>>>>>>>>>
>>>>>>>>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> I guess you mean a discussion on the mailing list about that,
>>>>>>>>>>>> right?
>>>>>>>>>>>>
>>>>>>>>>>>> AFAIR the idea is to provide a utility class to deal with
>>>>>>>>>>>> pooling/batching. However, I'm not sure it's required, as there is
>>>>>>>>>>>> @StartBundle etc. in DoFn, and batching depends on the end user's
>>>>>>>>>>>> "logic".
>>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> JB
>>>>>>>>>>>>
>>>>>>>>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have started to work on this ticket:
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>>>>>>>
>>>>>>>>>>>>> As there have been no votes since March 18th, is the issue still
>>>>>>>>>>>>> relevant/needed?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Etienne
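For comparison with the state-based approach, here is a plain-Java simulation of the bundle-scoped batching a user can already write with @StartBundle/@FinishBundle, as JB points out. BundleBatcher is a hypothetical stand-in whose method names only mirror the DoFn lifecycle; it has no Beam dependency. Because the buffer must be flushed when each bundle finishes, a batch never spans two bundles, which is the limitation that batching across bundles is meant to remove:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical plain-Java simulation of hand-written, bundle-scoped batching.
 * Method names mirror the DoFn lifecycle, but this is not Beam code.
 */
class BundleBatcher<T> {
  private final int batchSize;
  private final Consumer<List<T>> perBatchFn;
  private List<T> buffer;

  BundleBatcher(int batchSize, Consumer<List<T>> perBatchFn) {
    this.batchSize = batchSize;
    this.perBatchFn = perBatchFn;
  }

  /** A fresh buffer for every bundle. */
  void startBundle() {
    buffer = new ArrayList<>();
  }

  /** Buffers the element and flushes when the batch is full. */
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Emits the partial batch: nothing can be carried over to the next bundle. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (!buffer.isEmpty()) {
      perBatchFn.accept(buffer);
      buffer = new ArrayList<>();
    }
  }

  public static void main(String[] args) {
    List<Integer> sizes = new ArrayList<>();
    BundleBatcher<String> batcher = new BundleBatcher<>(5, batch -> sizes.add(batch.size()));
    // Six elements delivered as two bundles of three.
    for (int bundle = 0; bundle < 2; bundle++) {
      batcher.startBundle();
      for (int i = 0; i < 3; i++) {
        batcher.processElement("element-" + bundle + "-" + i);
      }
      batcher.finishBundle();
    }
    System.out.println(sizes); // [3, 3]: batchSize 5 is never reached
  }
}
```

With six elements split into two bundles of three and a batch size of 5, the batches have sizes [3, 3] rather than [5, 1].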