First off, let me say that a *correctly* batching DoFn provides a lot
of value, especially because it's (too) easy to implement one
incorrectly, often without realizing it.

My take is that a BatchingParDo should be a PTransform<PCollection<T>,
PCollection<O>> that takes a DoFn<? super Iterable<T>, ? extends
Iterable<O>> as a parameter, as well as some (optional?) batching
criteria (probably batch size and/or batch timeout). The DoFn should
map the set of inputs to a set of outputs of the same size and in the
same order as the input (or, possibly, an empty list would be
acceptable). Semantically, it should be defined as

public PCollection<O> expand(PCollection<T> input) {
  return input
    .apply(e -> Collections.singletonList(e))    // wrap each element in a batch of 1
    .apply(ParDo.of(batchDoFn))                  // the user's batch-level DoFn
    .apply(es -> Iterables.getOnlyElement(es));  // unwrap the single result
}
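
For concreteness, I'd imagine the public surface looking something
like the following (just a sketch; the class and field names here are
all strawmen, not an agreed-upon API):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Sketch of a possible public surface; all names are hypothetical.
public abstract class BatchingParDo<T, O>
    extends PTransform<PCollection<T>, PCollection<O>> {
  // The user's batch-level logic: N inputs in, N outputs out, in order.
  DoFn<? super Iterable<T>, ? extends Iterable<O>> batchDoFn;
  // Optional batching criteria; a runner should be free to treat these
  // as hints and tune them.
  int maxBatchSize = 1000;
  Duration maxBatchDelay = Duration.standardSeconds(10);
}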

Getting this correct wrt timestamps and windowing is tricky. However,
even something that handles the most trivial case (e.g. GlobalWindows
only) and degenerates to batch sizes of 1 for other cases would allow
people to start using this code (rather than rolling their own) and we
could then continue to refine it.
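
E.g. the trivial-case fallback could be a helper inside the transform
along these lines (a sketch; effectiveBatchSize is a made-up name, but
getWindowingStrategy() is the real Beam API):

// Sketch: only batch when the windowing is trivial; otherwise
// degenerate to batches of 1, which is always safe.
static <T> int effectiveBatchSize(PCollection<T> input, int requestedSize) {
  boolean globallyWindowed =
      input.getWindowingStrategy().getWindowFn()
          instanceof org.apache.beam.sdk.transforms.windowing.GlobalWindows;
  return globallyWindowed ? requestedSize : 1;
}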

More responses inline below.

On Thu, Jan 26, 2017 at 1:48 AM, Etienne Chauchot <echauc...@gmail.com> wrote:
> Hi,
>
> I have started to implement this ticket. For now it is implemented as a
> PTransform that simply does ParDo.of(new DoFn) and all the processing
> related to batching is done in the DoFn.
>
> I'm starting to deal with windows and bundles (starting to take a look at
> the State API to process trans-bundles, more questions about this to come).
> My comments/questions are inline:
>
> Le 17/01/2017 à 18:41, Ben Chambers a écrit :
>>
>> We should start by understanding the goals. If elements are in different
>> windows can they be out in the same batch? If they have different
>> timestamps what timestamp should the batch have?
>
> Regarding timestamps: the current design is as follows: the transform does
> not group elements in the PCollection, so the "batch" does not exist as an
> element in the PCollection. There is only a user-defined function
> (perBatchFn) that gets called when batchSize elements have been processed.
> This function takes an ArrayList as a parameter. So elements keep their
> original timestamps.

Correct, elements must keep their original timestamps. This is one
reason @OnWindowExpiration is insufficient. The watermark needs to be
held back to the timestamp of the earliest element in the buffer.
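
Concretely, a flush would have to re-emit each result with its input's
original timestamp, roughly like this (a sketch living inside the
batching DoFn; it assumes the batchFn maps N inputs to N outputs in
order, as proposed above, and still needs the watermark hold):

// Sketch: flush a buffered batch, restoring each element's own timestamp.
static <T, O> void flush(
    List<TimestampedValue<T>> batch,
    SerializableFunction<List<T>, List<O>> batchFn,
    DoFn<T, O>.ProcessContext c) {
  List<T> inputs = new ArrayList<>();
  for (TimestampedValue<T> tv : batch) {
    inputs.add(tv.getValue());
  }
  List<O> results = batchFn.apply(inputs);
  for (int i = 0; i < results.size(); i++) {
    // Without a watermark hold, a runner may legitimately reject output
    // timestamps that are behind the current input watermark.
    c.outputWithTimestamp(results.get(i), batch.get(i).getTimestamp());
  }
}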

> Regarding windowing: I guess that if elements are not in the same window,
> they are not expected to be in the same batch.

Batching should be possible across windows, as long as the
innerBatchDoFn does not take the Window (or window-dependent side
inputs) as parameters. Note in particular that with non-trivial
windowing, after a GBK each successive element is almost certainly in
a different window from its predecessor, which would make emitting a
batch on every window change useless.

> I'm just starting to work on these subjects, so I might lack a bit of
> information; what I am currently thinking about is that I need a way to
> know in the DoFn that the window has expired, so that I can call the
> perBatchFn even if batchSize has not been reached. This is the
> @OnWindowExpiration callback that Kenneth mentioned in an email about
> bundles.
> Let's imagine that we have a collection of elements artificially
> timestamped every 10 seconds (for simplicity of the example) and a fixed
> windowing of 1 minute. Then each window contains 6 elements. If we were to
> buffer the elements in batches of 5, then for each window we would expect
> to get 2 batches (one of 5 elements, one of 1 element). For that to
> happen, we need an @OnWindowExpiration on the DoFn where we call perBatchFn.
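
Until @OnWindowExpiration exists, you could get that flush with an
event-time timer set to the window's end, something like this (a
sketch; buffer management is elided to comments):

import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Sketch: flush the partial batch when the window expires, using an
// event-time timer in lieu of the proposed @OnWindowExpiration.
class BatchingDoFn<K, T> extends DoFn<KV<K, T>, Iterable<T>> {
  @TimerId("endOfWindow")
  private final TimerSpec endOfWindowSpec =
      TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @TimerId("endOfWindow") Timer timer) {
    // Fires once the watermark passes the end of this element's window.
    timer.set(window.maxTimestamp());
    // ... buffer c.element().getValue(); flush when batchSize is reached ...
  }

  @OnTimer("endOfWindow")
  public void onEndOfWindow(OnTimerContext c) {
    // ... flush the leftover batch (the "batch of 1" in your example) ...
  }
}
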
>
>> As a composite transform this will likely require a group by key which may
>> affect performance. Maybe within a dofn is better.
>
> Yes, the processing is done with a DoFn indeed.

However, without a GBK it is unclear which key the state would be
stored with respect to. (On that note, one should be able to batch
across keys, which makes using the state API as-is difficult.)
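
One workaround sketch: re-key onto a small set of shards first, so
that keyed state can buffer elements from many original keys together
(AssignShard is a made-up name):

import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch: assign each element to one of a few shards so that the keyed
// state API can batch elements regardless of their original keys.
class AssignShard<T> extends DoFn<T, KV<Integer, T>> {
  private final int numShards;

  AssignShard(int numShards) {
    this.numShards = numShards;
  }

  @ProcessElement
  public void process(ProcessContext c) {
    c.output(KV.of(ThreadLocalRandom.current().nextInt(numShards), c.element()));
  }
}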

>> Then it could be some annotation or API that informs the runner. Should
>> batch sizes be fixed in the annotation (element count or size) or should
>> the user have some method that lets them decide when to process a batch
>> based on the contents?
>
> For now, the user passes batchSize as an argument to BatchParDo.via(); it
> is a number of elements. But batching based on content might be useful for
> the user, and giving a hint to the runner might be more flexible for the
> runner. Thanks.

We should allow for runners to tune this parameter. We should also
allow for time-based batch expiration.
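
For the time-based part, a processing-time timer in the same DoFn as
above would cap how long a partial batch waits (a sketch of the extra
pieces only, to be merged into the existing @ProcessElement;
maxBatchDelay is the hypothetical knob from the earlier sketch):

// Sketch: a processing-time timer bounds the latency of a partially
// filled batch.
@TimerId("staleness")
private final TimerSpec stalenessSpec =
    TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

// In @ProcessElement, with an extra @TimerId("staleness") Timer parameter:
//   timer.offset(maxBatchDelay).setRelative();  // re-arm on each element

@OnTimer("staleness")
public void onStaleness(OnTimerContext c) {
  // ... flush whatever is buffered, even if batchSize was not reached ...
}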

>> Another thing to think about is whether this should be connected to the
>> ability to run parts of the bundle in parallel.
>
> Yes!

This is, in some sense, a "sliding batch", but many of the concerns
(e.g. holding the watermark, outputting with the correct timestamps
and windows) are similar. The semantics of MultiThreadedParDo.of(doFn)
should be identical to ParDo.of(doFn).

As with batching, there's a question of whether this should be
implemented as a PTransform<Pc<T>, Pc<O>> or a DoFn<T, O>.

>> Maybe each batch is an RPC
>> and you just want to start an async RPC for each batch. Then in addition
>> to
>> start the final RPC in finishBundle, you also need to wait for all the
>> RPCs
>> to complete.
>
> Actually, currently each batch's processing is whatever the user wants
> (the perBatchFn user-defined function). If the user decides to issue an
> async RPC in that function (called with the ArrayList of input elements),
> IMHO he is responsible for waiting for the response in that method if he
> needs the response, but he can also do a send-and-forget, depending on his
> use case.
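
Agreed that it's the user's choice, but if the outputs matter the
waiting has to happen before the bundle is committed. I'd sketch the
rough shape like this (callService and the bookkeeping class are made
up; the executor size is just a tuning knob):

// Sketch: issue async work per element (or per batch) and block in
// @FinishBundle so no result is lost when the bundle is committed.
abstract class AsyncFn<T, O> extends DoFn<T, O> {
  static class Pending<R> {
    final CompletableFuture<R> result;
    final Instant timestamp;
    final BoundedWindow window;
    Pending(CompletableFuture<R> result, Instant timestamp, BoundedWindow window) {
      this.result = result;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient ExecutorService executor;
  private transient List<Pending<O>> pending;

  @StartBundle
  public void startBundle() {
    executor = Executors.newFixedThreadPool(8);
    pending = new ArrayList<>();
  }

  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window) {
    T element = c.element();
    pending.add(new Pending<>(
        CompletableFuture.supplyAsync(() -> callService(element), executor),
        c.timestamp(), window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    for (Pending<O> p : pending) {
      // join() waits for the RPC; output with the original timestamp/window.
      c.output(p.result.join(), p.timestamp, p.window);
    }
    executor.shutdown();
  }

  // The user's RPC; hypothetical.
  abstract O callService(T element);
}
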
>
> Besides, I have also included a perElementFn user function to allow the
> user to do some processing on the elements before adding them to the batch
> (example use case: converting a String to a DTO object in order to invoke
> an external service).

I think a perElementFn belongs in a ParDo that precedes this
PTransform, rather than as part of it.
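
I.e. something like this (a usage sketch; Response, Dto, parseDto, and
BatchingParDo.of are all placeholder names from the strawman above):

// Usage sketch: the per-element conversion is just an ordinary
// MapElements/ParDo step ahead of the (hypothetical) batching transform.
PCollection<Response> result =
    input
        .apply(MapElements
            .into(TypeDescriptor.of(Dto.class))
            .via(s -> parseDto(s)))              // the "perElementFn"
        .apply(BatchingParDo.of(batchRpcFn));    // batch-level work (e.g. RPC)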
