Here's an example API that would make this part of a DoFn. The idea here is that it would still be run as `ParDo.of(new MyBatchedDoFn())`, but the runner (and DoFnRunner) could see that it has asked for batches, so rather than calling `processElement` on every input `I`, it would assemble a `Collection<I>` and then call the batch method once per batch.
Possible API making this part of DoFn (with a fixed size):

  public class MyBatchedDoFn extends DoFn<I, O> {
    @ProcessBatch(size = 50)
    public void processBatch(ProcessContext c) {
      Collection<I> batchContents = c.element();
      ...
    }
  }

Possible API making this part of DoFn (with dynamic size):

  public class MyBatchedDoFn extends DoFn<I, O> {
    @ProcessBatch
    public boolean processBatch(ProcessContext c) {
      Collection<I> batchContents = c.element();
      if (batchContents.size() < 50) {
        return false; // batch not yet processed
      }
      ...
      return true;
    }
  }

On Thu, Jan 26, 2017 at 4:16 PM Robert Bradshaw <rober...@google.com.invalid>
wrote:

> On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
> <kirpic...@google.com.invalid> wrote:
> > I agree that wrapping the DoFn is probably not the way to go, because the
> > DoFn may be quite tricky due to all the reflective features: e.g. how do
> > you automatically "batch" a DoFn that uses state and timers? What about a
> > DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
> > What about future reflective features? The class for invoking DoFn's,
> > DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> > good reason.
> >
> > I'd rather leave the intricacies of invoking DoFn's to runners, and say
> > that you can't wrap DoFn's, period - "adapter", "decorator" and other
> > design patterns just don't apply to DoFn's.
>
> As a simple example, given a DoFn<T, O> it's perfectly natural to want
> to "wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
> windows, etc. would just be passed through.
>
> The fact that this is complicated, with reflection and flexible
> signatures and byte generation, is a property of the SDK (to provide a
> flexible DoFn API). I agree that it's nice to hide this complexity
> from the user, and it discourages this kind of composability.
>
> I would say that it's nice to let the "batching fn" have side inputs,
> setup/teardown, etc.
> Pretty much everything the current DoFn has,
> though of course using certain properties (e.g. state and timers, or
> windows) would restrict bundles to be contained within a single
> key/window/whatever.
>
> > The two options for batching are:
> > - A transform that takes elements and produces batches, like Robert said
> > - A simple Beam-agnostic library that takes Java objects and produces
> > batches of Java objects, with an API that makes it convenient to use in a
> > typical batching DoFn
>
> I don't think a Beam-agnostic library could correctly handle details
> like windowing and timestamps.
>
> On Thu, Jan 26, 2017 at 3:53 PM, Ben Chambers
> <bchamb...@google.com.invalid> wrote:
> > The third option for batching:
> >
> > - Functionality within the DoFn and DoFnRunner built as part of the SDK.
> >
> > I haven't thought through Batching, but at least for the
> > IntraBundleParallelization use case this actually does make sense to
> > expose as a part of the model. Knowing that a DoFn supports
> > parallelization, a runner may want to control how much parallelization
> > is allowed, and the DoFn also needs to make sure to wait on all those
> > threads (and make sure they're properly setup for logging/metrics/etc.
> > associated with the current step).
> >
> > There may be good reasons to make this a property of a DoFn that the
> > runner can inspect, and support. For instance, if a DoFn wants to
> > process batches of 50, it may be possible to factor that into how input
> > is split/bundled.
>
> That's an interesting idea. I think this could also be done via the Fn
> API by recognizing the URN of "batching DoFn."
>
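For the "Beam-agnostic library" option, here is a minimal sketch of what such a helper could look like. The `BatchingBuffer` name and its API are purely hypothetical, not anything in Beam; a DoFn would call add() from processElement and flush() from finishBundle. And as Robert points out, a plain library like this knows nothing about windows or timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch only -- not a Beam API. Buffers elements and invokes
// a callback with each full batch; flush() emits any leftover partial batch.
class BatchingBuffer<T> {
  private final int batchSize;
  private final Consumer<List<T>> onBatch;
  private final List<T> buffer = new ArrayList<>();

  BatchingBuffer(int batchSize, Consumer<List<T>> onBatch) {
    this.batchSize = batchSize;
    this.onBatch = onBatch;
  }

  // Buffers one element; fires the callback whenever a full batch accumulates.
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  // Emits the current partial batch, if any (e.g. at the end of a bundle).
  void flush() {
    if (!buffer.isEmpty()) {
      onBatch.accept(new ArrayList<>(buffer));
      buffer.clear();
    }
  }
}
```

The callback shape keeps the helper independent of ProcessContext, so the same code works in tests or outside Beam entirely, which is exactly why it can't do anything window- or timestamp-aware.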