On Thu, Jan 26, 2017 at 3:31 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> I think that wrapping the DoFn is tricky -- we backed out IntraBundleParallelization because it did that, and it has weird interactions with both the reflective DoFn and windowing. We could maybe make some kind of "DoFnDelegatingDoFn" that could act as a base class and get some of that right, but...
Yeah, this is a lot trickier with NewDoFn. Which is unfortunate, as this isn't the only case where we want to make DoFns more composable.

> One question I have is whether this batching should be "make batches of N and if you need to wait for the Nth element do so" or "make batches of at most N but don't wait too long if you don't get to N". In the former case, we'll need to do something to buffer elements between bundles -- whether this is using State or a GroupByKey, etc. In the latter case, the buffering can happen entirely within a bundle -- if you get to the end of the bundle and only have 5 elements, even if 5 < N, process that as a batch (rather than shifting it somewhere else).

I think "make batches of at most N but don't wait too long if you don't get to N" is a very useful first (and tractable) start that can be built on.

> On Thu, Jan 26, 2017 at 3:01 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
>
>> On Thu, Jan 26, 2017 at 12:48 PM, Eugene Kirpichov <kirpic...@google.com.invalid> wrote:
>> > I don't think we should make batching a core feature of the Beam programming model (by adding it to DoFn as this code snippet implies). I'm reasonably sure there are less invasive ways of implementing it.
>>
>> +1, either as a PTransform<PCollection<T>, PCollection<O>> or a DoFn<T, O> that wraps/delegates to a DoFn<Iterable<T>, Iterable<O>>.
>>
>> > On Thu, Jan 26, 2017 at 12:22 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >
>> >> Agree, I'm curious as well.
>> >>
>> >> I guess it would be something like:
>> >>
>> >>   .apply(ParDo.of(new DoFn() {
>> >>
>> >>     @Override
>> >>     public long batchSize() {
>> >>       return 1000;
>> >>     }
>> >>
>> >>     @ProcessElement
>> >>     public void processElement(ProcessContext context) {
>> >>       ...
>> >>     }
>> >>   }));
>> >>
>> >> If batchSize (overridden by the user) returns a positive long, then the DoFn can batch with this size.
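The "at most N, don't wait" variant preferred above needs no cross-bundle state: the whole mechanism reduces to a buffer that flushes at N elements and once more when the bundle ends. A minimal plain-Java sketch of just that buffering logic (class and callback names are hypothetical, not Beam API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Buffers elements and hands them to a user callback in batches of at
// most `batchSize`. finishBundle() flushes whatever is left, so a short
// bundle still produces a (smaller) final batch instead of waiting.
class BatchBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> perBatchFn;
  private final List<T> buffer = new ArrayList<>();

  BatchBuffer(int batchSize, Consumer<List<T>> perBatchFn) {
    this.batchSize = batchSize;
    this.perBatchFn = perBatchFn;
  }

  // Would be called once per element, e.g. from @ProcessElement.
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Would be called from @FinishBundle: emit the last, possibly partial, batch.
  void finishBundle() {
    if (!buffer.isEmpty()) {
      flush();
    }
  }

  private void flush() {
    perBatchFn.accept(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

With batchSize = 5 and a 7-element bundle this invokes perBatchFn twice, with 5 and then 2 elements; no element ever waits past the end of its bundle.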
>> >>
>> >> Regards
>> >> JB
>> >>
>> >> On 01/26/2017 05:38 PM, Eugene Kirpichov wrote:
>> >> > Hi Etienne,
>> >> >
>> >> > Could you post some snippets of how your transform is to be used in a pipeline? I think that would make it easier to discuss on this thread and could save a lot of churn if the discussion ends up leading to a different API.
>> >> >
>> >> > On Thu, Jan 26, 2017 at 8:29 AM Etienne Chauchot <echauc...@gmail.com> wrote:
>> >> >
>> >> >> Wonderful!
>> >> >>
>> >> >> Thanks Kenn!
>> >> >>
>> >> >> Etienne
>> >> >>
>> >> >> On 26/01/2017 at 15:34, Kenneth Knowles wrote:
>> >>> Hi Etienne,
>> >>>
>> >>> I was drafting a proposal about @OnWindowExpiration when this email arrived. I thought I would try to quickly unblock you by responding with a TL;DR: you can achieve your goals with state & timers as they currently exist. You'll set a timer for window.maxTimestamp().plus(allowedLateness) precisely -- when this timer fires, you are guaranteed that the input watermark has exceeded this point (so all new data is droppable) while the output timestamp is held to this point (so you can safely output into the window).
>> >>>
>> >>> @OnWindowExpiration is (1) a convenience to save you from needing a handle on the allowed lateness (not a problem in your case) and (2) actually meaningful and potentially less expensive to implement in the absence of state (this is why it needs a design discussion at all, really).
>> >>>
>> >>> Caveat: these APIs are new and not supported in every runner and windowing configuration.
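Kenn's recipe hinges on a single instant: window.maxTimestamp().plus(allowedLateness). As a sanity check of the arithmetic, assuming (as for Beam's fixed windows) that maxTimestamp() is the window end minus one millisecond, in plain java.time terms:

```java
import java.time.Duration;
import java.time.Instant;

// For a fixed window [start, end), the window's maxTimestamp() is
// end - 1ms; the expiration timer described above fires at
// maxTimestamp + allowedLateness. Names here are illustrative only.
class WindowExpiry {
  static Instant expirationTimer(Instant windowEnd, Duration allowedLateness) {
    Instant maxTimestamp = windowEnd.minusMillis(1);
    return maxTimestamp.plus(allowedLateness);
  }
}
```

For a one-minute window ending at t = 60s with 10s of allowed lateness the timer fires at t = 69,999ms; by then the input watermark has passed the window, so any still-buffered batch can be flushed safely into it.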
>> >>>
>> >>> Kenn
>> >>>
>> >>> On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
>> >>>
>> >>>> Hi,
>> >>>>
>> >>>> I have started to implement this ticket. For now it is implemented as a PTransform that simply does ParDo.of(new DoFn) and all the processing related to batching is done in the DoFn.
>> >>>>
>> >>>> I'm starting to deal with windows and bundles (starting to take a look at the State API to process trans-bundle batches; more questions about this to come). My comments/questions are inline:
>> >>>>
>> >>>> On 17/01/2017 at 18:41, Ben Chambers wrote:
>> >>>>
>> >>>>> We should start by understanding the goals. If elements are in different windows can they be put in the same batch? If they have different timestamps what timestamp should the batch have?
>> >>>>>
>> >>>> Regarding timestamps: the current design is as follows: the transform does not group elements in the PCollection, so the "batch" does not exist as an element in the PCollection. There is only a user defined function (perBatchFn) that gets called when batchSize elements have been processed. This function takes an ArrayList as parameter. So elements keep their original timestamps.
>> >>>>
>> >>>> Regarding windowing: I guess that if elements are not in the same window, they are not expected to be in the same batch.
>> >>>> I'm just starting to work on these subjects, so I might lack a bit of information; what I am currently thinking about is that I need a way to know in the DoFn that the window has expired, so that I can call the perBatchFn even if batchSize is not reached.
>> >>>> This is the @OnWindowExpiration callback that Kenneth mentioned in an email about bundles.
>> >>>> Let's imagine that we have a collection of elements artificially timestamped every 10 seconds (for simplicity of the example) and a fixed windowing of 1 minute. Then each window contains 6 elements. If we were to buffer the elements in batches of 5 elements, then for each window we expect to get 2 batches (one of 5 elements, one of 1 element). For that to happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
>> >>>>
>> >>>>> As a composite transform this will likely require a group by key which may affect performance. Maybe within a DoFn is better.
>> >>>>>
>> >>>> Yes, the processing is done with a DoFn indeed.
>> >>>>
>> >>>>> Then it could be some annotation or API that informs the runner. Should batch sizes be fixed in the annotation (element count or size) or should the user have some method that lets them decide when to process a batch based on the contents?
>> >>>>>
>> >>>> For now, the user passes batchSize as an argument to BatchParDo.via(); it is a number of elements. But batching based on content might be useful for the user, and giving a hint to the runner might be more flexible for the runner. Thanks.
>> >>>>
>> >>>>> Another thing to think about is whether this should be connected to the ability to run parts of the bundle in parallel.
>> >>>>>
>> >>>> Yes!
>> >>>>
>> >>>>> Maybe each batch is an RPC and you just want to start an async RPC for each batch.
>> >>>>> Then, in addition to starting the final RPC in finishBundle, you also need to wait for all the RPCs to complete.
>> >>>>>
>> >>>> Actually, currently each batch processing is whatever the user wants (the perBatchFn user defined function). If the user decides to issue an async RPC in that function (called with the ArrayList of input elements), IMHO he is responsible for waiting for the response in that method if he needs the response, but he can also do a send-and-forget, depending on his use case.
>> >>>>
>> >>>> Besides, I have also included a perElementFn user function to allow the user to do some processing on the elements before adding them to the batch (example use case: convert a String to a DTO object to invoke an external service).
>> >>>>
>> >>>> Etienne
>> >>>>
>> >>>>> On Tue, Jan 17, 2017, 8:48 AM Etienne Chauchot <echauc...@gmail.com> wrote:
>> >>>>>
>> >>>>> Hi JB,
>> >>>>>
>> >>>>> I meant a jira vote, but discussion on the ML works also :)
>> >>>>>
>> >>>>> As I understand the need (see the stackoverflow links in the jira ticket), the aim is to avoid the user having to code the batching logic in his own DoFn.processElement() and DoFn.finishBundle() regardless of the bundles. For example, a possible use case is to batch a call to an external service (for performance).
>> >>>>>
>> >>>>> I was thinking about providing a PTransform that implements the batching in its own DoFn and that takes user defined functions for customization.
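Etienne's worked example earlier in the thread (elements every 10 seconds, 1-minute fixed windows, batches of 5) can be checked with a small stand-alone sketch that assigns timestamps to fixed windows and cuts each window's elements into batches. This is illustrative plain Java, not the proposed transform:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Assigns each timestamped element to a fixed window, then cuts each
// window's elements into batches of at most `batchSize`. Batches never
// cross a window boundary.
class WindowedBatching {
  static Map<Long, List<List<Long>>> batchesPerWindow(
      List<Long> timestampsMillis, long windowSizeMillis, int batchSize) {
    // Group timestamps by the start of their fixed window.
    Map<Long, List<Long>> byWindow = new LinkedHashMap<>();
    for (long ts : timestampsMillis) {
      long windowStart = ts - (ts % windowSizeMillis);
      byWindow.computeIfAbsent(windowStart, w -> new ArrayList<>()).add(ts);
    }
    // Cut each window's element list into batches of at most batchSize.
    Map<Long, List<List<Long>>> result = new LinkedHashMap<>();
    for (Map.Entry<Long, List<Long>> e : byWindow.entrySet()) {
      List<List<Long>> batches = new ArrayList<>();
      List<Long> elems = e.getValue();
      for (int i = 0; i < elems.size(); i += batchSize) {
        batches.add(new ArrayList<>(elems.subList(i, Math.min(i + batchSize, elems.size()))));
      }
      result.put(e.getKey(), batches);
    }
    return result;
  }
}
```

Each 6-element window yields batches of 5 and 1 elements; the trailing 1-element batch is precisely the one that @OnWindowExpiration (or the equivalent timer) must flush.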
>> >>>>>
>> >>>>> Etienne
>> >>>>>
>> >>>>> On 17/01/2017 at 17:30, Jean-Baptiste Onofré wrote:
>> >>>>>
>> >>>>>> Hi
>> >>>>>>
>> >>>>>> I guess you mean discussion on the mailing list about that, right?
>> >>>>>>
>> >>>>>> AFAIR the idea is to provide a utility class to deal with pooling/batching. However, I'm not sure it's required, as with @StartBundle etc. in DoFn, batching depends on the end user "logic".
>> >>>>>>
>> >>>>>> Regards
>> >>>>>> JB
>> >>>>>>
>> >>>>>> On Jan 17, 2017, 08:26, Etienne Chauchot <echauc...@gmail.com> wrote:
>> >>>>>>
>> >>>>>>> Hi all,
>> >>>>>>>
>> >>>>>>> I have started to work on this ticket:
>> >>>>>>> https://issues.apache.org/jira/browse/BEAM-135
>> >>>>>>>
>> >>>>>>> As there was no vote since March 18th, is the issue still relevant/needed?
>> >>>>>>>
>> >>>>>>> Regards,
>> >>>>>>>
>> >>>>>>> Etienne
>> >>>>>>>
>>
>> --
>> Jean-Baptiste Onofré
>> jbono...@apache.org
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
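Pulling the thread's pieces together, the perElementFn/perBatchFn pair Etienne describes would compose roughly as below. All names are hypothetical (this is a sketch of the proposed customization hooks, not the actual API), and the "RPC" is just whatever the user's batch callback does:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Applies perElementFn to each input (e.g. String -> DTO), buffers the
// results, and hands each full batch to perBatchFn (e.g. one external
// service call per batch).
class BatchingFns<InputT, OutputT> {
  private final int batchSize;
  private final Function<InputT, OutputT> perElementFn;
  private final Consumer<List<OutputT>> perBatchFn;
  private final List<OutputT> buffer = new ArrayList<>();

  BatchingFns(int batchSize,
              Function<InputT, OutputT> perElementFn,
              Consumer<List<OutputT>> perBatchFn) {
    this.batchSize = batchSize;
    this.perElementFn = perElementFn;
    this.perBatchFn = perBatchFn;
  }

  // Per-element path: convert, buffer, and flush when the batch is full.
  void process(InputT element) {
    buffer.add(perElementFn.apply(element));
    if (buffer.size() >= batchSize) {
      perBatchFn.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```

Whether perBatchFn blocks on the response or fires and forgets is, as discussed above, the user's choice inside the callback.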