Hi Maximilian,
Sorry, there was a mistake in my sketch implementation. The wrapped DoFn was
supposed to transform only the value of each KV, not the whole KV!
This means the wrapper DoFn needs to capture the output of the inner DoFn so
that it can build a new KV from the original key and that output.
The code should have looked like this:
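public class ApplyToValues<K, InputT, OutputT> extends
    PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> {

  // The value-level DoFn that is applied to the value of each KV.
  private final DoFn<InputT, OutputT> fun;

  private class WrapperDoFn extends DoFn<KV<K, InputT>, KV<K, OutputT>> {

    // Shows the wrapped DoFn only the value of the current KV and captures
    // what it outputs instead of emitting it downstream.
    class WrapperContext extends DoFn<InputT, OutputT>.ProcessContext {
      private final DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext inner;
      private OutputT output;

      public WrapperContext(DoFn<KV<K, InputT>, KV<K, OutputT>>.ProcessContext inner) {
        fun.super(); // ProcessContext is an inner class of DoFn: it needs an enclosing DoFn instance
        this.inner = inner;
      }

      @Override
      public InputT element() {
        return inner.element().getValue();
      }

      @Override
      public void output(OutputT output) {
        // Only the last value is kept: a wrapped DoFn that emits several
        // outputs per element loses all but the last one.
        this.output = output;
      }

      public OutputT getOutput() {
        return output;
      }

      // remaining abstract methods (side inputs, timestamp, window, ...)
      // still need to be implemented, e.g. by delegating to inner
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
      WrapperContext wrapperContext = new WrapperContext(c);
      fun.processElement(wrapperContext);
      // Re-attach the original key to the captured value
      // (null if the wrapped DoFn produced no output).
      K key = c.element().getKey();
      c.output(KV.of(key, wrapperContext.getOutput()));
    }
  }

  public ApplyToValues(DoFn<InputT, OutputT> fun) {
    this.fun = fun;
  }

  @Override
  public PCollection<KV<K, OutputT>> apply(PCollection<KV<K, InputT>> input) {
    return input.apply(ParDo.of(new WrapperDoFn()));
  }
}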
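To check the capture idea without a pipeline, here is a minimal plain-Java sketch of the same wrapper-context pattern. It does not use the Dataflow/Beam SDK: `Context`, `Fn`, `ApplyToValues` and the `runOne` driver are hypothetical stand-ins for `DoFn.ProcessContext`, `DoFn` and the transform, and `Map.Entry` plays the role of `KV`. Unlike the single-field capture in the code above, it collects every output in a list, so a wrapped function that emits zero or several values per element is also handled.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CapturingWrapperSketch {
    // Hypothetical stand-in for DoFn.ProcessContext: the current element plus an output sink.
    interface Context<I, O> {
        I element();

        void output(O value);
    }

    // Hypothetical stand-in for DoFn: processes one element per call.
    interface Fn<I, O> {
        void processElement(Context<I, O> c);
    }

    // Applies a value-level Fn to the value of each key/value pair and keeps the key.
    static final class ApplyToValues<K, I, O> implements Fn<Map.Entry<K, I>, Map.Entry<K, O>> {
        private final Fn<I, O> fun;

        ApplyToValues(Fn<I, O> fun) {
            this.fun = fun;
        }

        @Override
        public void processElement(Context<Map.Entry<K, I>, Map.Entry<K, O>> c) {
            K key = c.element().getKey();
            I value = c.element().getValue();
            // Wrapper context: gives the inner Fn only the value and captures whatever it outputs.
            List<O> captured = new ArrayList<>();
            fun.processElement(new Context<I, O>() {
                @Override
                public I element() {
                    return value;
                }

                @Override
                public void output(O out) {
                    captured.add(out);
                }
            });
            // Rebuild a key/value pair around each captured output.
            for (O out : captured) {
                c.output(new AbstractMap.SimpleEntry<>(key, out));
            }
        }
    }

    // Test driver: feeds a single (key, value) element to fn and collects everything it emits.
    static <K, I, O> List<Map.Entry<K, O>> runOne(
            Fn<Map.Entry<K, I>, Map.Entry<K, O>> fn, K key, I value) {
        List<Map.Entry<K, O>> results = new ArrayList<>();
        fn.processElement(new Context<Map.Entry<K, I>, Map.Entry<K, O>>() {
            @Override
            public Map.Entry<K, I> element() {
                return new AbstractMap.SimpleEntry<>(key, value);
            }

            @Override
            public void output(Map.Entry<K, O> out) {
                results.add(out);
            }
        });
        return results;
    }
}
```

For example, wrapping a function that doubles its input (`c -> c.output(c.element() * 2)`) and running it on `("a", 21)` yields the single pair `("a", 42)`.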
But you are absolutely right: if I didn't need the output, I could have just
called processElement on the wrapped DoFn with the given context.
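Passing a context on and needing the output can also be combined: instead of storing the output, the wrapper context can forward each value to the outer context as soon as the wrapped function emits it, re-attaching the key on the way. A minimal plain-Java sketch of that variant, again with hypothetical stand-ins (`Context`, `Fn`, `ApplyToValues`, `runOne`) rather than the real DoFn API:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ForwardingWrapperSketch {
    // Hypothetical stand-in for DoFn.ProcessContext: the current element plus an output sink.
    interface Context<I, O> {
        I element();

        void output(O value);
    }

    // Hypothetical stand-in for DoFn: processes one element per call.
    interface Fn<I, O> {
        void processElement(Context<I, O> c);
    }

    // Applies a value-level Fn to each key/value pair without buffering its outputs.
    static final class ApplyToValues<K, I, O> implements Fn<Map.Entry<K, I>, Map.Entry<K, O>> {
        private final Fn<I, O> fun;

        ApplyToValues(Fn<I, O> fun) {
            this.fun = fun;
        }

        @Override
        public void processElement(Context<Map.Entry<K, I>, Map.Entry<K, O>> c) {
            K key = c.element().getKey();
            I value = c.element().getValue();
            // Thin context: exposes only the value and forwards every output
            // straight to the outer context with the original key re-attached.
            fun.processElement(new Context<I, O>() {
                @Override
                public I element() {
                    return value;
                }

                @Override
                public void output(O out) {
                    c.output(new AbstractMap.SimpleEntry<>(key, out));
                }
            });
        }
    }

    // Test driver: feeds a single (key, value) element to fn and collects everything it emits.
    static <K, I, O> List<Map.Entry<K, O>> runOne(
            Fn<Map.Entry<K, I>, Map.Entry<K, O>> fn, K key, I value) {
        List<Map.Entry<K, O>> results = new ArrayList<>();
        fn.processElement(new Context<Map.Entry<K, I>, Map.Entry<K, O>>() {
            @Override
            public Map.Entry<K, I> element() {
                return new AbstractMap.SimpleEntry<>(key, value);
            }

            @Override
            public void output(Map.Entry<K, O> out) {
                results.add(out);
            }
        });
        return results;
    }
}
```

Nothing is buffered here, so any number of outputs per element passes straight through with its key attached, and no getter on the wrapper context is needed.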
Thanks,
Frank
On 14 June 2016 at 10:09, Maximilian Michels wrote:
> Hi Frank,
>
> Why don't you simply pass on the context to your value-transforming
> wrapped DoFn? Your wrapped function will call `output(..)` on the passed
> context:
>
>
>