Pei He created BEAM-96:
--------------------------

             Summary: Support composing combine functions
                 Key: BEAM-96
                 URL: https://issues.apache.org/jira/browse/BEAM-96
             Project: Beam
          Issue Type: New Feature
          Components: sdk-java-core
            Reporter: Pei He
            Assignee: Davor Bonaci

The proposed API for composing combine functions is as follows:

    pc.apply(
        Combine.perKey(
            CombineFns.composeKeyed()
                .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
                .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));

Example code:

    PCollection<KV<K, Integer>> latencies = ...;

    // Each tag identifies one combined result and carries its output type.
    TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
    TupleTag<Double> meanLatencyTag = new TupleTag<Double>();

    // Extracts the value that each combine function consumes (here, the input itself).
    SimpleFunction<Integer, Integer> identityFn =
        new SimpleFunction<Integer, Integer>() {
          @Override
          public Integer apply(Integer input) {
            return input;
          }};

    // Compute the max and the mean per key in a single Combine.
    PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
        Combine.perKey(
            CombineFns.composeKeyed()
                .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
                .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));

    // Read each result back out of the CoCombineResult by its tag.
    PCollection<T> finalResultCollection = maxAndMean
        .apply(ParDo.of(
            new DoFn<KV<K, CoCombineResult>, T>() {
              @Override
              public void processElement(ProcessContext c) throws Exception {
                KV<K, CoCombineResult> e = c.element();
                Integer maxLatency = e.getValue().get(maxLatencyTag);
                Double meanLatency = e.getValue().get(meanLatencyTag);
                // .... Do Something ....
                c.output(...some T...);
              }
            }));

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
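To illustrate the idea behind the proposal, here is a minimal stand-alone sketch in plain Java, with no SDK dependency. It keeps one accumulator per registered combine function, feeds every input to all of them in a single pass, and returns the outputs keyed by typed tags. Every name in it (ComposedCombineSketch, SimpleCombineFn, Tag, Result, Composed, ToyMaxFn, ToyMeanFn) is hypothetical and chosen only for illustration. It is not the proposed CombineFns implementation, and it leaves out accumulator merging and per-key handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: none of these types are SDK classes.
class ComposedCombineSketch {

  /** Minimal combine function: input I, accumulator A, output O. */
  interface SimpleCombineFn<I, A, O> {
    A createAccumulator();
    A addInput(A accumulator, I input);
    O extractOutput(A accumulator);
  }

  /** Typed key for one combined result (stand-in for a tuple tag). */
  static final class Tag<T> {}

  /** Holds the outputs of all composed functions, looked up by tag. */
  static final class Result {
    private final Map<Tag<?>, Object> values = new HashMap<>();

    @SuppressWarnings("unchecked")
    <T> T get(Tag<T> tag) { return (T) values.get(tag); }
  }

  /** Running maximum; the accumulator is null until the first input arrives. */
  static class ToyMaxFn implements SimpleCombineFn<Integer, Integer, Integer> {
    public Integer createAccumulator() { return null; }
    public Integer addInput(Integer acc, Integer in) {
      return (acc == null || in > acc) ? in : acc;
    }
    public Integer extractOutput(Integer acc) { return acc; }
  }

  /** Mean; the accumulator is {sum, count}. */
  static class ToyMeanFn implements SimpleCombineFn<Integer, long[], Double> {
    public long[] createAccumulator() { return new long[] {0L, 0L}; }
    public long[] addInput(long[] acc, Integer in) {
      acc[0] += in;
      acc[1] += 1;
      return acc;
    }
    public Double extractOutput(long[] acc) {
      return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
    }
  }

  /** Builder that composes several combine functions over the same input. */
  static class Composed<I> {
    private final List<Tag<?>> tags = new ArrayList<>();
    private final List<SimpleCombineFn<I, Object, Object>> fns = new ArrayList<>();

    <V, A, O> Composed<I> with(
        Function<I, V> extract, SimpleCombineFn<V, A, O> fn, Tag<O> tag) {
      tags.add(tag);
      // Adapter that erases the accumulator type so all functions fit in one list.
      fns.add(new SimpleCombineFn<I, Object, Object>() {
        public Object createAccumulator() { return fn.createAccumulator(); }
        @SuppressWarnings("unchecked")
        public Object addInput(Object acc, I in) {
          return fn.addInput((A) acc, extract.apply(in));
        }
        @SuppressWarnings("unchecked")
        public Object extractOutput(Object acc) { return fn.extractOutput((A) acc); }
      });
      return this;
    }

    /** Single pass: every input updates every accumulator. */
    Result apply(Iterable<I> inputs) {
      List<Object> accs = new ArrayList<>();
      for (SimpleCombineFn<I, Object, Object> fn : fns) {
        accs.add(fn.createAccumulator());
      }
      for (I in : inputs) {
        for (int i = 0; i < fns.size(); i++) {
          accs.set(i, fns.get(i).addInput(accs.get(i), in));
        }
      }
      Result result = new Result();
      for (int i = 0; i < fns.size(); i++) {
        result.values.put(tags.get(i), fns.get(i).extractOutput(accs.get(i)));
      }
      return result;
    }
  }

  static final Tag<Integer> MAX = new Tag<>();
  static final Tag<Double> MEAN = new Tag<>();

  /** Mirrors the shape of the proposed call chain on an in-memory list. */
  static Result maxAndMean(List<Integer> latencies) {
    return new Composed<Integer>()
        .with(Function.<Integer>identity(), new ToyMaxFn(), MAX)
        .with(Function.<Integer>identity(), new ToyMeanFn(), MEAN)
        .apply(latencies);
  }
}
```

For the latencies 10, 30 and 20, maxAndMean returns a Result whose MAX entry is 30 and whose MEAN entry is 20.0. Because each tag carries its output type, the caller can read both values back without casting, which appears to be the role the tuple tags play in the proposal.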