On Thu, Jan 26, 2017 at 3:31 PM, Ben Chambers
<bchamb...@google.com.invalid> wrote:
> I think that wrapping the DoFn is tricky -- we backed out
> IntraBundleParallelization because it did that, and it has weird
> interactions with both the reflective DoFn and windowing. We could maybe
> make some kind of "DoFnDelegatingDoFn" that could act as a base class and
> get some of that right, but...

Yeah, this is a lot trickier with NewDoFn. Which is unfortunate as
this isn't the only case where we want to make DoFns more compossible.

> One question I have is whether this batching should be "make batches of N
> and if you need to wait for the Nth element do so" or "make batches of at
> most N but don't wait too long if you don't get to N". In the former case,
> we'll need to do something to buffer elements between bundles -- whether
> this is using State or a GroupByKey, etc. In the latter case, the buffering
> can happen entirely within a bundle -- if you get to the end of the bundle
> and only have 5 elements, even if 5 < N, process that as a batch (rather
> than shifting it somewhere else).

I think the "make batches of at most N but don't wait too long if you
don't get to N" is a very useful first (and tractable) start that can
be built on.

> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw <rober...@google.com.invalid>
> wrote:
>
>> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov
>> <kirpic...@google.com.invalid> wrote:
>> > I don't think we should make batching a core feature of the Beam
>> > programming model (by adding it to DoFn as this code snippet implies).
>> I'm
>> > reasonably sure there are less invasive ways of implementing it.
>>
>> +1, either as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O> that
>> wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.
>>
>> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net>
>> > wrote:
>> >
>> >> Agree, I'm curious as well.
>> >>
>> >> I guess it would be something like:
>> >>
>> >> .apply(ParDo(new DoFn() {
>> >>
>> >>     @Override
>> >>     public long batchSize() {
>> >>       return 1000;
>> >>     }
>> >>
>> >>     @ProcessElement
>> >>     public void processElement(ProcessContext context) {
>> >>       ...
>> >>     }
>> >> }));
>> >>
>> >> If batchSize (overrided by user) returns a positive long, then DoFn can
>> >> batch with this size.
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> >> > Hi Etienne,
>> >> >
>> >> > Could you post some snippets of how your transform is to be used in a
>> >> > pipeline? I think that would make it easier to discuss on this thread
>> and
>> >> > could save a lot of churn if the discussion ends up leading to a
>> >> different
>> >> > API.
>> >> >
>> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com
>> >
>> >> > wrote:
>> >> >
>> >> >> Wonderful !
>> >> >>
>> >> >> Thanks Kenn !
>> >> >>
>> >> >> Etienne
>> >> >>
>> >> >>
>> >> >> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>> >> >>> Hi Etienne,
>> >> >>>
>> >> >>> I was drafting a proposal about @OnWindowExpiration when this email
>> >> >>> arrived. I thought I would try to quickly unblock you by responding
>> >> with
>> >> >> a
>> >> >>> TL;DR: you can achieve your goals with state & timers as they
>> currently
>> >> >>> exist. You'll set a timer for
>> >> window.maxTimestamp().plus(allowedLateness)
>> >> >>> precisely - when this timer fires, you are guaranteed that the input
>> >> >>> watermark has exceeded this point (so all new data is droppable)
>> while
>> >> >> the
>> >> >>> output timestamp is held to this point (so you can safely output
>> into
>> >> the
>> >> >>> window).
>> >> >>>
>> >> >>> @OnWindowExpiration is (1) a convenience to save you from needing a
>> >> >> handle
>> >> >>> on the allowed lateness (not a problem in your case) and (2)
>> actually
>> >> >>> meaningful and potentially less expensive to implement in the
>> absence
>> >> of
>> >> >>> state (this is why it needs a design discussion at all, really).
>> >> >>>
>> >> >>> Caveat: these APIs are new and not supported in every runner and
>> >> >> windowing
>> >> >>> configuration.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <
>> echauc...@gmail.com
>> >> >
>> >> >>> wrote:
>> >> >>>
>> >> >>>> Hi,
>> >> >>>>
>> >> >>>> I have started to implement this ticket. For now it is implemented
>> as
>> >> a
>> >> >>>> PTransform that simply does ParDo.of(new DoFn) and all the
>> processing
>> >> >>>> related to batching is done in the DoFn.
>> >> >>>>
>> >> >>>> I'm starting to deal with windows and bundles (starting to take a
>> look
>> >> >> at
>> >> >>>> the State API to process trans-bundles, more questions about this
>> to
>> >> >> come).
>> >> >>>> My comments/questions are inline:
>> >> >>>>
>> >> >>>>
>> >> >>>> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>> >> >>>>
>> >> >>>>> We should start by understanding the goals. If elements are in
>> >> >> different
>> >> >>>>> windows can they be out in the same batch? If they have different
>> >> >>>>> timestamps what timestamp should the batch have?
>> >> >>>>>
>> >> >>>> Regarding timestamps: currently design is as so: the transform does
>> >> not
>> >> >>>> group elements in the PCollection, so the "batch" does not exist
>> as an
>> >> >>>> element in the PCollection. There is only a user defined function
>> >> >>>> (perBatchFn) that gets called when batchSize elements have been
>> >> >> processed.
>> >> >>>> This function takes an ArrayList as parameter. So elements keep
>> their
>> >> >>>> original timestamps
>> >> >>>>
>> >> >>>>
>> >> >>>> Regarding windowing: I guess that if elements are not in the same
>> >> >> window,
>> >> >>>> they are not expected to be in the same batch.
>> >> >>>> I'm just starting to work on these subjects, so I might lack a bit
>> of
>> >> >>>> information;
>> >> >>>> what I am currently thinking about is that I need a way to know in
>> the
>> >> >>>> DoFn that the window has expired so that I can call the perBatchFn
>> >> even
>> >> >> if
>> >> >>>> batchSize is not reached.  This is the @OnWindowExpiration callback
>> >> that
>> >> >>>> Kenneth mentioned in an email about bundles.
>> >> >>>> Lets imagine that we have a collection of elements artificially
>> >> >>>> timestamped every 10 seconds (for simplicity of the example) and a
>> >> fixed
>> >> >>>> windowing of 1 minute. Then each window contains 6 elements. If we
>> >> were
>> >> >> to
>> >> >>>> buffer the elements by batches of 5 elements, then for each window
>> we
>> >> >>>> expect to get 2 batches (one of 5 elements, one of 1 element). For
>> >> that
>> >> >> to
>> >> >>>> append, we need a @OnWindowExpiration on the DoFn where we call
>> >> >> perBatchFn
>> >> >>>>
>> >> >>>> As a composite transform this will likely require a group by key
>> which
>> >> >> may
>> >> >>>>> affect performance. Maybe within a dofn is better.
>> >> >>>>>
>> >> >>>> Yes, the processing is done with a DoFn indeed.
>> >> >>>>
>> >> >>>>> Then it could be some annotation or API that informs the runner.
>> >> Should
>> >> >>>>> batch sizes be fixed in the annotation (element count or size) or
>> >> >> should
>> >> >>>>> the user have some method that lets them decide when to process a
>> >> batch
>> >> >>>>> based on the contents?
>> >> >>>>>
>> >> >>>> For now, the user passes batchSize as an argument to
>> BatchParDo.via()
>> >> it
>> >> >>>> is a number of elements. But batch based on content might be useful
>> >> for
>> >> >> the
>> >> >>>> user. Give hint to the runner might be more flexible for the
>> runner.
>> >> >> Thanks.
>> >> >>>>
>> >> >>>>> Another thing to think about is whether this should be connected
>> to
>> >> the
>> >> >>>>> ability to run parts of the bundle in parallel.
>> >> >>>>>
>> >> >>>> Yes!
>> >> >>>>
>> >> >>>>> Maybe each batch is an RPC
>> >> >>>>> and you just want to start an async RPC for each batch. Then in
>> >> >> addition
>> >> >>>>> to
>> >> >>>>> start the final RPC in finishBundle, you also need to wait for all
>> >> the
>> >> >>>>> RPCs
>> >> >>>>> to complete.
>> >> >>>>>
>> >> >>>> Actually, currently each batch processing is whatever the user
>> wants
>> >> >>>> (perBatchFn user defined function). If the user decides to issue an
>> >> >> async
>> >> >>>> RPC in that function (call with the arrayList of input elements),
>> IMHO
>> >> >> he
>> >> >>>> is responsible for waiting for the response in that method if he
>> needs
>> >> >> the
>> >> >>>> response, but he can also do a send and forget, depending on his
>> use
>> >> >> case.
>> >> >>>>
>> >> >>>> Besides, I have also included a perElementFn user function to allow
>> >> the
>> >> >>>> user to do some processing on the elements before adding them to
>> the
>> >> >> batch
>> >> >>>> (example use case: convert a String to a DTO object to invoke an
>> >> >> external
>> >> >>>> service)
>> >> >>>>
>> >> >>>> Etienne
>> >> >>>>
>> >> >>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot<echauc...@gmail.com
>> >
>> >> >>>>> wrote:
>> >> >>>>>
>> >> >>>>> Hi JB,
>> >> >>>>>
>> >> >>>>> I meant jira vote but discussion on the ML works also :)
>> >> >>>>>
>> >> >>>>> As I understand the need (see stackoverflow links in jira ticket)
>> the
>> >> >>>>> aim is to avoid the user having to code the batching logic in his
>> own
>> >> >>>>> DoFn.processElement() and DoFn.finishBundle() regardless of the
>> >> >> bundles.
>> >> >>>>> For example, possible use case is to batch a call to an external
>> >> >> service
>> >> >>>>> (for performance).
>> >> >>>>>
>> >> >>>>> I was thinking about providing a PTransform that implements the
>> >> >> batching
>> >> >>>>> in its own DoFn and that takes user defined functions for
>> >> >> customization.
>> >> >>>>>
>> >> >>>>> Etienne
>> >> >>>>>
>> >> >>>>> Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :
>> >> >>>>>
>> >> >>>>>> Hi
>> >> >>>>>>
>> >> >>>>>> I guess you mean discussion on the mailing list about that,
>> right ?
>> >> >>>>>>
>> >> >>>>>> AFAIR the ide⁣a is to provide a utility class to deal with
>> >> >>>>>>
>> >> >>>>> pooling/batching. However not sure it's required as with
>> @StartBundle
>> >> >> etc
>> >> >>>>> in DoFn and batching depends of the end user "logic".
>> >> >>>>>
>> >> >>>>>> Regards
>> >> >>>>>> JB
>> >> >>>>>>
>> >> >>>>>> On Jan 17, 2017, 08:26, at 08:26, Etienne Chauchot<
>> >> >> echauc...@gmail.com>
>> >> >>>>>>
>> >> >>>>> wrote:
>> >> >>>>>
>> >> >>>>>> Hi all,
>> >> >>>>>>> I have started to work on this ticket
>> >> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>> >> >>>>>>>
>> >> >>>>>>> As there where no vote since March 18th, is the issue still
>> >> >>>>>>> relevant/needed?
>> >> >>>>>>>
>> >> >>>>>>> Regards,
>> >> >>>>>>>
>> >> >>>>>>> Etienne
>> >> >>>>>>>
>> >> >>
>> >> >>
>> >> >
>> >>
>> >> --
>> >> Jean-Baptiste Onofré
>> >> jbono...@apache.org
>> >> http://blog.nanthrax.net
>> >> Talend - http://www.talend.com
>> >>
>>

Reply via email to