Hi Frank,
Why don't you simply pass the context on to the wrapped, value-transforming
DoFn? The wrapped function then calls `output(..)` directly on the context it
receives:
    public class ApplyToValues<K, InputT, OutputT> extends
        PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {

      private DoFn<KV<K, InputT>, KV<K, OutputT>> fun;

      public ApplyToValues(DoFn<KV<K, InputT>, KV<K, OutputT>> fun) {
        this.fun = fun;
      }

      private class WrapperDoFn extends DoFn<KV<K, InputT>, KV<K, OutputT>> {
        @Override
        public void processElement(ProcessContext c) throws Exception {
          fun.processElement(c);
        }
      }

      @Override
      public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
        return input.apply(ParDo.of(new WrapperDoFn()));
      }
    }
For completeness, I would also override the startBundle and finishBundle
methods and pass the context to the wrapped function there as well.
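To make the bundle part concrete, here is a rough sketch of that delegation. It deliberately uses small stand-in types so that it compiles on its own: SketchDoFn, DelegatingDoFn, BufferingUpperCaseFn and BundleDelegationDemo are hypothetical names, not SDK classes, and the stand-in methods drop the `throws Exception` clause of the real ones. The point it tries to show is that a wrapped function which buffers elements and only emits them in finishBundle produces nothing unless the wrapper forwards that call too.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Stand-in for the SDK's DoFn (assumption: simplified, not the real class).
abstract class SketchDoFn<InputT, OutputT> {
  interface Context<T> { void output(T value); }
  interface ProcessContext<E, T> extends Context<T> { E element(); }

  void startBundle(Context<OutputT> c) {}
  abstract void processElement(ProcessContext<InputT, OutputT> c);
  void finishBundle(Context<OutputT> c) {}
}

// Wrapper that forwards all three lifecycle calls to the wrapped function.
final class DelegatingDoFn<InputT, OutputT> extends SketchDoFn<InputT, OutputT> {
  private final SketchDoFn<InputT, OutputT> fun;

  DelegatingDoFn(SketchDoFn<InputT, OutputT> fun) { this.fun = fun; }

  @Override void startBundle(Context<OutputT> c) { fun.startBundle(c); }
  @Override void processElement(ProcessContext<InputT, OutputT> c) { fun.processElement(c); }
  @Override void finishBundle(Context<OutputT> c) { fun.finishBundle(c); }
}

// Example wrapped function: buffers per bundle and emits only in finishBundle.
final class BufferingUpperCaseFn extends SketchDoFn<String, String> {
  private final List<String> buffer = new ArrayList<>();

  @Override void startBundle(Context<String> c) { buffer.clear(); }

  @Override void processElement(ProcessContext<String, String> c) {
    buffer.add(c.element().toUpperCase());
  }

  @Override void finishBundle(Context<String> c) {
    for (String s : buffer) {
      c.output(s);
    }
    buffer.clear();
  }
}

final class BundleDelegationDemo {
  // Tiny stand-in for a runner: one bundle, outputs collected into a list.
  static List<String> runBundle(SketchDoFn<String, String> fn, List<String> inputs) {
    final List<String> out = new ArrayList<>();
    SketchDoFn.Context<String> bundleContext = out::add;
    fn.startBundle(bundleContext);
    for (final String in : inputs) {
      fn.processElement(new SketchDoFn.ProcessContext<String, String>() {
        @Override public String element() { return in; }
        @Override public void output(String value) { out.add(value); }
      });
    }
    fn.finishBundle(bundleContext);
    return out;
  }

  public static void main(String[] args) {
    SketchDoFn<String, String> wrapped = new DelegatingDoFn<>(new BufferingUpperCaseFn());
    System.out.println(runBundle(wrapped, Arrays.asList("a", "b"))); // prints [A, B]
  }
}
```

If DelegatingDoFn left finishBundle at its empty default, the same run would return an empty list, which is why I would forward it.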
On Mon, Jun 13, 2016 at 6:34 PM, Lukasz Cwik wrote:
> SerializableFunction (minimal boilerplate but no access to things like
> side inputs, requires Java 8)
Java 8 is only required if you want to use the lambda notation. Anonymous
classes also work, but they are less pretty and require you to fill in the
input and output types in advance.
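As a small illustration of the two styles, assuming an interface with the same shape as SerializableFunction (SketchSerializableFunction and FunctionStyles below are stand-in names I made up so the snippet compiles without the SDK):

```java
import java.io.Serializable;

// Stand-in with the same shape as a serializable single-method function
// (assumption: not the SDK interface itself).
interface SketchSerializableFunction<InputT, OutputT> extends Serializable {
  OutputT apply(InputT input);
}

final class FunctionStyles {
  // Pre-Java 8 style: the input and output types are spelled out up front.
  static SketchSerializableFunction<String, String> anonymousUpper() {
    return new SketchSerializableFunction<String, String>() {
      @Override
      public String apply(String input) {
        return input.toUpperCase();
      }
    };
  }

  // Java 8 style: the same function as a lambda; types come from the target type.
  static SketchSerializableFunction<String, String> lambdaUpper() {
    return input -> input.toUpperCase();
  }

  public static void main(String[] args) {
    System.out.println(anonymousUpper().apply("beam")); // prints BEAM
    System.out.println(lambdaUpper().apply("beam"));    // prints BEAM
  }
}
```

Both variants behave the same; the difference is only in how much you have to write.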
> DoFnWithContext (runtime-based reflection is used to call your method and
> fill in only the parameters you're interested in)
I find the reflection-based approach a bit fragile (you only get type
errors at runtime), but I can see how this may be convenient for users.
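What I mean by fragile, shown with plain java.lang.reflect rather than with DoFnWithContext itself (I am not claiming this is how DoFnWithContext is implemented; ReflectionFragility and its methods are made-up names for the example): a call with the wrong argument type compiles fine and only fails once it is executed.

```java
import java.lang.reflect.Method;

final class ReflectionFragility {
  // The method we will call reflectively; it expects a String.
  public static String shout(String s) {
    return s.toUpperCase();
  }

  // Invokes shout(..) reflectively and reports what happened.
  // The compiler accepts any Object here; a mismatch shows up only at runtime.
  static String invokeWith(Object arg) {
    try {
      Method m = ReflectionFragility.class.getMethod("shout", String.class);
      m.invoke(null, arg);
      return "ok";
    } catch (IllegalArgumentException e) {
      // Thrown by Method.invoke when the argument type does not match.
      return "IllegalArgumentException";
    } catch (ReflectiveOperationException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) {
    System.out.println(invokeWith("beam")); // prints ok
    System.out.println(invokeWith(42));     // prints IllegalArgumentException
  }
}
```

With a direct call, `shout(42)` would have been rejected by the compiler instead.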
> DoFn (access to everything all the time, most boilerplate)
Though the boilerplate is hidden in the base class. You only have to
implement the "processElement" method.
On Mon, Jun 13, 2016 at 6:34 PM, Lukasz Cwik wrote:
> Currently we have three ways to create a "DoFn":
> SerializableFunction (minimal boilerplate but no access to things like
> side inputs, requires Java 8)
> DoFnWithContext (runtime-based reflection is used to call your method and
> fill in only the parameters you're interested in)
> DoFn (access to everything all the time, most boilerplate)
>
> SerializableFunction is your best bet for minimal boilerplate without
> coming up with another way to wrap everything, followed by
> DoFnWithContext.
> DoFnWithContext is meant to be the way in which, by using annotations, you
> specify only what you need, and it will be a better approach than creating
> your own wrapper.
>
> If you haven't taken a look at DoFnWithContext yet, I would, and please
> contribute to making it better if it still doesn't fit your use cases.
>
> On Mon, Jun 13, 2016 at 7:15 AM, Frank Wilson wrote:
>>
>> Hi,
>>
>> Let's say I already have a DoFn<InputT, OutputT> subclass MyFn for some
>> InputT and OutputT in my dataflow code library, and I am writing a
>> pipeline that deals with KV<K, InputT> elements, and I would like to apply
>> MyFn to those elements. I'd like to minimise boilerplate, so I'm tempted
>> to write a composite transform or combinator that applies a DoFn to the
>> values in a KV.
>>
>> The trouble is that the output of a DoFn is only accessible if the context
>> is somehow extended to expose it, and in any case the context is normally
>> created by the DoFnRunner. The DoFn.ProcessContext is somewhat broad, since
>> it requires various concerns to be implemented, such as sideInputs,
>> sideOutputs and windowing, which may not always be in use. There's also no
>> 'Adapter' abstract ProcessContext class of default method implementations.
>> This sets off alarm bells that perhaps I am going down the wrong route.
>>
>> So my question is: how sensible is it to write some kind of combinator for
>> DoFn instances?
>>
>> To give some idea of what the combinator might look like here is a sketch
>> implementation:
>>
>> public class ApplyToValues<K, InputT, OutputT> extends
>>     PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {
>>
>>   private DoFn<KV<K, InputT>, KV<K, OutputT>> fun;
>>
>>   private class WrapperDoFn extends DoFn<KV<K, InputT>, KV<K, OutputT>> {
>>
>>     class WrapperContext
>>         extends DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext {
>>
>>       private KV<K, OutputT> output;
>>
>>       public WrapperContext(
>>           DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext inner) {
>>       }
>>
>>       // override abstract methods
>>
>>       public KV<K, OutputT> getOutput() {
>>         return output;
>>       }
>>     }
>>
>>     @Override
>>     public void processElement(ProcessContext c) throws Exception {
>>       WrapperContext wrapperContext = new WrapperContext(c);
>>       fun.processElement(wrapperContext);
>>       c.output(wrapperContext.getOutput());
>>     }
>>   }
>>
>>   public ApplyToValues(DoFn<KV<K, InputT>, KV<K, OutputT>> fun) {
>>     this.fun = fun;
>>   }
>>
>>   @Override
>>   public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
>>     return input.apply(ParDo.of(new WrapperDoFn()));
>>   }
>> }
>>
>> usage:
>>
>> PCollection<KV<String, String>> pairsWithValuesInCaps =
>>     stringPairs.apply(new ApplyToValues(new ToUpperCaseDoFn()));
>>
>>
>> There's already a lot there, excluding all the abstract methods that you
>> would have to implement for ProcessContext. So I guess I should probably
>> stick with the boilerplate! Even so, I am curious to know what people on
>> this list think. Perhaps it is better to structure reusable element
>> processors as SerializableFunctions? I'm not sure.
>>
>> Thanks,
>>
>>
>> Frank
>
>