Any progress on this?
On Thu, Mar 9, 2017 at 1:43 AM, Etienne Chauchot <[email protected]> wrote:
> Hi all,
> We had a discussion with Kenn yesterday about point 1 below; I would like to note it here on the ML:
> Using the new method timer.set() instead of timer.setForNowPlus() makes the timer fire at the right time.
> Another thing, regarding point 2: if I inject the window into the @OnTimer method and print it, I see that at the moment the timer fires (at the last timestamp of the window), the window is the GlobalWindow. I guess that is because the fixed window has just ended. Maybe the empty bagState that I get here is due to the end of the window (passing to the GlobalWindow). I mean, as the states are scoped per window, and the window is different, another bagState instance gets injected. Hence the empty bagState.
> WDYT?
> I will open a PR even if this work is not finished yet; that way, we will have a convenient environment for discussing this code.
> Etienne
> On 03/03/2017 at 11:48, Etienne Chauchot wrote:
>> Hi all,
>> @Kenn: I have enhanced my streaming test in https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO, in particular to check that BatchingParDo doesn't mess up windows. It seems that it actually does :)
>> The input collection contains 10 elements timestamped at 1s intervals and it is divided into fixed windows of 5s duration (so 2 windows). startTime is the epoch. The timer is used to detect the end of the window and then output the content of the batch (buffer).
>> I added some logs and I noticed two strange things (that might be linked):
>> 1 - The timer is set twice, and it is set correctly:
>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp 1970-01-01T00:00:00.000Z set for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp 1970-01-01T00:00:05.000Z set for window [1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)
>> It correctly fires twice, but not at the right timestamp:
>> INFOS: ***** END OF WINDOW ***** for timer timestamp 1970-01-01T00:00:04.999Z
>> => Correct
>> INFOS: ***** END OF WINDOW ***** for timer timestamp 1970-01-01T00:00:04.999Z
>> => Incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)
>> Do I need to call timer.cancel() after the timer has fired? But timer.cancel() is not supported by the DirectRunner yet.
>> 2 - In the @OnTimer method, the injected batch bagState parameter is empty, although elements were added to it since the last batch.clear() while processing the same window:
>> INFOS: ***** BATCH ***** clear
>> INFOS: ***** BATCH ***** Add element for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>> INFOS: ***** BATCH ***** Add element for window [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>> ..
>> INFOS: ***** END OF WINDOW ***** for timer timestamp 1970-01-01T00:00:04.999Z
>> INFOS: ***** IN ONTIMER ***** batch size 0
>> Am I doing something wrong with timers, or is there something not totally finished with them (as you noticed, they are quite new)?
>> WDYT?
>> Thanks
>> Etienne
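For readers following along, the shape of the streaming test described above is roughly the following. This is a sketch only (imports omitted); the real test lives on the BEAM-135-BATCHING-PARDO branch, and "BatchingParDoUnderTest" is a placeholder name for the transform being tested:

    // 10 elements timestamped 1s apart starting at the epoch, in 5s fixed
    // windows (so 2 windows), pushed through the transform on the DirectRunner.
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void testBatchingRespectsWindows() {
      Instant startTime = new Instant(0);  // epoch
      TestStream.Builder<String> stream = TestStream.create(StringUtf8Coder.of());
      for (int i = 0; i < 10; i++) {
        stream = stream.addElements(
            TimestampedValue.of("element-" + i, startTime.plus(Duration.standardSeconds(i))));
      }

      pipeline
          .apply(stream.advanceWatermarkToInfinity())
          .apply(Window.into(FixedWindows.of(Duration.standardSeconds(5))))
          .apply(new BatchingParDoUnderTest());  // placeholder for the transform under test

      pipeline.run();
    }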
>> On 09/02/2017 at 09:55, Etienne Chauchot wrote:
>>> Hi,
>>> @JB: good to know for the roadmap! Thanks.
>>> @Kenn: just to be clear: the timer fires fine. What I noticed is that it seems to be SET more than once, because timer.setForNowPlus is called in the @ProcessElement method. I am not 100% sure of it; what I noticed is that it started to work fine when I ensured to call timer.setForNowPlus only once. I don't say it's a bug, it is just not what I understood when I read the javadoc; I understood that it would be set only once per window, see the javadoc below:
>>> An implementation of Timer is implicitly scoped - it may be scoped to a key and window, or a key, window, and trigger, etc.
>>> A timer exists in one of two states: set or unset. A timer can be set only for a single time per scope.
>>> I use the DirectRunner.
>>> For the key part: ok, makes sense.
>>> Thanks for your comments.
>>> I'm leaving on vacation tonight for a little more than two weeks; I'll resume work then, and maybe start a PR when it's ready.
>>> Etienne
>>> On 08/02/2017 at 19:48, Kenneth Knowles wrote:
>>>> Hi Etienne,
>>>> If the timer is firing n times for n elements, that's a bug in the runner / shared runner code. It should be deduped. Which runner? Can you file a JIRA against me to investigate? I'm still in the process of fleshing out more and more RunnableOnService (aka ValidatesRunner) tests, so I will surely add one (existing tests already OOMed without deduping, so it wasn't at the top of my priority list).
>>>> If the end user doesn't have a natural key, I would just add one and remove it within your transform. Not sure how easy this will be - you might need user intervention. Of course, you still do need to shard or you'll be processing the whole PCollection serially.
>>>> Kenn
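A minimal sketch of the "add a key and remove it within your transform" idea Kenn describes above (imports omitted); NUM_SHARDS, the expand() wrapper and StatefulBatchingDoFn are illustrative names, not what the branch actually contains:

    // Give each element an arbitrary shard key so the stateful DoFn has
    // something to scope state and timers to (and so the work is spread over
    // several keys instead of being processed serially), then drop the key.
    private static final int NUM_SHARDS = 10;

    @Override
    public PCollection<String> expand(PCollection<String> input) {
      return input
          .apply("AddArbitraryShardKey",
              WithKeys.of((String element) -> ThreadLocalRandom.current().nextInt(NUM_SHARDS))
                  .withKeyType(TypeDescriptors.integers()))
          .apply("BatchPerKeyAndWindow",
              ParDo.of(new StatefulBatchingDoFn()))  // a DoFn<KV<Integer, String>, KV<Integer, String>> using state/timers
          .apply("DropShardKey", Values.create());
    }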
>>>> On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>> Hi,
>>>>> AFAIR the timer per function is on the "roadmap" (remembering a discussion we had with Kenn).
>>>>> I will take a deeper look at your branch next week.
>>>>> Regards
>>>>> JB
>>>>> On Feb 8, 2017, 13:28, Etienne Chauchot <[email protected]> wrote:
>>>>>> Hi Kenn,
>>>>>> I have started using the state and timer APIs, they seem awesome!
>>>>>> Please take a look at https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
>>>>>> It contains a PTransform that does the batching across bundles while respecting the windows (even if the tests are not finished yet, see @Ignore and TODOs).
>>>>>> I have some questions:
>>>>>> - I use the timer to detect the end of the window like you suggested. But the timer can only be set in @ProcessElement and @OnTimer. The javadoc says that timers are implicitly scoped to a key/window and that a timer can be set only for a single time per scope. I noticed that if I call timer.setForNowPlus in the @ProcessElement method, it seems that the timer is set n times for n elements. So I just created a boolean state to prevent setting the timer more than once per key/window.
>>>>>> => Would it maybe be good to have an end-user way of indicating that the timer should be set only once per key/window, something analogous to @Setup, to avoid the user having to use a boolean state?
>>>>>> - I understand that state and timers need to be per-key, but what if the end user does not need a key (let's say he just needs a PCollection<String>)? Do we then tell him to use a PCollection<KV> anyway, like I wrote in the javadoc of BatchingParDo?
>>>>>> WDYT?
>>>>>> Thanks,
>>>>>> Etienne
>>>>>> On 26/01/2017 at 17:28, Etienne Chauchot wrote:
>>>>>>> Wonderful!
>>>>>>> Thanks Kenn!
>>>>>>> Etienne
>>>>>>> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
>>>>>>>> Hi Etienne,
>>>>>>>> I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely - when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window).
>>>>>>>> @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really).
>>>>>>>> Caveat: these APIs are new and not supported in every runner and windowing configuration.
>>>>>>>> Kenn
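Kenn's TL;DR above, written out as a sketch (imports omitted). Passing the allowed lateness through the constructor is an assumption about the wiring; it is exactly the handle that @OnWindowExpiration would make unnecessary:

    // An event-time timer aimed at the window expiration point,
    // window.maxTimestamp().plus(allowedLateness), rather than at the bare
    // end of the window.
    private static class FlushAtWindowExpirationDoFn extends DoFn<KV<Integer, String>, String> {

      @TimerId("windowExpiration")
      private final TimerSpec expirationTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      private final Duration allowedLateness;

      FlushAtWindowExpirationDoFn(Duration allowedLateness) {
        this.allowedLateness = allowedLateness;
      }

      @ProcessElement
      public void processElement(
          ProcessContext c,
          @TimerId("windowExpiration") Timer timer,
          BoundedWindow window) {
        timer.set(window.maxTimestamp().plus(allowedLateness));
      }

      @OnTimer("windowExpiration")
      public void onWindowExpiration(OnTimerContext c) {
        // The input watermark has passed the expiration point, so no more data
        // can arrive for this window, but it is still safe to output into it:
        // flush whatever is buffered for this key and window here.
      }
    }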
>>>>>>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <[email protected]> wrote:
>>>>>>>>> Hi,
>>>>>>>>> I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn) and all the processing related to batching is done in the DoFn.
>>>>>>>>> I'm starting to deal with windows and bundles (starting to take a look at the State API to process across bundles; more questions about this to come).
>>>>>>>>> My comments/questions are inline:
>>>>>>>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>>>>>>>>>> We should start by understanding the goals. If elements are in different windows, can they be in the same batch? If they have different timestamps, what timestamp should the batch have?
>>>>>>>>> Regarding timestamps: the current design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as a parameter. So elements keep their original timestamps.
>>>>>>>>> Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch.
>>>>>>>>> I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired, so that I can call the perBatchFn even if batchSize is not reached. This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles.
>>>>>>>>> Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
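The buffering just described - a bag of elements plus a counter, flushed either when batchSize is reached or when the end-of-window timer fires - might look roughly like this. It is a sketch only (imports omitted): the element types, the counter, and the SerializableFunction shape of perBatchFn are illustrative; the real code is on the branch.

    private static class BufferingDoFn extends DoFn<KV<Integer, String>, String> {

      private final int batchSize;
      // Stand-in for the user's perBatchFn; its real signature is whatever the branch defines.
      private final SerializableFunction<List<String>, Void> perBatchFn;

      BufferingDoFn(int batchSize, SerializableFunction<List<String>, Void> perBatchFn) {
        this.batchSize = batchSize;
        this.perBatchFn = perBatchFn;
      }

      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());

      @StateId("count")
      private final StateSpec<ValueState<Integer>> countSpec = StateSpecs.value(VarIntCoder.of());

      @TimerId("endOfWindow")
      private final TimerSpec endOfWindowTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void processElement(
          ProcessContext c,
          @StateId("buffer") BagState<String> buffer,
          @StateId("count") ValueState<Integer> count,
          @TimerId("endOfWindow") Timer timer,
          BoundedWindow window) {
        // One timer per key and window, aimed at the end of the window.
        timer.set(window.maxTimestamp());
        buffer.add(c.element().getValue());
        Integer boxed = count.read();
        int newCount = (boxed == null ? 0 : boxed) + 1;
        if (newCount >= batchSize) {
          flush(buffer, c::output);
          count.write(0);
        } else {
          count.write(newCount);
        }
      }

      @OnTimer("endOfWindow")
      public void onEndOfWindow(
          OnTimerContext c,
          @StateId("buffer") BagState<String> buffer,
          @StateId("count") ValueState<Integer> count) {
        // End of the window: flush the remainder, even if smaller than batchSize
        // (the "batch of 1 element" in the example above).
        flush(buffer, c::output);
        count.write(0);
      }

      private void flush(BagState<String> buffer, Consumer<String> output) {
        List<String> batch = new ArrayList<>();
        for (String element : buffer.read()) {
          batch.add(element);
        }
        if (!batch.isEmpty()) {
          perBatchFn.apply(batch);  // e.g. the user's bulk call to an external service
          batch.forEach(output);    // the buffered elements are simply re-emitted here
        }
        buffer.clear();
      }
    }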
>>>>>>>>>> As a composite transform this will likely require a group by key, which may affect performance. Maybe doing it within a DoFn is better.
>>>>>>>>> Yes, the processing is done with a DoFn indeed.
>>>>>>>>>> Then it could be some annotation or API that informs the runner. Should batch sizes be fixed in the annotation (element count or size), or should the user have some method that lets them decide when to process a batch based on the contents?
>>>>>>>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements. But batching based on content might be useful for the user. Giving a hint to the runner might be more flexible for the runner.
>>>>>>>>> Thanks.
>>>>>>>>>> Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.
>>>>>>>>> Yes!
>>>>>>>>>> Maybe each batch is an RPC and you just want to start an async RPC for each batch. Then in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.
>>>>>>>>> Actually, currently each batch processing is whatever the user wants (the perBatchFn user defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he needs the response, but he can also do a send-and-forget, depending on his use case.
>>>>>>>>> Besides, I have also included a perElementFn user function to allow the user to do some processing on the elements before adding them to the batch (an example use case: convert a String to a DTO object to invoke an external service).
>>>>>>>>> Etienne
>>>>>>>>>> On Tue, Jan 17, 2017, 8:48 AM, Etienne Chauchot <[email protected]> wrote:
>>>>>>>>>> Hi JB,
>>>>>>>>>> I meant a JIRA vote, but discussion on the ML works too :)
>>>>>>>>>> As I understand the need (see the Stack Overflow links in the JIRA ticket), the aim is to avoid the user having to code the batching logic in his own DoFn.processElement() and DoFn.finishBundle() regardless of the bundles. For example, a possible use case is to batch calls to an external service (for performance).
>>>>>>>>>> I was thinking about providing a PTransform that implements the batching in its own DoFn and that takes user defined functions for customization.
>>>>>>>>>> Etienne
>>>>>>>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>> I guess you mean a discussion on the mailing list about that, right?
>>>>>>>>>>> AFAIR the idea is to provide a utility class to deal with pooling/batching. However, I'm not sure it's required, as we have @StartBundle etc. in DoFn and batching depends on the end user's "logic".
>>>>>>>>>>> Regards
>>>>>>>>>>> JB
>>>>>>>>>>> On Jan 17, 2017, 08:26, Etienne Chauchot <[email protected]> wrote:
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>> I have started to work on this ticket: https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>>>>>> As there was no vote since March 18th, is the issue still relevant/needed?
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Etienne
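To make the Jan 17 point above concrete - the per-bundle batching a user currently has to hand-roll in processElement()/finishBundle(), which the proposed transform is meant to encapsulate - a rough sketch (imports omitted; BATCH_SIZE and callExternalService are placeholders for the user's own logic):

    private static class ManualBatchingDoFn extends DoFn<String, String> {

      private static final int BATCH_SIZE = 100;
      private final List<String> batch = new ArrayList<>();

      @StartBundle
      public void startBundle() {
        batch.clear();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        batch.add(c.element());
        if (batch.size() >= BATCH_SIZE) {
          callExternalService(batch);  // the user's bulk call
          batch.clear();
        }
        c.output(c.element());  // pass the element through unchanged
      }

      @FinishBundle
      public void finishBundle() {
        // Bundles are runner-defined, so the last batch of a bundle can be
        // arbitrarily small; it still has to be flushed here.
        if (!batch.isEmpty()) {
          callExternalService(batch);
          batch.clear();
        }
      }

      private void callExternalService(List<String> elements) {
        // placeholder for e.g. a bulk RPC
      }
    }

Note that this only batches within a bundle and ignores windows entirely, which is exactly the gap the windowed, state-and-timer-based approach discussed in this thread is trying to close.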
