First off, let me say that a *correctly* batching DoFn adds a lot of value,
especially because it's (too) easy to implement one incorrectly, often
unknowingly.
My take is that a BatchingParDo should be a PTransform<PCollection<T>,
PCollection<O>> that takes a DoFn<? super Iterable<T>, ? extends
Iterable<O>> as a parameter, as well as some (optional?) batching criteria
(probably batch size and/or batch timeout). The DoFn should map the set of
inputs to a set of outputs of the same size and in the same order as the
input (or, possibly, an empty list would be acceptable). Semantically, it
should be defined as

    public PCollection<O> expand(PCollection<T> input) {
      return input
          .apply(e -> SingletonList.of(e))
          .apply(parDo(batchDoFn))
          .apply(es -> Iterables.onlyElement(es));
    }

Getting this correct wrt timestamps and windowing is tricky. However, even
something that handles the most trivial case (e.g. GlobalWindows only) and
degenerates to batch sizes of 1 for other cases would allow people to start
using this code (rather than rolling their own), and we could then continue
to refine it.

More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com>
wrote:

> Hi,
>
> I have started to implement this ticket. For now it is implemented as a
> PTransform that simply does ParDo.of(new DoFn) and all the processing
> related to batching is done in the DoFn.
>
> I'm starting to deal with windows and bundles (starting to take a look at
> the State API to process trans-bundles; more questions about this to
> come).
> My comments/questions are inline:
>
> On 17/01/2017 at 18:41, Ben Chambers wrote:
>>
>> We should start by understanding the goals. If elements are in different
>> windows can they be put in the same batch? If they have different
>> timestamps what timestamp should the batch have?
>
> Regarding timestamps: the current design is as so: the transform does not
> group elements in the PCollection, so the "batch" does not exist as an
> element in the PCollection. There is only a user defined function
> (perBatchFn) that gets called when batchSize elements have been processed.
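To make the intended semantics concrete outside of Beam, here is a minimal
plain-Java sketch (the names `batchFn`, `applyAsSingletons`, and
`applyBatched` are illustrative, not Beam API): applying the batch function
over singleton lists must be observationally identical to applying it over
larger batches, which is exactly the equivalence the expand() definition
above expresses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class BatchSemantics {
    // A "batch DoFn": maps a list of inputs to a same-sized,
    // same-order list of outputs.
    static List<String> batchFn(List<Integer> batch) {
        return batch.stream().map(i -> "v" + i).collect(Collectors.toList());
    }

    // Degenerate expansion: wrap each element in a singleton batch, apply
    // the batch function, then unwrap the single result. This is the
    // semantic baseline any real batching implementation must preserve.
    static List<String> applyAsSingletons(List<Integer> input) {
        return input.stream()
                .map(e -> batchFn(List.of(e)))  // batch of size 1
                .map(out -> out.get(0))         // Iterables.onlyElement
                .collect(Collectors.toList());
    }

    // A real batching implementation processes larger batches, but must
    // produce the same elements in the same order.
    static List<String> applyBatched(List<Integer> input, int batchSize) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < input.size(); i += batchSize) {
            result.addAll(batchFn(
                input.subList(i, Math.min(i + batchSize, input.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5);
        // Both paths must agree regardless of batch size.
        if (!applyAsSingletons(input).equals(applyBatched(input, 2))) {
            throw new AssertionError("batching changed semantics");
        }
        System.out.println(applyBatched(input, 2));
    }
}
```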
> This function takes an ArrayList as parameter. So elements keep their
> original timestamps.

Correct, elements must keep their original timestamps. This is one reason
@OnWindowExpiration is insufficient. The watermark needs to be held back to
the timestamp of the earliest element in the buffer.

> Regarding windowing: I guess that if elements are not in the same window,
> they are not expected to be in the same batch.

Batching should be possible across windows, as long as the innerBatchDoFn
does not take the Window (or window-dependent side inputs) as parameters.
Note in particular that if there is ever non-trivial windowing, after a GBK
each successive element is almost certainly in a different window from its
predecessor, which would make emitting after each window change useless.

> I'm just starting to work on these subjects, so I might lack a bit of
> information;
> what I am currently thinking about is that I need a way to know in the
> DoFn that the window has expired so that I can call the perBatchFn even
> if batchSize is not reached. This is the @OnWindowExpiration callback
> that Kenneth mentioned in an email about bundles.
> Let's imagine that we have a collection of elements artificially
> timestamped every 10 seconds (for simplicity of the example) and a fixed
> windowing of 1 minute. Then each window contains 6 elements. If we were
> to buffer the elements in batches of 5 elements, then for each window we
> expect to get 2 batches (one of 5 elements, one of 1 element). For that
> to happen, we need an @OnWindowExpiration on the DoFn where we call
> perBatchFn.
>
>> As a composite transform this will likely require a group by key which
>> may affect performance. Maybe within a dofn is better.
>
> Yes, the processing is done with a DoFn indeed.

However, without a GBK it is unclear which key the state would be stored
with respect to. (On that note, one should be able to batch across keys,
which makes using the state API as is difficult.)
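The watermark-hold requirement above can be sketched in plain Java (this is
an illustrative model, not the Beam watermark machinery): the hold is simply
the minimum original timestamp among buffered elements, and it is released
when the batch is flushed.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkHold {
    // Hypothetical buffered element: a value plus its original event
    // timestamp.
    static class Timestamped {
        final String value;
        final long timestampMillis;
        Timestamped(String value, long timestampMillis) {
            this.value = value;
            this.timestampMillis = timestampMillis;
        }
    }

    private final List<Timestamped> buffer = new ArrayList<>();

    public void add(String value, long timestampMillis) {
        buffer.add(new Timestamped(value, timestampMillis));
    }

    // The watermark must be held to the earliest buffered timestamp: every
    // buffered element will eventually be emitted with its original
    // timestamp, so the watermark cannot be allowed past any of them.
    public long currentHold() {
        long min = Long.MAX_VALUE; // no buffered elements: no hold
        for (Timestamped t : buffer) {
            min = Math.min(min, t.timestampMillis);
        }
        return min;
    }

    public void flush() {
        buffer.clear(); // emitting the batch also releases the hold
    }

    public static void main(String[] args) {
        WatermarkHold h = new WatermarkHold();
        h.add("a", 3000);
        h.add("b", 1000);
        h.add("c", 2000);
        System.out.println(h.currentHold()); // earliest buffered timestamp
        h.flush();
        System.out.println(h.currentHold()); // no hold once flushed
    }
}
```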
>> Then it could be some annotation or API that informs the runner. Should
>> batch sizes be fixed in the annotation (element count or size) or should
>> the user have some method that lets them decide when to process a batch
>> based on the contents?
>
> For now, the user passes batchSize as an argument to BatchParDo.via();
> it is a number of elements. But batching based on content might be useful
> for the user, and giving a hint to the runner might be more flexible for
> the runner.

Thanks. We should allow for runners to tune this parameter. We should also
allow for time-based batch expiration.

>> Another thing to think about is whether this should be connected to the
>> ability to run parts of the bundle in parallel.

Yes! This is, in some sense, a "sliding batch," but many of the concerns
(e.g. holding the watermark, outputting with the correct timestamps and
windows) are similar. The semantics of MultiThreadedParDo.of(doFn) should
be identical to ParDo.of(doFn). As with batching, there's a question of
whether this should be implemented as a PTransform<PCollection<T>,
PCollection<O>> or a DoFn<T, O>.

>> Maybe each batch is an RPC and you just want to start an async RPC for
>> each batch. Then in addition to starting the final RPC in finishBundle,
>> you also need to wait for all the RPCs to complete.
>
> Actually, currently each batch processing is whatever the user wants
> (perBatchFn user defined function). If the user decides to issue an async
> RPC in that function (called with the ArrayList of input elements), IMHO
> he is responsible for waiting for the response in that method if he needs
> the response, but he can also do a send and forget, depending on his use
> case.
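Combining the element-count bound with the time-based expiration mentioned
above can be sketched as follows (a minimal illustration, not Beam API; the
class and parameter names are made up for this example). The clock is
passed in explicitly so the policy stays deterministic and testable.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushPolicy {
    private final int maxSize;        // flush when this many elements buffered
    private final long maxAgeMillis;  // ... or when the batch is this old
    private final List<String> buffer = new ArrayList<>();
    private long firstElementTime = -1;

    public FlushPolicy(int maxSize, long maxAgeMillis) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Returns the completed batch if adding this element triggers a flush,
    // otherwise null. Time-based expiration ensures a slow stream cannot
    // hold buffered elements (and the watermark) indefinitely.
    public List<String> add(String element, long nowMillis) {
        if (buffer.isEmpty()) {
            firstElementTime = nowMillis;
        }
        buffer.add(element);
        boolean sizeReached = buffer.size() >= maxSize;
        boolean expired = nowMillis - firstElementTime >= maxAgeMillis;
        if (sizeReached || expired) {
            List<String> batch = new ArrayList<>(buffer);
            buffer.clear();
            return batch;
        }
        return null;
    }

    public static void main(String[] args) {
        FlushPolicy p = new FlushPolicy(3, 1000);
        p.add("a", 0);
        p.add("b", 100);
        System.out.println(p.add("c", 200));  // size bound reached
        p.add("d", 300);
        System.out.println(p.add("e", 1400)); // age bound exceeded
    }
}
```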
>
> Besides, I have also included a perElementFn user function to allow the
> user to do some processing on the elements before adding them to the
> batch (example use case: convert a String to a DTO object to invoke an
> external service).

I think a perElementFn belongs as a ParDo that precedes this PTransform,
rather than as part of it.
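The reason a preceding ParDo suffices can be shown with a small plain-Java
sketch (illustrative names, not Beam API): mapping each element first and
then batching produces exactly the same batches as folding the per-element
step into the batching transform, so the batching API can stay smaller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class PerElementBeforeBatch {
    // Split a list into consecutive batches of at most `size` elements.
    static <T> List<List<T>> batches(List<T> input, int size) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i += size) {
            out.add(new ArrayList<>(
                input.subList(i, Math.min(i + size, input.size()))));
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the perElementFn (e.g. String -> DTO conversion).
        Function<String, String> perElementFn = s -> "dto:" + s;
        List<String> input = List.of("a", "b", "c", "d", "e");

        // Option 1: perElementFn folded into the batching transform.
        List<List<String>> folded = new ArrayList<>();
        for (List<String> b : batches(input, 2)) {
            folded.add(b.stream().map(perElementFn)
                        .collect(Collectors.toList()));
        }

        // Option 2: perElementFn as a separate preceding ParDo-like step.
        List<String> preMapped = input.stream().map(perElementFn)
                                      .collect(Collectors.toList());
        List<List<String>> separate = batches(preMapped, 2);

        // Identical batches either way.
        if (!folded.equals(separate)) throw new AssertionError();
        System.out.println(separate);
    }
}
```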