Hi Eugene

A simple way would be to create a BatchedDoFn in an extension.

WDYT ?

Regards
JB
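
To make the idea concrete, here is a minimal sketch of the buffering logic such an extension could wrap. It is plain Java with no Beam dependency, and the names (BatchBuffer, perBatchFn, add, flush) are hypothetical, chosen only for illustration. The assumption is that add() would be called per element and flush() at the end of a bundle.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: buffers elements and hands full batches to perBatchFn. */
final class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> perBatchFn;
  private List<T> buffer = new ArrayList<>();

  BatchBuffer(int batchSize, Consumer<List<T>> perBatchFn) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.perBatchFn = perBatchFn;
  }

  /** Call once per element, e.g. from a DoFn's @ProcessElement method. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Call at the end of a bundle, e.g. from @FinishBundle, to emit the remainder. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Hand off the current list and start a fresh one, so the callback owns its batch.
    List<T> batch = buffer;
    buffer = new ArrayList<>();
    perBatchFn.accept(batch);
  }
}
```

With batchSize = 3 and 7 elements, this would call perBatchFn with two full batches and, after flush(), one batch holding the single remaining element.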

On Jan 26, 2017, at 21:48, Eugene Kirpichov
<kirpic...@google.com.INVALID> wrote:
>I don't think we should make batching a core feature of the Beam
>programming model (by adding it to DoFn as this code snippet implies).
>I'm reasonably sure there are less invasive ways of implementing it.
>On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net>
>wrote:
>
>> Agree, I'm curious as well.
>>
>> I guess it would be something like:
>>
>> .apply(ParDo.of(new DoFn<InputT, OutputT>() {
>>
>>     // Proposed hook; batchSize() is not part of the current DoFn API.
>>     @Override
>>     public long batchSize() {
>>       return 1000;
>>     }
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext context) {
>>       ...
>>     }
>> }));
>>
>> If batchSize (overridden by the user) returns a positive long, then the
>> DoFn can batch with this size.
>>
>> Regards
>> JB
>>
>> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> > Hi Etienne,
>> >
>> > Could you post some snippets of how your transform is to be used in a
>> > pipeline? I think that would make it easier to discuss on this thread and
>> > could save a lot of churn if the discussion ends up leading to a different
>> > API.
>> >
>> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com>
>> > wrote:
>> >
>> >> Wonderful !
>> >>
>> >> Thanks Kenn !
>> >>
>> >> Etienne
>> >>
>> >>
>> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
>> >>> Hi Etienne,
>> >>>
>> >>> I was drafting a proposal about @OnWindowExpiration when this email
>> >>> arrived. I thought I would try to quickly unblock you by responding with a
>> >>> TL;DR: you can achieve your goals with state & timers as they currently
>> >>> exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness)
>> >>> precisely - when this timer fires, you are guaranteed that the input
>> >>> watermark has exceeded this point (so all new data is droppable) while the
>> >>> output timestamp is held to this point (so you can safely output into the
>> >>> window).
>> >>>
>> >>> @OnWindowExpiration is (1) a convenience to save you from needing a handle
>> >>> on the allowed lateness (not a problem in your case) and (2) actually
>> >>> meaningful and potentially less expensive to implement in the absence of
>> >>> state (this is why it needs a design discussion at all, really).
>> >>>
>> >>> Caveat: these APIs are new and not supported in every runner and windowing
>> >>> configuration.
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
>> >>> wrote:
>> >>>
>> >>>> Hi,
>> >>>>
>> >>>> I have started to implement this ticket. For now it is implemented as a
>> >>>> PTransform that simply does ParDo.of(new DoFn) and all the processing
>> >>>> related to batching is done in the DoFn.
>> >>>>
>> >>>> I'm starting to deal with windows and bundles (starting to take a look at
>> >>>> the State API to process across bundles, more questions about this to
>> >>>> come).
>> >>>> My comments/questions are inline:
>> >>>>
>> >>>>
>> >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>> >>>>
>> >>>>> We should start by understanding the goals. If elements are in different
>> >>>>> windows can they be put in the same batch? If they have different
>> >>>>> timestamps what timestamp should the batch have?
>> >>>>>
>> >>>> Regarding timestamps, the current design is as follows: the transform
>> >>>> does not group elements in the PCollection, so the "batch" does not exist
>> >>>> as an element in the PCollection. There is only a user-defined function
>> >>>> (perBatchFn) that gets called when batchSize elements have been processed.
>> >>>> This function takes an ArrayList as parameter. So elements keep their
>> >>>> original timestamps.
>> >>>>
>> >>>>
>> >>>> Regarding windowing: I guess that if elements are not in the same window,
>> >>>> they are not expected to be in the same batch.
>> >>>> I'm just starting to work on these subjects, so I might lack a bit of
>> >>>> information;
>> >>>> what I am currently thinking about is that I need a way to know in the
>> >>>> DoFn that the window has expired so that I can call the perBatchFn even if
>> >>>> batchSize is not reached. This is the @OnWindowExpiration callback that
>> >>>> Kenneth mentioned in an email about bundles.
>> >>>> Let's imagine that we have a collection of elements artificially
>> >>>> timestamped every 10 seconds (for simplicity of the example) and a fixed
>> >>>> windowing of 1 minute. Then each window contains 6 elements. If we were to
>> >>>> buffer the elements by batches of 5 elements, then for each window we
>> >>>> expect to get 2 batches (one of 5 elements, one of 1 element). For that to
>> >>>> happen, we need an @OnWindowExpiration on the DoFn where we call
>> >>>> perBatchFn.
>> >>>>
>> >>>>> As a composite transform this will likely require a group by key which
>> >>>>> may affect performance. Maybe within a DoFn is better.
>> >>>>>
>> >>>> Yes, the processing is done with a DoFn indeed.
>> >>>>
>> >>>>> Then it could be some annotation or API that informs the runner. Should
>> >>>>> batch sizes be fixed in the annotation (element count or size) or should
>> >>>>> the user have some method that lets them decide when to process a batch
>> >>>>> based on the contents?
>> >>>>>
>> >>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it
>> >>>> is a number of elements. But batching based on content might be useful for
>> >>>> the user. Giving a hint to the runner might be more flexible for the
>> >>>> runner. Thanks.
>> >>>>
>> >>>>> Another thing to think about is whether this should be connected to the
>> >>>>> ability to run parts of the bundle in parallel.
>> >>>>>
>> >>>> Yes!
>> >>>>
>> >>>>> Maybe each batch is an RPC
>> >>>>> and you just want to start an async RPC for each batch. Then in addition
>> >>>>> to starting the final RPC in finishBundle, you also need to wait for all
>> >>>>> the RPCs to complete.
>> >>>>>
>> >>>> Actually, currently each batch processing is whatever the user wants
>> >>>> (perBatchFn user-defined function). If the user decides to issue an async
>> >>>> RPC in that function (called with the ArrayList of input elements), IMHO
>> >>>> he is responsible for waiting for the response in that method if he needs
>> >>>> the response, but he can also do a send and forget, depending on his use
>> >>>> case.
>> >>>>
>> >>>> Besides, I have also included a perElementFn user function to allow the
>> >>>> user to do some processing on the elements before adding them to the batch
>> >>>> (example use case: convert a String to a DTO object to invoke an external
>> >>>> service).
>> >>>>
>> >>>> Etienne
>> >>>>
>> >>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com>
>> >>>>> wrote:
>> >>>>>
>> >>>>> Hi JB,
>> >>>>>
>> >>>>> I meant jira vote but discussion on the ML works also :)
>> >>>>>
>> >>>>> As I understand the need (see the Stack Overflow links in the JIRA
>> >>>>> ticket), the aim is to avoid the user having to code the batching logic
>> >>>>> in his own DoFn.processElement() and DoFn.finishBundle() regardless of
>> >>>>> the bundles. For example, a possible use case is to batch calls to an
>> >>>>> external service (for performance).
>> >>>>>
>> >>>>> I was thinking about providing a PTransform that implements the batching
>> >>>>> in its own DoFn and that takes user-defined functions for customization.
>> >>>>>
>> >>>>> Etienne
>> >>>>>
>> >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
>> >>>>>
>> >>>>>> Hi
>> >>>>>>
>> >>>>>> I guess you mean discussion on the mailing list about that, right?
>> >>>>>>
>> >>>>>> AFAIR the idea is to provide a utility class to deal with
>> >>>>>> pooling/batching. However, I'm not sure it's required, given @StartBundle
>> >>>>>> etc. in DoFn, and since batching depends on the end user's "logic".
>> >>>>>
>> >>>>>> Regards
>> >>>>>> JB
>> >>>>>>
>> >>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com>
>> >>>>>> wrote:
>> >>>>>
>> >>>>>> Hi all,
>> >>>>>>> I have started to work on this ticket
>> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>> >>>>>>>
>> >>>>>>> As there were no votes since March 18th, is the issue still
>> >>>>>>> relevant/needed?
>> >>>>>>>
>> >>>>>>> Regards,
>> >>>>>>>
>> >>>>>>> Etienne
>> >>>>>>>
>> >>
>> >>
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
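
For reference, the windowing example quoted above (elements every 10 seconds, fixed windows of 1 minute, batches of 5) can be checked with a small simulation. This is a sketch in plain Java, not Beam code: WindowedBatcher, process and advanceWatermark are hypothetical names, and the expiry rule (watermark strictly past maxTimestamp plus allowed lateness, with maxTimestamp taken as the last millisecond of the window) is an assumption modeled on the timer described in the thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation: batches per fixed window, flushing the remainder on expiry. */
final class WindowedBatcher {
  private final long windowSizeMs;
  private final long allowedLatenessMs;
  private final int batchSize;
  // Buffered element timestamps, keyed by window start (sorted ascending).
  private final TreeMap<Long, List<Long>> buffers = new TreeMap<>();
  // Sizes of the emitted batches, in emission order.
  final List<Integer> emittedBatchSizes = new ArrayList<>();

  WindowedBatcher(long windowSizeMs, long allowedLatenessMs, int batchSize) {
    this.windowSizeMs = windowSizeMs;
    this.allowedLatenessMs = allowedLatenessMs;
    this.batchSize = batchSize;
  }

  /** Buffers one element in its window and emits a batch once batchSize is reached. */
  void process(long timestampMs) {
    long windowStart = timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    List<Long> buffer = buffers.computeIfAbsent(windowStart, k -> new ArrayList<>());
    buffer.add(timestampMs);
    if (buffer.size() >= batchSize) {
      emittedBatchSizes.add(buffer.size());
      buffer.clear();
    }
  }

  /** Mimics the expiry timer: flushes windows whose expiry the watermark has passed. */
  void advanceWatermark(long watermarkMs) {
    Iterator<Map.Entry<Long, List<Long>>> it = buffers.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<Long, List<Long>> entry = it.next();
      long maxTimestamp = entry.getKey() + windowSizeMs - 1; // last instant of the window
      long expiry = maxTimestamp + allowedLatenessMs;
      if (watermarkMs <= expiry) {
        break; // later windows expire later, so stop at the first live one
      }
      if (!entry.getValue().isEmpty()) {
        emittedBatchSizes.add(entry.getValue().size());
      }
      it.remove();
    }
  }
}
```

Feeding it 12 elements at 0 s, 10 s, ..., 110 s with a batch size of 5 and no allowed lateness, then advancing the watermark to 120 s, gives two batches of 5 during processing and two batches of 1 at expiry, i.e. 2 batches per window as expected.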
