Sorry, just saw https://github.com/apache/beam/pull/2211

On Mon, Jul 10, 2017 at 5:37 PM, Robert Bradshaw <rober...@google.com> wrote:
> Any progress on this?
>
> On Thu, Mar 9, 2017 at 1:43 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>> Hi all,
>>
>> We had a discussion with Kenn yesterday about point 1 below; I would like
>> to note it here on the ML:
>>
>> Using new method timer.set() instead of timer.setForNowPlus() makes the
>> timer fire at the right time.
>>
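
One way to picture the difference between the two calls, as a toy model rather than Beam code: a relative timer is resolved against the clock reading at the moment it is set, while an absolute timer targets a fixed instant. The class and method names below are hypothetical. The idea that the clock had not advanced between the two setForNowPlus() calls is an assumption that would match the logs in point 1; the thread itself only reports that switching to timer.set() fixed the firing time.

```java
/** Toy model contrasting a relative timer with an absolute one; not Beam code. */
final class TimerTargetModel {

    /** Relative timer: resolved against the clock reading at the moment it is set. */
    static long relativeTarget(long clockNowMillis, long delayMillis) {
        return clockNowMillis + delayMillis;
    }

    /** Absolute timer: window end (exclusive) minus 1 ms, independent of any clock. */
    static long absoluteTarget(long windowEndMillis) {
        return windowEndMillis - 1;
    }
}
```

Under that assumption, both relative timers resolve to 4999 ms when the clock still reads 0, whereas the absolute targets for the two windows are 4999 ms and 9999 ms.
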
>> Another thing, regarding point 2: if I inject the window in the @OnTimer
>> method and print it, I see that at the moment the timer fires (at the last
>> timestamp of the window), the window is the GlobalWindow. I guess that is
>> because the fixed window has just ended. Maybe the empty bagState that I get
>> here is due to the end of the window (passing to the GlobalWindow). I mean, as
>> the states are scoped per window, and the window is different, another
>> bagState instance gets injected. Hence the empty bagState.
>>
>> WDYT?
>>
>> I will open a PR even if this work is not finished yet, that way, we will
>> have a convenient environment for discussing this code.
>>
>> Etienne
>>
>>
>> On 3 Mar 2017 at 11:48, Etienne Chauchot wrote:
>>>
>>> Hi all,
>>>
>>> @Kenn: I have enhanced my streaming test in
>>> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO in particular
>>> to check that BatchingParDo doesn't mess up windows. It seems that it
>>> actually does :)
>>>
>>> The input collection contains 10 elements timestamped at 1s interval and
>>> it is divided into fixed windows of 5s duration (so 2 windows). startTime is
>>> epoch. The timer is used to detect the end of the window and output the
>>> content of the batch (buffer) then.
>>>
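
The expected timer timestamps for this test can be checked with a little arithmetic. The sketch below is plain Java, not Beam code, and its class and method names are hypothetical; it assumes fixed windows aligned on the epoch and a window "max timestamp" of end minus 1 ms, which matches the 4999 ms delay shown in the logs.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of fixed-window assignment; not Beam code. */
final class FixedWindowMath {

    /** Start (inclusive) of the epoch-aligned fixed window containing the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Last timestamp that still belongs to the window: end (exclusive) minus 1 ms. */
    static long maxTimestamp(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis - 1;
    }

    /** Distinct end-of-window targets for elements spaced intervalMillis apart, starting at epoch. */
    static List<Long> timerTargets(int elementCount, long intervalMillis, long sizeMillis) {
        List<Long> targets = new ArrayList<>();
        for (int i = 0; i < elementCount; i++) {
            long target = maxTimestamp(i * intervalMillis, sizeMillis);
            if (!targets.contains(target)) {
                targets.add(target);
            }
        }
        return targets;
    }
}
```

For 10 elements at 1 s intervals and 5 s windows, timerTargets(10, 1000, 5000) yields two targets, 4999 ms and 9999 ms, one per window.
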
>>> I added some logs and I noticed two strange things (that might be linked):
>>>
>>>
>>> 1- The timer is set twice, and it is set correctly
>>>
>>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
>>> 1970-01-01T00:00:00.000Z set for window
>>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>>
>>> INFOS: ***** SET TIMER ***** Delay of 4999 ms added to timestamp
>>> 1970-01-01T00:00:05.000Z set for window
>>> [1970-01-01T00:00:05.000Z..1970-01-01T00:00:10.000Z)
>>>
>>> It correctly fires twice but not at the right timestamp:
>>>
>>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>>> 1970-01-01T00:00:04.999Z
>>>
>>> =>Correct
>>>
>>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>>> 1970-01-01T00:00:04.999Z
>>>
>>> => Incorrect (should fire at timestamp 1970-01-01T00:00:09.999Z)
>>>
>>> Do I need to call timer.cancel() after the timer has fired? But
>>> timer.cancel() is not supported by DirectRunner yet.
>>>
>>>
>>>
>>> 2- In the @OnTimer method, the injected batch bagState parameter is empty
>>> even though some elements were added to it since the last batch.clear()
>>> while processing the same window
>>>
>>> INFOS: ***** BATCH ***** clear
>>>
>>> INFOS: ***** BATCH ***** Add element for window
>>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>>
>>> INFOS: ***** BATCH ***** Add element for window
>>> [1970-01-01T00:00:00.000Z..1970-01-01T00:00:05.000Z)
>>> ..
>>> INFOS: ***** END OF WINDOW ***** for timer timestamp
>>> 1970-01-01T00:00:04.999Z
>>> INFOS: ***** IN ONTIMER ***** batch size 0
>>>
>>> Am I doing something wrong with timers or is there something not totally
>>> finished with them (as you noticed they are quite new)?
>>>
>>> WDYT?
>>>
>>>
>>> Thanks
>>>
>>> Etienne
>>>
>>>
>>> On 9 Feb 2017 at 09:55, Etienne Chauchot wrote:
>>>>
>>>> Hi,
>>>>
>>>> @JB: good to know for the roadmap! thanks
>>>>
>>>> @Kenn: just to be clear: the timer fires fine. What I noticed is that it
>>>> seems to be SET more than once because timer.setForNowPlus is called in the
>>>> @ProcessElement method. I am not 100% sure of it; what I noticed is that it
>>>> started to work fine once I made sure to call timer.setForNowPlus only once.
>>>> I'm not saying it's a bug, this is just not what I understood when I read the
>>>> javadoc. I understood that it would be set only once per window, see the
>>>> javadoc below:
>>>>
>>>> An implementation of Timer is implicitly scoped - it may be scoped to a
>>>> key and window, or a key, window, and trigger, etc.
>>>> A timer exists in one of two states: set or unset. A timer can be set
>>>> only for a single time per scope.
>>>>
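
The behaviour this javadoc excerpt describes, one timer per scope with a later set replacing the earlier one, can be modelled as a map keyed by scope. This is a hypothetical plain-Java sketch (ScopedTimers is not a Beam class and says nothing about how the DirectRunner stores timers); it only illustrates what "set only for a single time per scope" would mean for n calls from @ProcessElement.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of timers scoped to (key, window); setting again overwrites the target. */
final class ScopedTimers {
    private final Map<String, Long> targets = new HashMap<>();

    /** Sets or resets the single timer for this key and window. */
    void set(String key, String window, long targetMillis) {
        targets.put(key + "|" + window, targetMillis);
    }

    /** Number of timers currently set, across all scopes. */
    int pendingCount() {
        return targets.size();
    }

    /** Target of the timer for this scope, or null if it is unset. */
    Long target(String key, String window) {
        return targets.get(key + "|" + window);
    }
}
```

In this model, calling set() once per element for the same key and window still leaves exactly one pending timer for that scope.
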
>>>> I use the DirectRunner.
>>>>
>>>> For the key part: ok, makes sense.
>>>>
>>>> Thanks for your comments
>>>>
>>>> I'm leaving on vacation tonight for a little more than two weeks, I'll
>>>> resume work then, maybe start a PR when it's ready.
>>>>
>>>> Etienne
>>>>
>>>>
>>>>
>>>> On 8 Feb 2017 at 19:48, Kenneth Knowles wrote:
>>>>>
>>>>> Hi Etienne,
>>>>>
>>>>> If the timer is firing n times for n elements, that's a bug in the runner /
>>>>> shared runner code. It should be deduped. Which runner? Can you file a JIRA
>>>>> against me to investigate? I'm still in the process of fleshing out more
>>>>> and more RunnableOnService (aka ValidatesRunner) tests so I will surely add
>>>>> one (existing tests already OOMed without deduping, so it wasn't at the top
>>>>> of my priority list).
>>>>>
>>>>> If the end user doesn't have a natural key, I would just add one and remove
>>>>> it within your transform. Not sure how easy this will be - you might need
>>>>> user intervention. Of course, you still do need to shard or you'll be
>>>>> processing the whole PCollection serially.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Feb 8, 2017 at 9:45 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>
>>>>>> Hi
>>>>>>
>>>>>> AFAIR the timer per function is in the "roadmap" (remembering the
>>>>>> discussion we had with Kenn).
>>>>>>
>>>>>> I will take a deeper look next week on your branch.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On Feb 8, 2017, at 13:28, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Kenn,
>>>>>>>
>>>>>>> I have started using state and timer APIs, they seem awesome!
>>>>>>>
>>>>>>> Please take a look at
>>>>>>> https://github.com/echauchot/beam/tree/BEAM-135-BATCHING-PARDO
>>>>>>>
>>>>>>> It contains a PTransform that does the batching across bundles while
>>>>>>> respecting the windows (even if tests are not finished yet, see @Ignore
>>>>>>> and TODOs).
>>>>>>>
>>>>>>> I have some questions:
>>>>>>>
>>>>>>> - I use the timer to detect the end of the window like you suggested.
>>>>>>> But the timer can only be set in @ProcessElement and @OnTimer. Javadoc
>>>>>>> says that timers are implicitly scoped to a key/window and that a timer
>>>>>>> can be set only for a single time per scope. I noticed that if I call
>>>>>>> timer.setForNowPlus in the @ProcessElement method, it seems that the
>>>>>>> timer is set n times for n elements. So I just created a state with a
>>>>>>> boolean to prevent setting the timer more than once per key/window.
>>>>>>> => Would it maybe be good to have an end-user way of indicating that the
>>>>>>> timer will be set only once per key/window? Something analogous to
>>>>>>> @Setup, to avoid the user having to use a state boolean?
>>>>>>>
>>>>>>> - I understand that state and timers need to be per-key, but what if the
>>>>>>> end user does not need a key (let's say he just needs a
>>>>>>> PCollection<String>)? Do we then tell him to use a PCollection<KV> anyway,
>>>>>>> like I wrote in the javadoc of BatchingParDo?
>>>>>>>
>>>>>>> WDYT?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>>
>>>>>>> On 26 Jan 2017 at 17:28, Etienne Chauchot wrote:
>>>>>>>>
>>>>>>>> Wonderful!
>>>>>>>>
>>>>>>>> Thanks Kenn!
>>>>>>>>
>>>>>>>> Etienne
>>>>>>>>
>>>>>>>>
>>>>>>>> On 26 Jan 2017 at 15:34, Kenneth Knowles wrote:
>>>>>>>>>
>>>>>>>>> Hi Etienne,
>>>>>>>>>
>>>>>>>>> I was drafting a proposal about @OnWindowExpiration when this email
>>>>>>>>> arrived. I thought I would try to quickly unblock you by responding with a
>>>>>>>>> TL;DR: you can achieve your goals with state & timers as they currently
>>>>>>>>> exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
>>>>>>>>> precisely - when this timer fires, you are guaranteed that the input
>>>>>>>>> watermark has exceeded this point (so all new data is droppable) while the
>>>>>>>>> output timestamp is held to this point (so you can safely output into the
>>>>>>>>> window).
>>>>>>>>>
>>>>>>>>> @OnWindowExpiration is (1) a convenience to save you from needing a handle
>>>>>>>>> on the allowed lateness (not a problem in your case) and (2) actually
>>>>>>>>> meaningful and potentially less expensive to implement in the absence of
>>>>>>>>> state (this is why it needs a design discussion at all, really).
>>>>>>>>>
>>>>>>>>> Caveat: these APIs are new and not supported in every runner and windowing
>>>>>>>>> configuration.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I have started to implement this ticket. For now it is implemented as a
>>>>>>>>>> PTransform that simply does ParDo.of(new DoFn) and all the processing
>>>>>>>>>> related to batching is done in the DoFn.
>>>>>>>>>>
>>>>>>>>>> I'm starting to deal with windows and bundles (starting to take a look at
>>>>>>>>>> the State API to process across bundles, more questions about this to
>>>>>>>>>> come).
>>>>>>>>>> My comments/questions are inline:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 17 Jan 2017 at 18:41, Ben Chambers wrote:
>>>>>>>>>>
>>>>>>>>>>> We should start by understanding the goals. If elements are in different
>>>>>>>>>>> windows can they be put in the same batch? If they have different
>>>>>>>>>>> timestamps what timestamp should the batch have?
>>>>>>>>>>>
>>>>>>>>>> Regarding timestamps: the current design is as follows: the transform does
>>>>>>>>>> not group elements in the PCollection, so the "batch" does not exist as an
>>>>>>>>>> element in the PCollection. There is only a user-defined function
>>>>>>>>>> (perBatchFn) that gets called when batchSize elements have been processed.
>>>>>>>>>> This function takes an ArrayList as parameter. So elements keep their
>>>>>>>>>> original timestamps.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Regarding windowing: I guess that if elements are not in the same window,
>>>>>>>>>> they are not expected to be in the same batch.
>>>>>>>>>> I'm just starting to work on these subjects, so I might lack a bit of
>>>>>>>>>> information;
>>>>>>>>>> what I am currently thinking about is that I need a way to know in the
>>>>>>>>>> DoFn that the window has expired so that I can call the perBatchFn even if
>>>>>>>>>> batchSize is not reached. This is the @OnWindowExpiration callback that
>>>>>>>>>> Kenneth mentioned in an email about bundles.
>>>>>>>>>> Let's imagine that we have a collection of elements artificially
>>>>>>>>>> timestamped every 10 seconds (for simplicity of the example) and a fixed
>>>>>>>>>> windowing of 1 minute. Then each window contains 6 elements. If we were to
>>>>>>>>>> buffer the elements by batches of 5 elements, then for each window we
>>>>>>>>>> expect to get 2 batches (one of 5 elements, one of 1 element). For that to
>>>>>>>>>> happen, we need an @OnWindowExpiration on the DoFn where we call
>>>>>>>>>> perBatchFn.
>>>>>>>>>>
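
The 5-plus-1 example above can be simulated in plain Java. This is only a sketch of the intended semantics, not the BatchingParDo code from the branch: WindowedBatcher is a hypothetical name, windows are assumed to be epoch-aligned, and the final loop stands in for whatever end-of-window hook (a timer or @OnWindowExpiration) flushes the incomplete batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simulates per-window batching with a flush of leftovers when windows expire. */
final class WindowedBatcher {

    /** Returns the batches that a perBatchFn-style callback would receive. */
    static List<List<Long>> batch(List<Long> timestamps, long windowSizeMillis, int batchSize) {
        Map<Long, List<Long>> buffers = new TreeMap<>(); // window start -> buffered elements
        List<List<Long>> batches = new ArrayList<>();
        for (long ts : timestamps) {
            long start = ts - Math.floorMod(ts, windowSizeMillis);
            List<Long> buffer = buffers.computeIfAbsent(start, k -> new ArrayList<>());
            buffer.add(ts);
            if (buffer.size() == batchSize) {
                // Batch is full: emit a copy and start a fresh buffer for this window.
                batches.add(new ArrayList<>(buffer));
                buffer.clear();
            }
        }
        // Stand-in for window expiration: emit whatever is left in each window.
        for (List<Long> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                batches.add(new ArrayList<>(buffer));
            }
        }
        return batches;
    }
}
```

With elements every 10 s, 1-minute windows and a batch size of 5, each window produces one batch of 5 elements during processing and one batch of 1 element at the flush. Without the final flush, the sixth element of every window would never reach the callback.
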
>>>>>>>>>>> As a composite transform this will likely require a group by key which may
>>>>>>>>>>> affect performance. Maybe within a dofn is better.
>>>>>>>>>>>
>>>>>>>>>> Yes, the processing is done with a DoFn indeed.
>>>>>>>>>>
>>>>>>>>>>> Then it could be some annotation or API that informs the runner. Should
>>>>>>>>>>> batch sizes be fixed in the annotation (element count or size) or should
>>>>>>>>>>> the user have some method that lets them decide when to process a batch
>>>>>>>>>>> based on the contents?
>>>>>>>>>>>
>>>>>>>>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it
>>>>>>>>>> is a number of elements. But batching based on content might be useful for
>>>>>>>>>> the user. Giving a hint to the runner might be more flexible for the runner.
>>>>>>>>>>
>>>>>>>>>> Thanks.
>>>>>>>>>>
>>>>>>>>>>> Another thing to think about is whether this should be connected to the
>>>>>>>>>>> ability to run parts of the bundle in parallel.
>>>>>>>>>>>
>>>>>>>>>> Yes!
>>>>>>>>>>
>>>>>>>>>>> Maybe each batch is an RPC
>>>>>>>>>>> and you just want to start an async RPC for each batch. Then in addition to
>>>>>>>>>>> starting the final RPC in finishBundle, you also need to wait for all the
>>>>>>>>>>> RPCs to complete.
>>>>>>>>>>>
>>>>>>>>>> Actually, currently each batch processing is whatever the user wants
>>>>>>>>>> (perBatchFn user-defined function). If the user decides to issue an async
>>>>>>>>>> RPC in that function (called with the ArrayList of input elements), IMHO he
>>>>>>>>>> is responsible for waiting for the response in that method if he needs the
>>>>>>>>>> response, but he can also do a send and forget, depending on his use
>>>>>>>>>> case.
>>>>>>>>>>
>>>>>>>>>> Besides, I have also included a perElementFn user function to allow the
>>>>>>>>>> user to do some processing on the elements before adding them to the batch
>>>>>>>>>> (example use case: convert a String to a DTO object to invoke an external
>>>>>>>>>> service).
>>>>>>>>>>
>>>>>>>>>> Etienne
>>>>>>>>>>
>>>>>>>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi JB,
>>>>>>>>>>>
>>>>>>>>>>> I meant jira vote but discussion on the ML works also :)
>>>>>>>>>>>
>>>>>>>>>>> As I understand the need (see stackoverflow links in jira ticket) the
>>>>>>>>>>> aim is to avoid the user having to code the batching logic in his own
>>>>>>>>>>> DoFn.processElement() and DoFn.finishBundle() regardless of the bundles.
>>>>>>>>>>> For example, a possible use case is to batch a call to an external service
>>>>>>>>>>> (for performance).
>>>>>>>>>>>
>>>>>>>>>>> I was thinking about providing a PTransform that implements the batching
>>>>>>>>>>> in its own DoFn and that takes user-defined functions for customization.
>>>>>>>>>>>
>>>>>>>>>>> Etienne
>>>>>>>>>>>
>>>>>>>>>>> On 17 Jan 2017 at 17:30, Jean-Baptiste Onofré wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi
>>>>>>>>>>>>
>>>>>>>>>>>> I guess you mean discussion on the mailing list about that, right?
>>>>>>>>>>>>
>>>>>>>>>>>> AFAIR the idea is to provide a utility class to deal with
>>>>>>>>>>>> pooling/batching. However, I'm not sure it's required, as there are
>>>>>>>>>>>> @StartBundle etc. in DoFn and batching depends on the end user's "logic".
>>>>>>>>>>>
>>>>>>>>>>>> Regards
>>>>>>>>>>>> JB
>>>>>>>>>>>>
>>>>>>>>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have started to work on this ticket
>>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>>>>>>>
>>>>>>>>>>>>> As there were no votes since March 18th, is the issue still
>>>>>>>>>>>>> relevant/needed?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Etienne
>>>>>>>>>>>>>
>>>>
>>>
>>
