Currently we have three ways to create a "DoFn":
- SerializableFunction (minimal boilerplate but no access to things like
  side inputs, requires Java 8)
- DoFnWithContext (runtime reflection is used to call your method and fill
  in only the parameters you're interested in)
- DoFn (access to everything all the time, most boilerplate)
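
To make the "fill in only the parameters you're interested in" idea concrete,
here is a rough, self-contained sketch of reflection-based dispatch. It is not
the SDK's implementation or API: the Process annotation, the Output class and
the invoke helper are all hypothetical names made up for illustration, and the
sketch only handles String elements.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

final class ReflectiveInvokerSketch {

    // Hypothetical marker annotation (an assumption, not the SDK's annotation).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Process {}

    // Hypothetical piece of "context" that a function may or may not ask for.
    static final class Output {
        final List<String> values = new ArrayList<>();
        void emit(String value) { values.add(value); }
    }

    // A user function that declares only what it needs: the element and an Output.
    static final class UpperCaseFn {
        @Process
        public void process(String element, Output out) {
            out.emit(element.toUpperCase());
        }
    }

    // Finds the annotated method and supplies each parameter based on its declared type.
    static void invoke(Object fn, String element, Output out) {
        for (Method m : fn.getClass().getDeclaredMethods()) {
            if (!m.isAnnotationPresent(Process.class)) {
                continue;
            }
            Class<?>[] types = m.getParameterTypes();
            Object[] args = new Object[types.length];
            for (int i = 0; i < types.length; i++) {
                if (types[i] == String.class) {
                    args[i] = element;
                } else if (types[i] == Output.class) {
                    args[i] = out;
                } else {
                    throw new IllegalArgumentException("Unsupported parameter: " + types[i]);
                }
            }
            try {
                m.setAccessible(true);
                m.invoke(fn, args);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(e);
            }
            return;
        }
        throw new IllegalArgumentException("No @Process method on " + fn.getClass());
    }
}
```

A function that only wanted the element would declare process(String element)
and the same invoke helper would call it without an Output.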

SerializableFunction is your best bet for keeping boilerplate to a minimum
without coming up with another way to wrap everything, followed by
DoFnWithContext.
DoFnWithContext is meant to let you use annotations to specify only what
you need, and it will be a better approach than creating your own wrapper.
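
As a minimal sketch of the SerializableFunction route for the "apply to the
values of a KV" case: the block below lifts a function on values to a function
on key/value pairs. To keep it compilable without the SDK, it redeclares a
stand-in SerializableFunction interface with the same single-method shape and
uses java.util.Map.Entry in place of KV; the onValues helper is a hypothetical
name, not something the SDK provides.

```java
import java.io.Serializable;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

final class ApplyToValuesSketch {

    // Stand-in for the SDK's SerializableFunction (assumption: same one-method shape),
    // redeclared here so the sketch has no SDK dependency.
    public interface SerializableFunction<InputT, OutputT> extends Serializable {
        OutputT apply(InputT input);
    }

    // Lifts a function on values to a function on key/value pairs; the key passes through unchanged.
    static <K, InputT, OutputT>
            SerializableFunction<Map.Entry<K, InputT>, Map.Entry<K, OutputT>> onValues(
                    SerializableFunction<InputT, OutputT> fn) {
        return kv -> new SimpleImmutableEntry<>(kv.getKey(), fn.apply(kv.getValue()));
    }

    public static void main(String[] args) {
        SerializableFunction<String, String> toUpper = String::toUpperCase;
        Map.Entry<String, String> result =
                onValues(toUpper).apply(new SimpleImmutableEntry<>("k", "hello"));
        System.out.println(result.getKey() + "=" + result.getValue()); // prints "k=HELLO"
    }
}
```

The reusable logic stays a plain function of the value, so nothing has to
wrap or fake a ProcessContext; the trade-off is that such a function cannot
see side inputs or windowing.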

If you haven't taken a look at DoFnWithContext yet, I would recommend it,
and please contribute to making it better if it still doesn't fit your use
cases.

On Mon, Jun 13, 2016 at 7:15 AM, Frank Wilson <[email protected]>
wrote:

> Hi,
>
> Let's say I already have a DoFn<InputT, OutputT> subclass MyFn for some
> InputT and OutputT in my dataflow code library, and I am writing a pipeline
> that deals with KV<K, InputT> elements and I would like to apply MyFn to
> those elements. I'd like to minimise boilerplate so I'm tempted to write a
> composite transform or combinator that applies a DoFn to the values in a KV.
>
> The trouble is the output of DoFn is only accessible if the context is
> somehow extended to make the output accessible and in any case the context
> is normally created by the DoFnRunner. The DoFn.ProcessContext is somewhat
> broad since it requires various concerns to be implemented, such as
> sideInputs, sideOutputs and windowing, which may not always be used. There's also
> no 'Adapter' abstract ProcessContext class of default method
> implementations. This sets off alarm bells that perhaps I am going down the
> wrong route.
>
> So my question is: how sensible is it to write some kind of combinator for
> DoFn instances?
>
> To give some idea of what the combinator might look like here is a sketch
> implementation:
>
> public class ApplyToValues<K, InputT, OutputT> extends
> PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
>
>     private DoFn<KV<K,InputT>, KV<K,OutputT>> fun;
>
>     private class WrapperDoFn extends DoFn<KV<K,InputT>, KV<K,OutputT>> {
>
>         class WrapperContext extends DoFn<KV<K,InputT>,
> KV<K,OutputT>>.ProcessContext {
>
>             private KV<K, OutputT> output;
>
>             public WrapperContext(DoFn<KV<K,InputT>,
> KV<K,OutputT>>.ProcessContext inner) {
>
>             }
>
>             // override abstract methods
>
>             public KV<K,OutputT> getOutput() {
>                 return output;
>             }
>         }
>
>         @Override
>         public void processElement(ProcessContext c) throws Exception {
>             WrapperContext wrapperContext = new WrapperContext(c);
>             fun.processElement(wrapperContext);
>             c.output(wrapperContext.getOutput());
>         }
>     }
>
>     public ApplyToValues(DoFn<KV<K,InputT>, KV<K,OutputT>> fun) {
>         this.fun = fun;
>     }
>
>     @Override
>     public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>>
> input) {
>         return input.apply(ParDo.of(new WrapperDoFn()));
>     }
> }
>
> usage:
>
> PCollection<KV<String, String>> pairsWithValuesInCaps =
>     stringPairs.apply(new ApplyToValues(new ToUpperCaseDoFn()));
>
>
> There's already a lot there, excluding all the abstract methods that you
> would have to implement for ProcessContext. So I guess I should probably
> stick with the boilerplate! Even so, I'm curious to know what people think on
> this list. Perhaps it is better to structure reusable element processors as
> SerializableFunctions? I'm not sure.
>
> Thanks,
>
>
> Frank
>
