On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
> I don't think we should make batching a core feature of the Beam
> programming model (by adding it to DoFn as this code snippet implies). I'm
> reasonably sure there are less invasive ways of implementing it.
+1, either as a PTransform<PCollection<T>, PCollection<O>> or a DoFn<T, O>
that wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.

> On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
>> Agree, I'm curious as well.
>>
>> I guess it would be something like:
>>
>> .apply(ParDo.of(new DoFn<InputT, OutputT>() {
>>
>>   @Override
>>   public long batchSize() {
>>     return 1000;
>>   }
>>
>>   @ProcessElement
>>   public void processElement(ProcessContext context) {
>>     ...
>>   }
>> }));
>>
>> If batchSize (overridden by the user) returns a positive long, then the
>> DoFn can batch with this size.
>>
>> Regards
>> JB
>>
>> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>>> Hi Etienne,
>>>
>>> Could you post some snippets of how your transform is to be used in a
>>> pipeline? I think that would make it easier to discuss on this thread
>>> and could save a lot of churn if the discussion ends up leading to a
>>> different API.
>>>
>>> On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com>
>>> wrote:
>>>
>>>> Wonderful!
>>>>
>>>> Thanks Kenn!
>>>>
>>>> Etienne
>>>>
>>>> Le 26/01/2017 à 15:34, Kenneth Knowles a écrit :
>>>>> Hi Etienne,
>>>>>
>>>>> I was drafting a proposal about @OnWindowExpiration when this email
>>>>> arrived. I thought I would try to quickly unblock you by responding
>>>>> with a TL;DR: you can achieve your goals with state & timers as they
>>>>> currently exist. You'll set a timer for
>>>>> window.maxTimestamp().plus(allowedLateness) precisely - when this
>>>>> timer fires, you are guaranteed that the input watermark has exceeded
>>>>> this point (so all new data is droppable) while the output timestamp
>>>>> is held to this point (so you can safely output into the window).
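[Editor's note: the flush condition Kenn describes can be modeled outside Beam. The plain-Java sketch below is only an illustration of that logic, not the Beam API; the class and member names (WindowBuffer, advanceWatermarkTo, etc.) are hypothetical. It buffers elements for one window and emits the batch once the simulated input watermark passes window.maxTimestamp() plus the allowed lateness, after which any further input is droppable.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of Kenn's recipe: a per-window buffer whose "timer"
// fires at windowMaxTimestamp + allowedLateness. Not a Beam class.
class WindowBuffer {
    final long windowMaxTimestamp;   // end of the window (max timestamp)
    final long allowedLateness;      // how long late data is still accepted
    final List<String> buffer = new ArrayList<>();
    boolean expired = false;

    WindowBuffer(long windowMaxTimestamp, long allowedLateness) {
        this.windowMaxTimestamp = windowMaxTimestamp;
        this.allowedLateness = allowedLateness;
    }

    // The "timer" fires once the input watermark exceeds max timestamp + lateness.
    boolean timerFired(long inputWatermark) {
        return inputWatermark > windowMaxTimestamp + allowedLateness;
    }

    // Returns the flushed batch the first time the expiration point is passed,
    // otherwise null (no flush yet, or already flushed).
    List<String> advanceWatermarkTo(long inputWatermark) {
        if (!expired && timerFired(inputWatermark)) {
            expired = true;
            return new ArrayList<>(buffer); // safe to output into the window
        }
        return null;
    }

    void add(String element, long inputWatermark) {
        if (!timerFired(inputWatermark)) {
            buffer.add(element); // arrived in time: buffer it
        }                        // else: droppable late data, ignore
    }
}
```

[With a 1-minute window (max timestamp 59999 ms) and 10 s allowed lateness, the buffer flushes only once the watermark passes 69999 ms, matching the guarantee described above.]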
>>>>>
>>>>> @OnWindowExpiration is (1) a convenience to save you from needing a
>>>>> handle on the allowed lateness (not a problem in your case) and (2)
>>>>> actually meaningful and potentially less expensive to implement in
>>>>> the absence of state (this is why it needs a design discussion at
>>>>> all, really).
>>>>>
>>>>> Caveat: these APIs are new and not supported in every runner and
>>>>> windowing configuration.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have started to implement this ticket. For now it is implemented
>>>>>> as a PTransform that simply does ParDo.of(new DoFn) and all the
>>>>>> processing related to batching is done in the DoFn.
>>>>>>
>>>>>> I'm starting to deal with windows and bundles (starting to take a
>>>>>> look at the State API to process across bundles; more questions
>>>>>> about this to come). My comments/questions are inline:
>>>>>>
>>>>>> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>>>>>>
>>>>>>> We should start by understanding the goals. If elements are in
>>>>>>> different windows, can they be output in the same batch? If they
>>>>>>> have different timestamps, what timestamp should the batch have?
>>>>>>
>>>>>> Regarding timestamps: the current design is as follows: the
>>>>>> transform does not group elements in the PCollection, so the "batch"
>>>>>> does not exist as an element in the PCollection. There is only a
>>>>>> user-defined function (perBatchFn) that gets called when batchSize
>>>>>> elements have been processed. This function takes an ArrayList as
>>>>>> parameter. So elements keep their original timestamps.
>>>>>>
>>>>>> Regarding windowing: I guess that if elements are not in the same
>>>>>> window, they are not expected to be in the same batch.
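[Editor's note: the per-window buffering design Etienne describes (perBatchFn invoked once batchSize elements have accumulated, plus a final partial flush when the window expires) can be sketched in plain Java. The names below (BatchingBuffer, onWindowExpiration) are illustrative assumptions, not part of the Beam API.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the batching logic discussed in the thread:
// buffer elements of one window, call perBatchFn on every full batch,
// and flush the remaining partial batch at window expiration.
class BatchingBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> perBatchFn; // user-defined batch callback
    private final List<T> buffer = new ArrayList<>();

    BatchingBuffer(int batchSize, Consumer<List<T>> perBatchFn) {
        this.batchSize = batchSize;
        this.perBatchFn = perBatchFn;
    }

    void add(T element) {
        buffer.add(element);
        if (buffer.size() >= batchSize) {
            flush(); // full batch: hand it to the user function
        }
    }

    // Stand-in for an @OnWindowExpiration-style hook: emit the partial batch.
    void onWindowExpiration() {
        if (!buffer.isEmpty()) {
            flush();
        }
    }

    private void flush() {
        perBatchFn.accept(new ArrayList<>(buffer)); // pass a copy to the callback
        buffer.clear();
    }
}
```

[With Etienne's example further down (6 elements per 1-minute window, batchSize 5), this yields one batch of 5 elements and, at expiration, one of 1.]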
>>>>>> I'm just starting to work on these subjects, so I might lack a bit
>>>>>> of information; what I am currently thinking about is that I need a
>>>>>> way to know in the DoFn that the window has expired so that I can
>>>>>> call the perBatchFn even if batchSize is not reached. This is the
>>>>>> @OnWindowExpiration callback that Kenneth mentioned in an email
>>>>>> about bundles.
>>>>>> Let's imagine that we have a collection of elements artificially
>>>>>> timestamped every 10 seconds (for simplicity of the example) and a
>>>>>> fixed windowing of 1 minute. Then each window contains 6 elements.
>>>>>> If we were to buffer the elements by batches of 5 elements, then for
>>>>>> each window we expect to get 2 batches (one of 5 elements, one of 1
>>>>>> element). For that to happen, we need an @OnWindowExpiration on the
>>>>>> DoFn where we call perBatchFn.
>>>>>>
>>>>>>> As a composite transform this will likely require a group by key,
>>>>>>> which may affect performance. Maybe within a DoFn is better.
>>>>>>
>>>>>> Yes, the processing is done with a DoFn indeed.
>>>>>>
>>>>>>> Then it could be some annotation or API that informs the runner.
>>>>>>> Should batch sizes be fixed in the annotation (element count or
>>>>>>> size), or should the user have some method that lets them decide
>>>>>>> when to process a batch based on the contents?
>>>>>>
>>>>>> For now, the user passes batchSize as an argument to
>>>>>> BatchParDo.via(); it is a number of elements. But batching based on
>>>>>> content might be useful for the user, and giving a hint to the
>>>>>> runner might be more flexible for the runner. Thanks.
>>>>>>
>>>>>>> Another thing to think about is whether this should be connected
>>>>>>> to the ability to run parts of the bundle in parallel.
>>>>>>
>>>>>> Yes!
>>>>>>
>>>>>>> Maybe each batch is an RPC
>>>>>>> and you just want to start an async RPC for each batch.
>>>>>>> Then, in addition to
>>>>>>> starting the final RPC in finishBundle, you also need to wait for
>>>>>>> all the RPCs to complete.
>>>>>>
>>>>>> Actually, currently each batch processing is whatever the user
>>>>>> wants (the perBatchFn user-defined function). If the user decides
>>>>>> to issue an async RPC in that function (called with the ArrayList
>>>>>> of input elements), IMHO he is responsible for waiting for the
>>>>>> response in that method if he needs the response, but he can also
>>>>>> do a send-and-forget, depending on his use case.
>>>>>>
>>>>>> Besides, I have also included a perElementFn user function to allow
>>>>>> the user to do some processing on the elements before adding them
>>>>>> to the batch (example use case: convert a String to a DTO object to
>>>>>> invoke an external service).
>>>>>>
>>>>>> Etienne
>>>>>>
>>>>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi JB,
>>>>>>>
>>>>>>> I meant the jira vote, but discussion on the ML works also :)
>>>>>>>
>>>>>>> As I understand the need (see the stackoverflow links in the jira
>>>>>>> ticket), the aim is to avoid the user having to code the batching
>>>>>>> logic in his own DoFn.processElement() and DoFn.finishBundle()
>>>>>>> regardless of the bundles. For example, a possible use case is to
>>>>>>> batch a call to an external service (for performance).
>>>>>>>
>>>>>>> I was thinking about providing a PTransform that implements the
>>>>>>> batching in its own DoFn and that takes user-defined functions for
>>>>>>> customization.
>>>>>>>
>>>>>>> Etienne
>>>>>>>
>>>>>>> Le 17/01/2017 à 17:30, Jean-Baptiste Onofré a écrit :
>>>>>>>
>>>>>>>> Hi
>>>>>>>>
>>>>>>>> I guess you mean discussion on the mailing list about that, right?
>>>>>>>>
>>>>>>>> AFAIR the idea is to provide a utility class to deal with
>>>>>>>> pooling/batching.
>>>>>>>> However, I'm not sure it's required, as we have @StartBundle
>>>>>>>> etc. in DoFn, and batching depends on the end user's "logic".
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>>
>>>>>>>> On Jan 17, 2017, at 08:26, Etienne Chauchot <echauc...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I have started to work on this ticket
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>>>>>>>>>
>>>>>>>>> As there was no vote since March 18th, is the issue still
>>>>>>>>> relevant/needed?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Etienne
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
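[Editor's note: to tie the thread's pieces together, here is a hedged plain-Java sketch combining Etienne's perElementFn (pre-batching conversion) with Ben's async-RPC point: each full batch starts an asynchronous call, and finishBundle flushes the final partial batch and then waits for every outstanding call. All names and signatures (AsyncBatcher, asyncRpc, etc.) are illustrative assumptions, not Beam APIs.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: convert elements, batch them, fire async calls per
// batch, and block at bundle end until all calls have completed.
class AsyncBatcher<T, R> {
    private final int batchSize;
    private final Function<T, R> perElementFn;  // e.g. String -> DTO conversion
    private final Function<List<R>, CompletableFuture<Void>> asyncRpc;
    private final List<R> buffer = new ArrayList<>();
    private final List<CompletableFuture<Void>> inFlight = new ArrayList<>();

    AsyncBatcher(int batchSize, Function<T, R> perElementFn,
                 Function<List<R>, CompletableFuture<Void>> asyncRpc) {
        this.batchSize = batchSize;
        this.perElementFn = perElementFn;
        this.asyncRpc = asyncRpc;
    }

    void processElement(T element) {
        buffer.add(perElementFn.apply(element)); // per-element pre-processing
        if (buffer.size() >= batchSize) {
            startBatch();
        }
    }

    void finishBundle() {
        if (!buffer.isEmpty()) {
            startBatch(); // the final, possibly partial batch
        }
        // Wait for all outstanding RPCs before the bundle completes.
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        inFlight.clear();
    }

    private void startBatch() {
        inFlight.add(asyncRpc.apply(new ArrayList<>(buffer))); // send a copy
        buffer.clear();
    }
}
```

[A send-and-forget variant, as Etienne notes, would simply skip tracking the futures; waiting in finishBundle is only needed when the bundle must not be committed before the RPCs succeed.]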