Hi,
Let's say I already have a DoFn<InputT, OutputT> subclass MyFn, for some
InputT and OutputT, in my Dataflow code library. I am writing a pipeline
that deals with KV<K, InputT> elements, and I would like to apply MyFn to the
values of those elements. I'd like to minimise boilerplate, so I'm tempted to
write a composite transform or combinator that applies a DoFn to the values in a KV.
The trouble is that the output of a DoFn is only accessible if its context is
somehow extended to capture that output, and in any case the context is
normally created by the DoFnRunner. DoFn.ProcessContext is rather broad: it
requires various concerns to be implemented, such as side inputs, side outputs
and windowing, which may not always be in use. There's also no 'Adapter'
abstract ProcessContext class with default method implementations. This sets
off alarm bells that perhaps I am going down the wrong route.
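To illustrate why the width of ProcessContext is the sticking point, here is a self-contained sketch in plain Java with no Dataflow SDK involved. A hypothetical `Processor` interface plays the role of DoFn and a hypothetical one-method `Emitter` plays the role of the context (`Processor`, `Emitter`, `onValues` and `runAll` are illustrative names, not SDK API, and `Map.Entry` stands in for KV). With a context that narrow, the "apply to values" combinator is a one-liner, and forwarding each output with the original key also copes with processors that emit zero or several values per element.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class NarrowContextDemo {
  // Hypothetical one-method context: all an element processor can do is emit.
  interface Emitter<T> {
    void output(T value);
  }

  // Hypothetical element processor (the role DoFn<I, O> plays in the SDK).
  interface Processor<I, O> {
    void process(I element, Emitter<O> out);
  }

  // Lifts a processor on values to a processor on key/value pairs. Each value the
  // inner processor emits is forwarded straight away with the original key, so
  // zero, one or many outputs per element are all handled.
  static <K, I, O> Processor<Map.Entry<K, I>, Map.Entry<K, O>> onValues(
      Processor<I, O> inner) {
    return (kv, out) ->
        inner.process(kv.getValue(), v -> out.output(Map.entry(kv.getKey(), v)));
  }

  // Test harness: runs a processor over some elements and collects what it emits.
  static <I, O> List<O> runAll(Processor<I, O> p, List<I> elements) {
    List<O> results = new ArrayList<>();
    for (I e : elements) {
      p.process(e, results::add);
    }
    return results;
  }
}
```

The catch is that the real ProcessContext also carries side inputs, side outputs, windowing and so on, which is exactly what makes the same trick laborious with the SDK's DoFn.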
So my question is: how sensible is it to write some kind of combinator for
DoFn instances?
To give some idea of what the combinator might look like, here is a sketch
implementation:
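import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class ApplyToValues<K, InputT, OutputT> extends
    PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
  // The wrapped DoFn works on plain values, not on KVs.
  private final DoFn<InputT, OutputT> fun;

  private class WrapperDoFn extends DoFn<KV<K, InputT>, KV<K, OutputT>> {
    // Presents the value of the outer KV element to fun and captures its output.
    class WrapperContext extends DoFn<InputT, OutputT>.ProcessContext {
      private final DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext inner;
      private OutputT output;

      public WrapperContext(
          DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext inner) {
        fun.super(); // ProcessContext is an inner class of DoFn
        this.inner = inner;
      }

      @Override
      public InputT element() {
        return inner.element().getValue();
      }

      @Override
      public void output(OutputT output) {
        this.output = output; // keeps only the last output
      }

      // override the remaining abstract methods (side inputs, side outputs,
      // windowing, ...), mostly by delegating to inner

      public OutputT getOutput() {
        return output;
      }
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      WrapperContext wrapperContext = new WrapperContext(c);
      fun.processElement(wrapperContext);
      // re-attach the original key to the transformed value
      c.output(KV.of(c.element().getKey(), wrapperContext.getOutput()));
    }
  }

  public ApplyToValues(DoFn<InputT, OutputT> fun) {
    this.fun = fun;
  }

  @Override
  public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
    return input.apply(ParDo.of(new WrapperDoFn()));
  }
}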
Usage:
PCollection<KV<String, String>> pairsWithValuesInCaps =
    stringPairs.apply(new ApplyToValues<String, String, String>(new ToUpperCaseDoFn()));
There's already a lot there, even before implementing all the abstract
methods that ProcessContext requires. So I guess I should probably stick
with the boilerplate! Even so, I'm curious to know what people on this list
think. Perhaps it is better to structure reusable element processors as
SerializableFunctions? I'm not sure.
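For comparison, a minimal sketch of the SerializableFunction route in plain Java. `SerFn` is a hypothetical stand-in for the SDK's SerializableFunction, `Map.Entry` stands in for KV, and `onValues`/`pipe` are illustrative helper names. Lifting a value function to a KV function needs no context at all, and such functions compose freely.

```java
import java.io.Serializable;
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.Function;

class ValueFunctions {
  // Hypothetical stand-in for the SDK's SerializableFunction<InputT, OutputT>.
  interface SerFn<A, B> extends Function<A, B>, Serializable {}

  // Lifts a function on values to a function on key/value pairs that keeps the
  // key. Lambdas targeting SerFn are serializable as long as what they capture is.
  static <K, A, B> SerFn<Map.Entry<K, A>, Map.Entry<K, B>> onValues(SerFn<A, B> f) {
    return kv -> new AbstractMap.SimpleImmutableEntry<K, B>(kv.getKey(), f.apply(kv.getValue()));
  }

  // Runs f and then g; the composite is again a SerFn, so it can itself be lifted.
  static <A, B, C> SerFn<A, C> pipe(SerFn<A, B> f, SerFn<B, C> g) {
    return a -> g.apply(f.apply(a));
  }
}
```

In a pipeline, such a function would still need to be wrapped once in a ParDo (or in MapElements.via(...), if your SDK version provides it) over the real KV type; the gain is that the reusable logic itself carries no context and is limited to one output per input.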
Thanks,
Frank