On Thu, Jan 26, 2017 at 3:42 PM, Eugene Kirpichov
<kirpic...@google.com.invalid> wrote:
> I agree that wrapping the DoFn is probably not the way to go, because the
> DoFn may be quite tricky due to all the reflective features: e.g. how do
> you automatically "batch" a DoFn that uses state and timers? What about a
> DoFn that uses a BoundedWindow parameter? What about a splittable DoFn?
> What about future reflective features? The class for invoking DoFns,
> DoFnInvokers, is absent from the SDK (and present in runners-core) for a
> good reason.
>
> I'd rather leave the intricacies of invoking DoFns to runners, and say
> that you can't wrap DoFns, period - "adapter", "decorator", and other
> design patterns just don't apply to DoFns.

As a simple example, given a DoFn<T, O>, it's perfectly natural to
want to "wrap" it as a DoFn<KV<K, T>, KV<K, O>>. State, side inputs,
windows, etc. would just be passed through.
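
For concreteness, here's roughly the shape of wrapper I have in mind.
It's only a sketch: PerKeyWrapper is a made-up name, and it cheats by
taking the wrapped logic as a plain SerializableFunction, since
invoking a real DoFn's @ProcessElement from inside another DoFn would
need the DoFnInvokers machinery that lives in runners-core (exactly
the difficulty you describe).

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.beam.sdk.values.KV;

  class PerKeyWrapper<K, T, O> extends DoFn<KV<K, T>, KV<K, O>> {
    // Stand-in for the wrapped DoFn's element-wise logic.
    private final SerializableFunction<T, O> fn;

    PerKeyWrapper(SerializableFunction<T, O> fn) {
      this.fn = fn;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      KV<K, T> kv = c.element();
      // The key passes through; c.output() keeps the incoming
      // element's timestamp, window, and pane.
      c.output(KV.of(kv.getKey(), fn.apply(kv.getValue())));
    }
  }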

The fact that this is complicated, with reflection and flexible
signatures and byte generation, is a property of the SDK (to provide a
flexible DoFn API). I agree that it's nice to hide this complexity
from the user, but hiding it also discourages this kind of
composability.

I would say that it's nice to let the "batching fn" have side inputs,
setup/teardown, and pretty much everything else the current DoFn has,
though of course using certain properties (e.g. state and timers, or
windows) would restrict batches to be contained within a single
key/window/whatever.
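
Sketching what such a batching fn might look like (made-up names, and
it only flushes inside processElement, so every buffered element gets
the flushing element's timestamp and window, which is where the
subtleties come in):

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.SerializableFunction;

  class BatchingDoFn<T, O> extends DoFn<T, O> {
    private static final int BATCH_SIZE = 50;
    private final SerializableFunction<List<T>, List<O>> batchFn;
    private transient List<T> buffer;

    BatchingDoFn(SerializableFunction<List<T>, List<O>> batchFn) {
      this.batchFn = batchFn;
    }

    @StartBundle
    public void startBundle() {
      buffer = new ArrayList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      buffer.add(c.element());
      if (buffer.size() >= BATCH_SIZE) {
        // Outputs inherit the current element's timestamp and window,
        // which is only correct if all buffered elements share them.
        for (O out : batchFn.apply(buffer)) {
          c.output(out);
        }
        buffer.clear();
      }
    }

    // A real version would also need @FinishBundle to flush the
    // remainder, which means tracking each buffered element's own
    // timestamp and window.
  }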

> The two options for batching are:
> - A transform that takes elements and produces batches, like Robert said
> - A simple Beam-agnostic library that takes Java objects and produces
> batches of Java objects, with an API that makes it convenient to use in a
> typical batching DoFn

I don't think a Beam-agnostic library could correctly handle details
like windowing and timestamps.
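
Concretely, to re-emit a buffered element faithfully, the batching
code has to carry Beam metadata along with each element, which a
plain-Java library has no notion of. Something like the following
(made-up class; Beam's internal WindowedValue plays this role):

  import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
  import org.apache.beam.sdk.transforms.windowing.PaneInfo;
  import org.joda.time.Instant;

  // Everything a buffered element needs in order to be re-emitted
  // with its original timestamp and window.
  class Buffered<T> {
    final T value;
    final Instant timestamp;
    final BoundedWindow window;
    final PaneInfo pane;

    Buffered(T value, Instant timestamp,
             BoundedWindow window, PaneInfo pane) {
      this.value = value;
      this.timestamp = timestamp;
      this.window = window;
      this.pane = pane;
    }
  }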


On Thu, Jan 26, 2017 at 3:53 PM, Ben Chambers
<bchamb...@google.com.invalid> wrote:
> The third option for batching:
>
> - Functionality within the DoFn and DoFnRunner built as part of the SDK.
>
> I haven't thought through Batching, but at least for the
> IntraBundleParallelization use case this actually does make sense to expose
> as a part of the model. Knowing that a DoFn supports parallelization, a
> runner may want to control how much parallelization is allowed, and the
> DoFn also needs to make sure to wait on all those threads (and make sure
> they're properly set up for logging/metrics/etc. associated with the
> current step).
>
> There may be good reasons to make this a property of a DoFn that the runner
> can inspect, and support. For instance, if a DoFn wants to process batches
> of 50, it may be possible to factor that into how input is split/bundled.

That's an interesting idea. I think this could also be done via the Fn
API by recognizing the URN of "batching DoFn."
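
Purely hypothetically, such a property could be a declarative hint the
runner inspects when deciding how to split/bundle input; nothing like
this exists in the SDK today:

  import java.lang.annotation.ElementType;
  import java.lang.annotation.Retention;
  import java.lang.annotation.RetentionPolicy;
  import java.lang.annotation.Target;
  import org.apache.beam.sdk.transforms.DoFn;

  // Hypothetical: a hint a runner (or an Fn API URN) could read off
  // the DoFn's class.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  @interface ProcessesInBatches {
    int preferredBatchSize() default 1;
  }

  @ProcessesInBatches(preferredBatchSize = 50)
  class MyBatchingDoFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element());
    }
  }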
