On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:

> I agree that wrapping the DoFn is probably not the way to go, because the
> DoFn may be quite tricky due to all the reflective features: e.g. how do
> you automatically "batch" a DoFn that uses state and timers? What about a
> DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
> What about future reflective features? The class for invoking DoFn's,
> DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> good reason.
>
> I'd rather leave the intricacies of invoking DoFn's to runners, and say
> that you can't wrap DoFn's, period - "adapter", "decorator" and other
> design patterns just don't apply to DoFn's.
As a simple example, given a DoFn<T, O> it's perfectly natural to want to
"wrap" this as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs, windows,
etc. would just be passed through. The fact that this is complicated, with
reflection and flexible signatures and bytecode generation, is a property
of the SDK (to provide a flexible DoFn API). I agree that it's nice to
hide this complexity from the user, but it also discourages this kind of
composability.

I would say that it's nice to let the "batching fn" have side inputs,
setup/teardown, etc.: pretty much everything the current DoFn has. Of
course, using certain properties (e.g. state and timers, or windows) would
restrict bundles to be contained within a single key/window/whatever.

> The two options for batching are:
> - A transform that takes elements and produces batches, like Robert said
> - A simple Beam-agnostic library that takes Java objects and produces
> batches of Java objects, with an API that makes it convenient to use in a
> typical batching DoFn

I don't think a Beam-agnostic library could correctly handle details like
windowing and timestamps; the sketch at the end of this mail tries to make
that concrete.

On Thu, Jan 26, 2017 at 3:53 PM, Ben Chambers
<bchamb...@google.com.invalid> wrote:

> The third option for batching:
>
> - Functionality within the DoFn and DoFnRunner built as part of the SDK.
>
> I haven't thought through Batching, but at least for the
> IntraBundleParallelization use case this actually does make sense to
> expose as a part of the model. Knowing that a DoFn supports
> parallelization, a runner may want to control how much parallelization is
> allowed, and the DoFn also needs to make sure to wait on all those
> threads (and make sure they're properly set up for logging/metrics/etc.
> associated with the current step).
>
> There may be good reasons to make this a property of a DoFn that the
> runner can inspect, and support. For instance, if a DoFn wants to process
> batches of 50, it may be possible to factor that into how input is
> split/bundled.

That's an interesting idea. I think this could also be done via the Fn API
by recognizing the URN of "batching DoFn."
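
To make the windowing/timestamps point concrete, here is a rough sketch of
what the hand-written version of such a batching DoFn might look like. All
names are hypothetical, coders and serialization are glossed over, and it
assumes a finishBundle context that can output with an explicit timestamp
and window:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;

// Hypothetical sketch, not SDK code: buffers up to BATCH_SIZE elements
// per window within a single bundle and emits them as List<T> batches.
class BatchingDoFn<T> extends DoFn<T, List<T>> {
  private static final int BATCH_SIZE = 50;

  // Per-window buffers; bundle-scoped, so nothing survives across bundles.
  private transient Map<BoundedWindow, List<T>> buffers;

  @StartBundle
  public void startBundle() {
    buffers = new HashMap<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    List<T> buffer =
        buffers.computeIfAbsent(window, w -> new ArrayList<>());
    buffer.add(c.element());
    if (buffer.size() >= BATCH_SIZE) {
      // Emitting here stamps the batch with the *current* element's
      // timestamp, even though the batch spans many timestamps: one of
      // the details a Beam-agnostic library couldn't decide for you.
      c.output(new ArrayList<>(buffer));
      buffer.clear();
    }
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) {
    for (Map.Entry<BoundedWindow, List<T>> entry : buffers.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        // Leftovers have no "current element", so a timestamp and window
        // must be picked explicitly; the end of the window is one
        // defensible choice.
        c.output(
            entry.getValue(),
            entry.getKey().maxTimestamp(),
            entry.getKey());
      }
    }
  }
}

Even this simple version has to decide which timestamp and window each
batch gets, and batching across bundles would need state and timers, which
(as noted above) restricts the batching to a single key.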