http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 5ffaef8..0be8517 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -43,12 +43,9 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -155,7 +152,7 @@ public class Combine { */ public static <K, V> PerKey<K, V, V> perKey( SerializableFunction<Iterable<V>, V> fn) { - return perKey(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn)); + return perKey(IterableCombineFn.of(fn), displayDataForFn(fn)); } /** @@ -176,32 +173,11 @@ public class Combine { */ public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey( GlobalCombineFn<? super InputT, ?, OutputT> fn) { - return perKey(fn.<K>asKeyedFn(), displayDataForFn(fn)); - } - - /** - * Returns a {@link PerKey Combine.PerKey} {@code PTransform} that - * first groups its input {@code PCollection} of {@code KV}s by keys and - * windows, then invokes the given function on each of the key/values-lists - * pairs to produce a combined value, and then returns a - * {@code PCollection} of {@code KV}s mapping each distinct key to - * its combined value for each window. - * - * <p>Each output element is in the window by which its corresponding input - * was grouped, and has the timestamp of the end of that window. The output - * {@code PCollection} has the same - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * as the input. - * - * <p>See {@link PerKey Combine.PerKey} for more information. - */ - public static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) { return perKey(fn, displayDataForFn(fn)); } private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> perKey( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new PerKey<>(fn, fnDisplayData, false /*fewKeys*/); } @@ -211,7 +187,7 @@ public class Combine { * in {@link GroupByKey}. */ private static <K, InputT, OutputT> PerKey<K, InputT, OutputT> fewKeys( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new PerKey<>(fn, fnDisplayData, true /*fewKeys*/); } @@ -239,7 +215,7 @@ public class Combine { */ public static <K, V> GroupedValues<K, V, V> groupedValues( SerializableFunction<Iterable<V>, V> fn) { - return groupedValues(IterableCombineFn.of(fn).<K>asKeyedFn(), displayDataForFn(fn)); + return groupedValues(IterableCombineFn.of(fn), displayDataForFn(fn)); } /** @@ -265,37 +241,11 @@ public class Combine { */ public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues( GlobalCombineFn<? super InputT, ?, OutputT> fn) { - return groupedValues(fn.<K>asKeyedFn(), displayDataForFn(fn)); - } - - /** - * Returns a {@link GroupedValues Combine.GroupedValues} - * {@code PTransform} that takes a {@code PCollection} of - * {@code KV}s where a key maps to an {@code Iterable} of values, e.g., - * the result of a {@code GroupByKey}, then uses the given - * {@code KeyedCombineFn} to combine all the values associated with - * each key. The combining function is provided the key. The types - * of the input and output values can differ. - * - * <p>Each output element has the same timestamp and is in the same window - * as its corresponding input element, and the output - * {@code PCollection} has the same - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * associated with it as the input. - * - * <p>See {@link GroupedValues Combine.GroupedValues} for more information. - * - * <p>Note that {@link #perKey(CombineFnBase.PerKeyCombineFn)} is typically - * more convenient to use than {@link GroupByKey} followed by - * {@code groupedValues(...)}. - */ - public static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn) { return groupedValues(fn, displayDataForFn(fn)); } private static <K, InputT, OutputT> GroupedValues<K, InputT, OutputT> groupedValues( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { return new GroupedValues<>(fn, fnDisplayData); } @@ -471,81 +421,8 @@ public class Combine { public TypeDescriptor<OutputT> getOutputType() { return new TypeDescriptor<OutputT>(getClass()) {}; } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public <K> KeyedCombineFn<K, InputT, AccumT, OutputT> asKeyedFn() { - // The key, an object, is never even looked at. - return new KeyIgnoringCombineFn<>(this); - } - - private static class KeyIgnoringCombineFn<K, InputT, AccumT, OutputT> - extends KeyedCombineFn<K, InputT, AccumT, OutputT> - implements NameOverride { - - private final CombineFn<InputT, AccumT, OutputT> fn; - - private KeyIgnoringCombineFn(CombineFn<InputT, AccumT, OutputT> fn) { - this.fn = fn; - } - - @Override - public AccumT createAccumulator(K key) { - return fn.createAccumulator(); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input) { - return fn.addInput(accumulator, input); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) { - return fn.mergeAccumulators(accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator) { - return fn.extractOutput(accumulator); - } - - @Override - public AccumT compact(K key, AccumT accumulator) { - return fn.compact(accumulator); - } - - @Override - public Coder<AccumT> getAccumulatorCoder( - CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder) - throws CannotProvideCoderException { - return fn.getAccumulatorCoder(registry, inputCoder); - } - - @Override - public Coder<OutputT> getDefaultOutputCoder( - CoderRegistry registry, Coder<K> keyCoder, Coder<InputT> inputCoder) - throws CannotProvideCoderException { - return fn.getDefaultOutputCoder(registry, inputCoder); - } - - @Override - public CombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) { - return fn; - } - - @Override - public void populateDisplayData(Builder builder) { - builder.delegate(fn); - } - - @Override - public String getNameOverride() { - return NameUtils.approximateSimpleName(fn); - } - } } - ///////////////////////////////////////////////////////////////////////////// /** @@ -621,7 +498,6 @@ public class Combine { public Coder<V> getDefaultOutputCoder(CoderRegistry registry, Coder<V> inputCoder) { return inputCoder; } - } /** @@ -1083,215 +959,6 @@ public class Combine { ///////////////////////////////////////////////////////////////////////////// - - /** - * A {@code KeyedCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine - * a collection of input values of type {@code InputT}, associated with - * a key of type {@code K}, into a single output value of type - * {@code OutputT}. It does this via one or more intermediate mutable - * accumulator values of type {@code AccumT}. - * - * <p>The overall process to combine a collection of input - * {@code InputT} values associated with an input {@code K} key into a - * single output {@code OutputT} value is as follows: - * - * <ol> - * - * <li> The input {@code InputT} values are partitioned into one or more - * batches. - * - * <li> For each batch, the {@link #createAccumulator} operation is - * invoked to create a fresh mutable accumulator value of type - * {@code AccumT}, initialized to represent the combination of zero - * values. - * - * <li> For each input {@code InputT} value in a batch, the - * {@link #addInput} operation is invoked to add the value to that - * batch's accumulator {@code AccumT} value. The accumulator may just - * record the new value (e.g., if {@code AccumT == List<InputT>}, or may do - * work to represent the combination more compactly. - * - * <li> The {@link #mergeAccumulators} operation is invoked to - * combine a collection of accumulator {@code AccumT} values into a - * single combined output accumulator {@code AccumT} value, once the - * merging accumulators have had all all the input values in their - * batches added to them. This operation is invoked repeatedly, - * until there is only one accumulator value left. - * - * <li> The {@link #extractOutput} operation is invoked on the final - * accumulator {@code AccumT} value to get the output {@code OutputT} value. - * - * </ol> - * - * <p>All of these operations are passed the {@code K} key that the - * values being combined are associated with. - * - * <p>For example: - * <pre> {@code - * public class ConcatFn - * extends KeyedCombineFn<String, Integer, ConcatFn.Accum, String> { - * public static class Accum { - * String s = ""; - * } - * public Accum createAccumulator(String key) { - * return new Accum(); - * } - * public Accum addInput(String key, Accum accum, Integer input) { - * accum.s += "+" + input; - * return accum; - * } - * public Accum mergeAccumulators(String key, Iterable<Accum> accums) { - * Accum merged = new Accum(); - * for (Accum accum : accums) { - * merged.s += accum.s; - * } - * return merged; - * } - * public String extractOutput(String key, Accum accum) { - * return key + accum.s; - * } - * } - * PCollection<KV<String, Integer>> pc = ...; - * PCollection<KV<String, String>> pc2 = pc.apply( - * Combine.perKey(new ConcatFn())); - * } </pre> - * - * <p>Keyed combining functions used by {@link Combine.PerKey}, - * {@link Combine.GroupedValues}, and {@code PTransforms} derived - * from them should be <i>associative</i> and <i>commutative</i>. - * Associativity is required because input values are first broken - * up into subgroups before being combined, and their intermediate - * results further combined, in an arbitrary tree structure. - * Commutativity is required because any order of the input values - * is ignored when breaking up input values into groups. - * - * @param <K> type of keys - * @param <InputT> type of input values - * @param <AccumT> type of mutable accumulator values - * @param <OutputT> type of output values - */ - public abstract static class KeyedCombineFn<K, InputT, AccumT, OutputT> - extends AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT> { - /** - * Returns a new, mutable accumulator value representing the accumulation of zero input values. - * - * @param key the key that all the accumulated values using the - * accumulator are associated with - */ - public abstract AccumT createAccumulator(K key); - - /** - * Adds the given input value to the given accumulator, returning the new accumulator value. - * - * <p>For efficiency, the input accumulator may be modified and returned. - * - * @param key the key that all the accumulated values using the - * accumulator are associated with - */ - public abstract AccumT addInput(K key, AccumT accumulator, InputT value); - - /** - * Returns an accumulator representing the accumulation of all the - * input values accumulated in the merging accumulators. - * - * <p>May modify any of the argument accumulators. May return a - * fresh accumulator, or may return one of the (modified) argument - * accumulators. - * - * @param key the key that all the accumulators are associated - * with - */ - public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators); - - /** - * Returns the output value that is the result of combining all - * the input values represented by the given accumulator. - * - * @param key the key that all the accumulated values using the - * accumulator are associated with - */ - public abstract OutputT extractOutput(K key, AccumT accumulator); - - /** - * Returns an accumulator that represents the same logical value as the - * input accumulator, but may have a more compact representation. - * - * <p>For most CombineFns this would be a no-op, but should be overridden - * by CombineFns that (for example) buffer up elements and combine - * them in batches. - * - * <p>For efficiency, the input accumulator may be modified and returned. - * - * <p>By default returns the original accumulator. - */ - public AccumT compact(K key, AccumT accumulator) { - return accumulator; - } - - @Override - public CombineFn<InputT, AccumT, OutputT> forKey(final K key, final Coder<K> keyCoder) { - return new CombineFn<InputT, AccumT, OutputT>() { - - @Override - public AccumT createAccumulator() { - return KeyedCombineFn.this.createAccumulator(key); - } - - @Override - public AccumT addInput(AccumT accumulator, InputT input) { - return KeyedCombineFn.this.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { - return KeyedCombineFn.this.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(AccumT accumulator) { - return KeyedCombineFn.this.extractOutput(key, accumulator); - } - - @Override - public AccumT compact(AccumT accumulator) { - return KeyedCombineFn.this.compact(key, accumulator); - } - - @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder) - throws CannotProvideCoderException { - return KeyedCombineFn.this.getAccumulatorCoder(registry, keyCoder, inputCoder); - } - - @Override - public Coder<OutputT> getDefaultOutputCoder( - CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException { - return KeyedCombineFn.this.getDefaultOutputCoder(registry, keyCoder, inputCoder); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(KeyedCombineFn.this); - } - }; - } - - /** - * Applies this {@code KeyedCombineFn} to a key and a collection - * of input values to produce a combined output value. - * - * <p>Useful when testing the behavior of a {@code KeyedCombineFn} - * separately from a {@code Combine} transform. - */ - public OutputT apply(K key, Iterable<? extends InputT> inputs) { - AccumT accum = createAccumulator(key); - for (InputT input : inputs) { - accum = addInput(key, accum, input); - } - return extractOutput(key, accum); - } - } - //////////////////////////////////////////////////////////////////////////// /** @@ -1458,8 +1125,7 @@ public class Combine { .apply(WithKeys.<Void, InputT>of((Void) null)) .setCoder(KvCoder.of(VoidCoder.of(), input.getCoder())); - Combine.PerKey<Void, InputT, OutputT> combine = - Combine.fewKeys(fn.asKeyedFn(), fnDisplayData); + Combine.PerKey<Void, InputT, OutputT> combine = Combine.fewKeys(fn, fnDisplayData); if (!sideInputs.isEmpty()) { combine = combine.withSideInputs(sideInputs); } @@ -1788,13 +1454,13 @@ public class Combine { public static class PerKey<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> { - private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn; + private final GlobalCombineFn<? super InputT, ?, OutputT> fn; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final boolean fewKeys; private final List<PCollectionView<?>> sideInputs; private PerKey( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys) { this.fn = fn; this.fnDisplayData = fnDisplayData; @@ -1803,7 +1469,7 @@ public class Combine { } private PerKey( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, boolean fewKeys, List<PCollectionView<?>> sideInputs) { this.fn = fn; @@ -1819,7 +1485,7 @@ public class Combine { /** * Returns a {@link PTransform} identical to this, but with the specified side inputs to use - * in {@link KeyedCombineFnWithContext}. + * in {@link CombineFnWithContext}. */ public PerKey<K, InputT, OutputT> withSideInputs(PCollectionView<?>... sideInputs) { return withSideInputs(Arrays.asList(sideInputs)); @@ -1827,7 +1493,7 @@ public class Combine { /** * Returns a {@link PTransform} identical to this, but with the specified side inputs to use - * in {@link KeyedCombineFnWithContext}. + * in {@link CombineFnWithContext}. */ public PerKey<K, InputT, OutputT> withSideInputs( Iterable<? extends PCollectionView<?>> sideInputs) { @@ -1874,9 +1540,9 @@ public class Combine { } /** - * Returns the {@link PerKeyCombineFn} used by this Combine operation. + * Returns the {@link GlobalCombineFn} used by this Combine operation. */ - public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() { + public GlobalCombineFn<? super InputT, ?, OutputT> getFn() { return fn; } @@ -1924,12 +1590,12 @@ public class Combine { public static class PerKeyWithHotKeyFanout<K, InputT, OutputT> extends PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> { - private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn; + private final GlobalCombineFn<? super InputT, ?, OutputT> fn; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final SerializableFunction<? super K, Integer> hotKeyFanout; private PerKeyWithHotKeyFanout( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, SerializableFunction<? super K, Integer> hotKeyFanout) { this.fn = fn; @@ -1951,8 +1617,8 @@ public class Combine { // Name the accumulator type. @SuppressWarnings("unchecked") - final PerKeyCombineFn<K, InputT, AccumT, OutputT> typedFn = - (PerKeyCombineFn<K, InputT, AccumT, OutputT>) this.fn; + final GlobalCombineFn<InputT, AccumT, OutputT> typedFn = + (GlobalCombineFn<InputT, AccumT, OutputT>) this.fn; if (!(input.getCoder() instanceof KvCoder)) { throw new IllegalStateException( @@ -1966,7 +1632,7 @@ public class Combine { try { accumCoder = typedFn.getAccumulatorCoder( input.getPipeline().getCoderRegistry(), - inputCoder.getKeyCoder(), inputCoder.getValueCoder()); + inputCoder.getValueCoder()); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Unable to determine accumulator coder.", e); } @@ -1979,38 +1645,37 @@ public class Combine { // set of values, then drop the nonce and do a final combine of the // aggregates. We do this by splitting the original CombineFn into two, // on that does addInput + merge and another that does merge + extract. - PerKeyCombineFn<KV<K, Integer>, InputT, AccumT, AccumT> hotPreCombine; - PerKeyCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine; - if (typedFn instanceof KeyedCombineFn) { - final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedFn = - (KeyedCombineFn<K, InputT, AccumT, OutputT>) typedFn; + GlobalCombineFn<InputT, AccumT, AccumT> hotPreCombine; + GlobalCombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT> postCombine; + if (typedFn instanceof CombineFn) { + final CombineFn<InputT, AccumT, OutputT> fn = + (CombineFn<InputT, AccumT, OutputT>) typedFn; hotPreCombine = - new KeyedCombineFn<KV<K, Integer>, InputT, AccumT, AccumT>() { + new CombineFn<InputT, AccumT, AccumT>() { @Override - public AccumT createAccumulator(KV<K, Integer> key) { - return keyedFn.createAccumulator(key.getKey()); + public AccumT createAccumulator() { + return fn.createAccumulator(); } @Override - public AccumT addInput(KV<K, Integer> key, AccumT accumulator, InputT value) { - return keyedFn.addInput(key.getKey(), accumulator, value); + public AccumT addInput(AccumT accumulator, InputT value) { + return fn.addInput(accumulator, value); } @Override - public AccumT mergeAccumulators( - KV<K, Integer> key, Iterable<AccumT> accumulators) { - return keyedFn.mergeAccumulators(key.getKey(), accumulators); + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return fn.mergeAccumulators(accumulators); } @Override - public AccumT compact(KV<K, Integer> key, AccumT accumulator) { - return keyedFn.compact(key.getKey(), accumulator); + public AccumT compact(AccumT accumulator) { + return fn.compact(accumulator); } @Override - public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator) { + public AccumT extractOutput(AccumT accumulator) { return accumulator; } @Override @SuppressWarnings("unchecked") public Coder<AccumT> getAccumulatorCoder( - CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder) + CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException { return accumCoder; } @@ -2020,142 +1685,147 @@ public class Combine { builder.delegate(PerKeyWithHotKeyFanout.this); } }; + postCombine = - new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() { + new CombineFn<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() { @Override - public AccumT createAccumulator(K key) { - return keyedFn.createAccumulator(key); + public AccumT createAccumulator() { + return fn.createAccumulator(); } + @Override - public AccumT addInput( - K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value) { + public AccumT addInput(AccumT accumulator, InputOrAccum<InputT, AccumT> value) { if (value.accum == null) { - return keyedFn.addInput(key, accumulator, value.input); + return fn.addInput(accumulator, value.input); } else { - return keyedFn.mergeAccumulators(key, ImmutableList.of(accumulator, value.accum)); + return fn.mergeAccumulators(ImmutableList.of(accumulator, value.accum)); } } + @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) { - return keyedFn.mergeAccumulators(key, accumulators); + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return fn.mergeAccumulators(accumulators); } + @Override - public AccumT compact(K key, AccumT accumulator) { - return keyedFn.compact(key, accumulator); + public AccumT compact(AccumT accumulator) { + return fn.compact(accumulator); } + @Override - public OutputT extractOutput(K key, AccumT accumulator) { - return keyedFn.extractOutput(key, accumulator); + public OutputT extractOutput(AccumT accumulator) { + return fn.extractOutput(accumulator); } + @Override public Coder<OutputT> getDefaultOutputCoder( - CoderRegistry registry, - Coder<K> keyCoder, - Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder) + CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder) throws CannotProvideCoderException { - return keyedFn.getDefaultOutputCoder( - registry, keyCoder, inputCoder.getValueCoder()); + return fn.getDefaultOutputCoder(registry, inputCoder.getValueCoder()); } @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputOrAccum<InputT, AccumT>> inputCoder) - throws CannotProvideCoderException { + public Coder<AccumT> getAccumulatorCoder( + CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder) + throws CannotProvideCoderException { return accumCoder; } + @Override public void populateDisplayData(DisplayData.Builder builder) { builder.delegate(PerKeyWithHotKeyFanout.this); } }; - } else if (typedFn instanceof KeyedCombineFnWithContext) { - final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext = - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) typedFn; + } else if (typedFn instanceof CombineFnWithContext) { + final CombineFnWithContext<InputT, AccumT, OutputT> fnWithContext = + (CombineFnWithContext<InputT, AccumT, OutputT>) typedFn; hotPreCombine = - new KeyedCombineFnWithContext<KV<K, Integer>, InputT, AccumT, AccumT>() { + new CombineFnWithContext<InputT, AccumT, AccumT>() { @Override - public AccumT createAccumulator(KV<K, Integer> key, Context c) { - return keyedFnWithContext.createAccumulator(key.getKey(), c); + public AccumT createAccumulator(Context c) { + return fnWithContext.createAccumulator(c); } @Override - public AccumT addInput( - KV<K, Integer> key, AccumT accumulator, InputT value, Context c) { - return keyedFnWithContext.addInput(key.getKey(), accumulator, value, c); + public AccumT addInput(AccumT accumulator, InputT value, Context c) { + return fnWithContext.addInput(accumulator, value, c); } @Override - public AccumT mergeAccumulators( - KV<K, Integer> key, Iterable<AccumT> accumulators, Context c) { - return keyedFnWithContext.mergeAccumulators(key.getKey(), accumulators, c); + public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) { + return fnWithContext.mergeAccumulators(accumulators, c); } @Override - public AccumT compact(KV<K, Integer> key, AccumT accumulator, Context c) { - return keyedFnWithContext.compact(key.getKey(), accumulator, c); + public AccumT compact(AccumT accumulator, Context c) { + return fnWithContext.compact(accumulator, c); } @Override - public AccumT extractOutput(KV<K, Integer> key, AccumT accumulator, Context c) { + public AccumT extractOutput(AccumT accumulator, Context c) { return accumulator; } @Override @SuppressWarnings("unchecked") public Coder<AccumT> getAccumulatorCoder( - CoderRegistry registry, Coder<KV<K, Integer>> keyCoder, Coder<InputT> inputCoder) + CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException { return accumCoder; } + @Override public void populateDisplayData(DisplayData.Builder builder) { builder.delegate(PerKeyWithHotKeyFanout.this); } }; postCombine = - new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() { + new CombineFnWithContext<InputOrAccum<InputT, AccumT>, AccumT, OutputT>() { @Override - public AccumT createAccumulator(K key, Context c) { - return keyedFnWithContext.createAccumulator(key, c); + public AccumT createAccumulator(Context c) { + return fnWithContext.createAccumulator(c); } + @Override public AccumT addInput( - K key, AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) { + AccumT accumulator, InputOrAccum<InputT, AccumT> value, Context c) { if (value.accum == null) { - return keyedFnWithContext.addInput(key, accumulator, value.input, c); + return fnWithContext.addInput(accumulator, value.input, c); } else { - return keyedFnWithContext.mergeAccumulators( - key, ImmutableList.of(accumulator, value.accum), c); + return fnWithContext.mergeAccumulators( + ImmutableList.of(accumulator, value.accum), c); } } + @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) { - return keyedFnWithContext.mergeAccumulators(key, accumulators, c); + public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) { + return fnWithContext.mergeAccumulators(accumulators, c); } + @Override - public AccumT compact(K key, AccumT accumulator, Context c) { - return keyedFnWithContext.compact(key, accumulator, c); + public AccumT compact(AccumT accumulator, Context c) { + return fnWithContext.compact(accumulator, c); } + @Override - public OutputT extractOutput(K key, AccumT accumulator, Context c) { - return keyedFnWithContext.extractOutput(key, accumulator, c); + public OutputT extractOutput(AccumT accumulator, Context c) { + return fnWithContext.extractOutput(accumulator, c); } + @Override public Coder<OutputT> getDefaultOutputCoder( - CoderRegistry registry, - Coder<K> keyCoder, - Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder) + CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> accumulatorCoder) throws CannotProvideCoderException { - return keyedFnWithContext.getDefaultOutputCoder( - registry, keyCoder, inputCoder.getValueCoder()); + return fnWithContext.getDefaultOutputCoder(registry, inputCoder.getValueCoder()); } @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputOrAccum<InputT, AccumT>> inputCoder) + public Coder<AccumT> getAccumulatorCoder( + CoderRegistry registry, Coder<InputOrAccum<InputT, AccumT>> inputCoder) throws CannotProvideCoderException { return accumCoder; } + @Override public void populateDisplayData(DisplayData.Builder builder) { builder.delegate(PerKeyWithHotKeyFanout.this); @@ -2202,25 +1872,33 @@ public class Combine { } // Combine the hot and cold keys separately. - PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot = split - .get(hot) - .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()), - inputCoder.getValueCoder())) - .setWindowingStrategyInternal(preCombineStrategy) - .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData)) - .apply("StripNonce", MapElements.via( - new SimpleFunction<KV<KV<K, Integer>, AccumT>, - KV<K, InputOrAccum<InputT, AccumT>>>() { - @Override - public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) { - return KV.of( - elem.getKey().getKey(), - InputOrAccum.<InputT, AccumT>accum(elem.getValue())); - } - })) - .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)) - .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge()) - .setWindowingStrategyInternal(input.getWindowingStrategy()); + PCollection<KV<K, InputOrAccum<InputT, AccumT>>> precombinedHot = + split + .get(hot) + .setCoder( + KvCoder.of( + KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()), + inputCoder.getValueCoder())) + .setWindowingStrategyInternal(preCombineStrategy) + .apply( + "PreCombineHot", + Combine.<KV<K, Integer>, InputT, AccumT>perKey(hotPreCombine, fnDisplayData)) + .apply( + "StripNonce", + MapElements.via( + new SimpleFunction< + KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() { + @Override + public KV<K, InputOrAccum<InputT, AccumT>> apply( + KV<KV<K, Integer>, AccumT> elem) { + return KV.of( + elem.getKey().getKey(), + InputOrAccum.<InputT, AccumT>accum(elem.getValue())); + } + })) + .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)) + .apply(Window.<KV<K, InputOrAccum<InputT, AccumT>>>remerge()) + .setWindowingStrategyInternal(input.getWindowingStrategy()); PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split .get(cold) .setCoder(inputCoder) @@ -2235,9 +1913,12 @@ public class Combine { .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder)); // Combine the union of the pre-processed hot and cold key results. - return PCollectionList.of(precombinedHot).and(preprocessedCold) + return PCollectionList.of(precombinedHot) + .and(preprocessedCold) .apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections()) - .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData)); + .apply( + "PostCombine", + Combine.<K, InputOrAccum<InputT, AccumT>, OutputT>perKey(postCombine, fnDisplayData)); } @Override @@ -2325,71 +2006,61 @@ public class Combine { ///////////////////////////////////////////////////////////////////////////// /** - * {@code GroupedValues<K, InputT, OutputT>} takes a - * {@code PCollection<KV<K, Iterable<InputT>>>}, such as the result of - * {@link GroupByKey}, applies a specified - * {@link KeyedCombineFn KeyedCombineFn<K, InputT, AccumT, OutputT>} - * to each of the input {@code KV<K, Iterable<InputT>>} elements to - * produce a combined output {@code KV<K, OutputT>} element, and returns a - * {@code PCollection<KV<K, OutputT>>} containing all the combined output - * elements. It is common for {@code InputT == OutputT}, but not required. - * Common combining functions include sums, mins, maxes, and averages - * of numbers, conjunctions and disjunctions of booleans, statistical - * aggregations, etc. + * {@code GroupedValues<K, InputT, OutputT>} takes a {@code PCollection<KV<K, Iterable<InputT>>>}, + * such as the result of {@link GroupByKey}, applies a specified {@link CombineFn + * CombineFn<InputT, AccumT, OutputT>} to each of the input {@code KV<K, + * Iterable<InputT>>} elements to produce a combined output {@code KV<K, OutputT>} element, and + * returns a {@code PCollection<KV<K, OutputT>>} containing all the combined output elements. It + * is common for {@code InputT == OutputT}, but not required. Common combining functions include + * sums, mins, maxes, and averages of numbers, conjunctions and disjunctions of booleans, + * statistical aggregations, etc. * * <p>Example of use: - * <pre> {@code + * + * <pre>{@code * PCollection<KV<String, Integer>> pc = ...; * PCollection<KV<String, Iterable<Integer>>> groupedByKey = pc.apply( * new GroupByKey<String, Integer>()); * PCollection<KV<String, Integer>> sumByKey = groupedByKey.apply( * Combine.<String, Integer>groupedValues( * new Sum.SumIntegerFn())); - * } </pre> + * } + * </pre> * - * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which - * captures the common pattern of "combining by key" in a - * single easy-to-use {@code PTransform}. + * <p>See also {@link #perKey}/{@link PerKey Combine.PerKey}, which captures the common pattern of + * "combining by key" in a single easy-to-use {@code PTransform}. * - * <p>Combining for different keys can happen in parallel. Moreover, - * combining of the {@code Iterable<InputT>} values associated a single - * key can happen in parallel, with different subsets of the values - * being combined separately, and their intermediate results combined - * further, in an arbitrary tree reduction pattern, until a single - * result value is produced for each key. + * <p>Combining for different keys can happen in parallel. Moreover, combining of the {@code + * Iterable<InputT>} values associated a single key can happen in parallel, with different subsets + * of the values being combined separately, and their intermediate results combined further, in an + * arbitrary tree reduction pattern, until a single result value is produced for each key. * - * <p>By default, the {@code Coder} of the keys of the output - * {@code PCollection<KV<K, OutputT>>} is that of the keys of the input - * {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of the values - * of the output {@code PCollection<KV<K, OutputT>>} is inferred from the - * concrete type of the {@code KeyedCombineFn<K, InputT, AccumT, OutputT>}'s output - * type {@code OutputT}. + * <p>By default, the {@code Coder} of the keys of the output {@code PCollection<KV<K, OutputT>>} + * is that of the keys of the input {@code PCollection<KV<K, InputT>>}, and the {@code Coder} of + * the values of the output {@code PCollection<KV<K, OutputT>>} is inferred from the concrete type + * of the {@code CombineFn<InputT, AccumT, OutputT>}'s output type {@code OutputT}. * - * <p>Each output element has the same timestamp and is in the same window - * as its corresponding input element, and the output - * {@code PCollection} has the same - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} - * associated with it as the input. + * <p>Each output element has the same timestamp and is in the same window as its corresponding + * input element, and the output {@code PCollection} has the same {@link + * org.apache.beam.sdk.transforms.windowing.WindowFn} associated with it as the input. * - * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which - * combines all the values in a {@code PCollection} into a - * single value in a {@code PCollection}. + * <p>See also {@link #globally}/{@link Globally Combine.Globally}, which combines all the values + * in a {@code PCollection} into a single value in a {@code PCollection}. * * @param <K> type of input and output keys * @param <InputT> type of input values * @param <OutputT> type of output values */ public static class GroupedValues<K, InputT, OutputT> - extends PTransform - <PCollection<? extends KV<K, ? extends Iterable<InputT>>>, - PCollection<KV<K, OutputT>>> { + extends PTransform< + PCollection<? extends KV<K, ? extends Iterable<InputT>>>, PCollection<KV<K, OutputT>>> { - private final PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn; + private final GlobalCombineFn<? super InputT, ?, OutputT> fn; private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData; private final List<PCollectionView<?>> sideInputs; private GroupedValues( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) { this.fn = SerializableUtils.clone(fn); this.fnDisplayData = fnDisplayData; @@ -2397,7 +2068,7 @@ public class Combine { } private GroupedValues( - PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> fn, + GlobalCombineFn<? super InputT, ?, OutputT> fn, DisplayData.ItemSpec<? extends Class<?>> fnDisplayData, List<PCollectionView<?>> sideInputs) { this.fn = SerializableUtils.clone(fn); @@ -2415,9 +2086,9 @@ public class Combine { } /** - * Returns the KeyedCombineFn used by this Combine operation. + * Returns the {@link GlobalCombineFn} used by this Combine operation. */ - public PerKeyCombineFn<? super K, ? super InputT, ?, OutputT> getFn() { + public GlobalCombineFn<? super InputT, ?, OutputT> getFn() { return fn; } @@ -2436,9 +2107,9 @@ public class Combine { K key = c.element().getKey(); OutputT output; - if (fn instanceof KeyedCombineFnWithContext) { - output = ((KeyedCombineFnWithContext<? super K, ? super InputT, ?, OutputT>) fn) - .apply(key, c.element().getValue(), new CombineWithContext.Context() { + if (fn instanceof CombineFnWithContext) { + output = ((CombineFnWithContext<? super InputT, ?, OutputT>) fn) + .apply(c.element().getValue(), new CombineWithContext.Context() { @Override public PipelineOptions getPipelineOptions() { return c.getPipelineOptions(); @@ -2449,9 +2120,9 @@ public class Combine { return c.sideInput(view); } }); - } else if (fn instanceof KeyedCombineFn) { - output = ((KeyedCombineFn<? super K, ? super InputT, ?, OutputT>) fn) - .apply(key, c.element().getValue()); + } else if (fn instanceof CombineFn) { + output = ((CombineFn<? super InputT, ?, OutputT>) fn) + .apply(c.element().getValue()); } else { throw new IllegalStateException( String.format("Unknown type of CombineFn: %s", fn.getClass())); @@ -2516,10 +2187,9 @@ public class Combine { KvCoder<K, InputT> kvCoder = getKvCoder(input.getCoder()); @SuppressWarnings("unchecked") Coder<OutputT> outputValueCoder = - ((PerKeyCombineFn<K, InputT, ?, OutputT>) fn) - .getDefaultOutputCoder( - input.getPipeline().getCoderRegistry(), - kvCoder.getKeyCoder(), kvCoder.getValueCoder()); + ((GlobalCombineFn<InputT, ?, OutputT>) fn) + .getDefaultOutputCoder( + input.getPipeline().getCoderRegistry(), kvCoder.getValueCoder()); return KvCoder.of(kvCoder.getKeyCoder(), outputValueCoder); }
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java index 770a390..a881099 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFnBase.java @@ -25,9 +25,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.values.TypeDescriptor; @@ -92,65 +90,6 @@ public class CombineFnBase { */ OutputT defaultValue(); - /** - * Converts this {@code GloballyCombineFn} into an equivalent - * {@link PerKeyCombineFn} that ignores the keys passed to it and - * combines the values according to this {@code GloballyCombineFn}. - * - * @param <K> the type of the (ignored) keys - */ - <K> PerKeyCombineFn<K, InputT, AccumT, OutputT> asKeyedFn(); - } - - /** - * A {@code PerKeyCombineFn<K, InputT, AccumT, OutputT>} specifies how to combine - * a collection of input values of type {@code InputT}, associated with - * a key of type {@code K}, into a single output value of type - * {@code OutputT}. It does this via one or more intermediate mutable - * accumulator values of type {@code AccumT}. - * - * <p>Do not implement this interface directly. - * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead. - * - * @param <K> type of keys - * @param <InputT> type of input values - * @param <AccumT> type of mutable accumulator values - * @param <OutputT> type of output values - */ - public interface PerKeyCombineFn<K, InputT, AccumT, OutputT> - extends Serializable, HasDisplayData { - /** - * Returns the {@code Coder} to use for accumulator {@code AccumT} - * values, or null if it is not able to be inferred. - * - * <p>By default, uses the knowledge of the {@code Coder} being - * used for {@code K} keys and input {@code InputT} values and the - * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to - * infer the Coder for {@code AccumT} values. - * - * <p>This is the Coder used to send data through a communication-intensive - * shuffle step, so a compact and efficient representation may have - * significant performance benefits. - */ - Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException; - - /** - * Returns the {@code Coder} to use by default for output - * {@code OutputT} values, or null if it is not able to be inferred. - * - * <p>By default, uses the knowledge of the {@code Coder} being - * used for {@code K} keys and input {@code InputT} values and the - * enclosing {@code Pipeline}'s {@code CoderRegistry} to try to - * infer the Coder for {@code OutputT} values. - */ - Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException; - - /** - * Returns the a regular {@link GlobalCombineFn} that operates on a specific key. - */ - GlobalCombineFn<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder); } /** @@ -228,79 +167,4 @@ public class CombineFnBase { public void populateDisplayData(DisplayData.Builder builder) { } } - - /** - * An abstract {@link PerKeyCombineFn} base class shared by - * {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext}. - * - * <p>Do not extends this class directly. - * Extends {@link KeyedCombineFn} and {@link KeyedCombineFnWithContext} instead. - * - * @param <K> type of keys - * @param <InputT> type of input values - * @param <AccumT> type of mutable accumulator values - * @param <OutputT> type of output values - */ - abstract static class AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT> - implements PerKeyCombineFn<K, InputT, AccumT, OutputT> { - @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class, - ImmutableMap.<Type, Coder<?>>of( - getKTypeVariable(), keyCoder, getInputTVariable(), inputCoder), - getAccumTVariable()); - } - - @Override - public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return registry.getDefaultCoder(getClass(), AbstractPerKeyCombineFn.class, - ImmutableMap.<Type, Coder<?>>of(getKTypeVariable(), keyCoder, getInputTVariable(), - inputCoder, getAccumTVariable(), - this.getAccumulatorCoder(registry, keyCoder, inputCoder)), - getOutputTVariable()); - } - - /** - * Returns the {@link TypeVariable} of {@code K}. - */ - public TypeVariable<?> getKTypeVariable() { - return (TypeVariable<?>) new TypeDescriptor<K>(AbstractPerKeyCombineFn.class) {}.getType(); - } - - /** - * Returns the {@link TypeVariable} of {@code InputT}. - */ - public TypeVariable<?> getInputTVariable() { - return (TypeVariable<?>) - new TypeDescriptor<InputT>(AbstractPerKeyCombineFn.class) {}.getType(); - } - - /** - * Returns the {@link TypeVariable} of {@code AccumT}. - */ - public TypeVariable<?> getAccumTVariable() { - return (TypeVariable<?>) - new TypeDescriptor<AccumT>(AbstractPerKeyCombineFn.class) {}.getType(); - } - - /** - * Returns the {@link TypeVariable} of {@code OutputT}. - */ - public TypeVariable<?> getOutputTVariable() { - return (TypeVariable<?>) - new TypeDescriptor<OutputT>(AbstractPerKeyCombineFn.class) {}.getType(); - } - - /** - * {@inheritDoc} - * - * <p>By default, does not register any display data. Implementors may override this method - * to provide their own display data. - */ - @Override - public void populateDisplayData(DisplayData.Builder builder) { - } - } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index ca939c1..cc02dcf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -37,12 +37,9 @@ import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.CombineFnUtil; @@ -54,49 +51,6 @@ import org.apache.beam.sdk.values.TupleTag; public class CombineFns { /** - * Returns a {@link ComposeKeyedCombineFnBuilder} to construct a composed - * {@link PerKeyCombineFn}. - * - * <p>The same {@link TupleTag} cannot be used in a composition multiple times. - * - * <p>Example: - * <pre>{@code - * PCollection<KV<K, Integer>> latencies = ...; - * - * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>(); - * TupleTag<Double> meanLatencyTag = new TupleTag<Double>(); - * - * SimpleFunction<Integer, Integer> identityFn = - * new SimpleFunction<Integer, Integer>() { - * {@literal @}Override - * public Integer apply(Integer input) { - * return input; - * }}; - * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply( - * Combine.perKey( - * CombineFns.composeKeyed() - * .with(identityFn, new MaxIntegerFn(), maxLatencyTag) - * .with(identityFn, new MeanFn<Integer>(), meanLatencyTag))); - * - * PCollection<T> finalResultCollection = maxAndMean - * .apply(ParDo.of( - * new DoFn<KV<K, CoCombineResult>, T>() { - * {@literal @}ProcessElement - * public void processElement(ProcessContext c) throws Exception { - * KV<K, CoCombineResult> e = c.element(); - * Integer maxLatency = e.getValue().get(maxLatencyTag); - * Double meanLatency = e.getValue().get(meanLatencyTag); - * .... Do Something .... - * c.output(...some T...); - * } - * })); - * }</pre> - */ - public static ComposeKeyedCombineFnBuilder composeKeyed() { - return new ComposeKeyedCombineFnBuilder(); - } - - /** * Returns a {@link ComposeCombineFnBuilder} to construct a composed * {@link GlobalCombineFn}. * @@ -142,67 +96,6 @@ public class CombineFns { ///////////////////////////////////////////////////////////////////////////// /** - * A builder class to construct a composed {@link PerKeyCombineFn}. - */ - public static class ComposeKeyedCombineFnBuilder { - /** - * Returns a {@link ComposedKeyedCombineFn} that can take additional - * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. - * - * <p>The {@link ComposedKeyedCombineFn} extracts inputs from {@code DataT} with - * the {@code extractInputFn} and combines them with the {@code keyedCombineFn}, - * and then it outputs each combined value with a {@link TupleTag} to a - * {@link CoCombineResult}. - */ - public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn, - TupleTag<OutputT> outputTag) { - return new ComposedKeyedCombineFn<DataT, K>() - .with(extractInputFn, keyedCombineFn, outputTag); - } - - /** - * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional - * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. - * - * <p>The {@link ComposedKeyedCombineFnWithContext} extracts inputs from {@code DataT} with - * the {@code extractInputFn} and combines them with the {@code keyedCombineFnWithContext}, - * and then it outputs each combined value with a {@link TupleTag} to a - * {@link CoCombineResult}. - */ - public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFnWithContext, - TupleTag<OutputT> outputTag) { - return new ComposedKeyedCombineFnWithContext<DataT, K>() - .with(extractInputFn, keyedCombineFnWithContext, outputTag); - } - - /** - * Returns a {@link ComposedKeyedCombineFn} that can take additional - * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. - */ - public <K, DataT, InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - CombineFn<InputT, ?, OutputT> combineFn, - TupleTag<OutputT> outputTag) { - return with(extractInputFn, combineFn.<K>asKeyedFn(), outputTag); - } - - /** - * Returns a {@link ComposedKeyedCombineFnWithContext} that can take additional - * {@link PerKeyCombineFn PerKeyCombineFns} and apply them as a single combine function. - */ - public <K, DataT, InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext, - TupleTag<OutputT> outputTag) { - return with(extractInputFn, combineFnWithContext.<K>asKeyedFn(), outputTag); - } - } - - /** * A builder class to construct a composed {@link GlobalCombineFn}. */ public static class ComposeCombineFnBuilder { @@ -246,7 +139,7 @@ public class CombineFns { /** * A tuple of outputs produced by a composed combine functions. * - * <p>See {@link #compose()} or {@link #composeKeyed()}) for details. + * <p>See {@link #compose()} for details. */ public static class CoCombineResult implements Serializable { @@ -598,345 +491,6 @@ public class CombineFns { } } - /** - * A composed {@link KeyedCombineFn} that applies multiple {@link KeyedCombineFn KeyedCombineFns}. - * - * <p>For each {@link KeyedCombineFn} it extracts inputs from {@code DataT} with - * the {@code extractInputFn} and combines them, - * and then it outputs each combined value with a {@link TupleTag} to a - * {@link CoCombineResult}. - */ - public static class ComposedKeyedCombineFn<DataT, K> - extends KeyedCombineFn<K, DataT, Object[], CoCombineResult> { - - private final List<SerializableFunction<DataT, Object>> extractInputFns; - private final List<KeyedCombineFn<K, Object, Object, Object>> keyedCombineFns; - private final List<TupleTag<?>> outputTags; - private final int combineFnCount; - - private ComposedKeyedCombineFn() { - this.extractInputFns = ImmutableList.of(); - this.keyedCombineFns = ImmutableList.of(); - this.outputTags = ImmutableList.of(); - this.combineFnCount = 0; - } - - private ComposedKeyedCombineFn( - ImmutableList<SerializableFunction<DataT, ?>> extractInputFns, - ImmutableList<KeyedCombineFn<K, ?, ?, ?>> keyedCombineFns, - ImmutableList<TupleTag<?>> outputTags) { - @SuppressWarnings({"unchecked", "rawtypes"}) - List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns; - this.extractInputFns = castedExtractInputFns; - - @SuppressWarnings({"unchecked", "rawtypes"}) - List<KeyedCombineFn<K, Object, Object, Object>> castedKeyedCombineFns = - (List) keyedCombineFns; - this.keyedCombineFns = castedKeyedCombineFns; - this.outputTags = outputTags; - this.combineFnCount = this.keyedCombineFns.size(); - } - - /** - * Returns a {@link ComposedKeyedCombineFn} with an additional {@link KeyedCombineFn}. - */ - public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - KeyedCombineFn<K, InputT, ?, OutputT> keyedCombineFn, - TupleTag<OutputT> outputTag) { - checkUniqueness(outputTags, outputTag); - return new ComposedKeyedCombineFn<>( - ImmutableList.<SerializableFunction<DataT, ?>>builder() - .addAll(extractInputFns) - .add(extractInputFn) - .build(), - ImmutableList.<KeyedCombineFn<K, ?, ?, ?>>builder() - .addAll(keyedCombineFns) - .add(keyedCombineFn) - .build(), - ImmutableList.<TupleTag<?>>builder() - .addAll(outputTags) - .add(outputTag) - .build()); - } - - /** - * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional - * {@link KeyedCombineFnWithContext}. - */ - public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - KeyedCombineFnWithContext<K, InputT, ?, OutputT> keyedCombineFn, - TupleTag<OutputT> outputTag) { - checkUniqueness(outputTags, outputTag); - List<KeyedCombineFnWithContext<K, Object, Object, Object>> fnsWithContext = - Lists.newArrayList(); - for (KeyedCombineFn<K, Object, Object, Object> fn : keyedCombineFns) { - fnsWithContext.add(CombineFnUtil.toFnWithContext(fn)); - } - return new ComposedKeyedCombineFnWithContext<>( - ImmutableList.<SerializableFunction<DataT, ?>>builder() - .addAll(extractInputFns) - .add(extractInputFn) - .build(), - ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder() - .addAll(fnsWithContext) - .add(keyedCombineFn) - .build(), - ImmutableList.<TupleTag<?>>builder() - .addAll(outputTags) - .add(outputTag) - .build()); - } - - /** - * Returns a {@link ComposedKeyedCombineFn} with an additional {@link CombineFn}. - */ - public <InputT, OutputT> ComposedKeyedCombineFn<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - CombineFn<InputT, ?, OutputT> keyedCombineFn, - TupleTag<OutputT> outputTag) { - return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag); - } - - /** - * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional - * {@link CombineFnWithContext}. - */ - public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - CombineFnWithContext<InputT, ?, OutputT> keyedCombineFn, - TupleTag<OutputT> outputTag) { - return with(extractInputFn, keyedCombineFn.<K>asKeyedFn(), outputTag); - } - - @Override - public Object[] createAccumulator(K key) { - Object[] accumsArray = new Object[combineFnCount]; - for (int i = 0; i < combineFnCount; ++i) { - accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key); - } - return accumsArray; - } - - @Override - public Object[] addInput(K key, Object[] accumulator, DataT value) { - for (int i = 0; i < combineFnCount; ++i) { - Object input = extractInputFns.get(i).apply(value); - accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input); - } - return accumulator; - } - - @Override - public Object[] mergeAccumulators(K key, final Iterable<Object[]> accumulators) { - Iterator<Object[]> iter = accumulators.iterator(); - if (!iter.hasNext()) { - return createAccumulator(key); - } else { - // Reuses the first accumulator, and overwrites its values. - // It is safe because {@code accum[i]} only depends on - // the i-th component of each accumulator. - Object[] accum = iter.next(); - for (int i = 0; i < combineFnCount; ++i) { - accum[i] = keyedCombineFns.get(i).mergeAccumulators( - key, new ProjectionIterable(accumulators, i)); - } - return accum; - } - } - - @Override - public CoCombineResult extractOutput(K key, Object[] accumulator) { - Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap(); - for (int i = 0; i < combineFnCount; ++i) { - valuesMap.put( - outputTags.get(i), - keyedCombineFns.get(i).extractOutput(key, accumulator[i])); - } - return new CoCombineResult(valuesMap); - } - - @Override - public Object[] compact(K key, Object[] accumulator) { - for (int i = 0; i < combineFnCount; ++i) { - accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i]); - } - return accumulator; - } - - @Override - public Coder<Object[]> getAccumulatorCoder( - CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder) - throws CannotProvideCoderException { - List<Coder<Object>> coders = Lists.newArrayList(); - for (int i = 0; i < combineFnCount; ++i) { - Coder<Object> inputCoder = - registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder); - coders.add(keyedCombineFns.get(i).getAccumulatorCoder(registry, keyCoder, inputCoder)); - } - return new ComposedAccumulatorCoder(coders); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - CombineFns.populateDisplayData(builder, keyedCombineFns); - } - } - - /** - * A composed {@link KeyedCombineFnWithContext} that applies multiple - * {@link KeyedCombineFnWithContext KeyedCombineFnWithContexts}. - * - * <p>For each {@link KeyedCombineFnWithContext} it extracts inputs from {@code DataT} with - * the {@code extractInputFn} and combines them, - * and then it outputs each combined value with a {@link TupleTag} to a - * {@link CoCombineResult}. - */ - public static class ComposedKeyedCombineFnWithContext<DataT, K> - extends KeyedCombineFnWithContext<K, DataT, Object[], CoCombineResult> { - - private final List<SerializableFunction<DataT, Object>> extractInputFns; - private final List<KeyedCombineFnWithContext<K, Object, Object, Object>> keyedCombineFns; - private final List<TupleTag<?>> outputTags; - private final int combineFnCount; - - private ComposedKeyedCombineFnWithContext() { - this.extractInputFns = ImmutableList.of(); - this.keyedCombineFns = ImmutableList.of(); - this.outputTags = ImmutableList.of(); - this.combineFnCount = 0; - } - - private ComposedKeyedCombineFnWithContext( - ImmutableList<SerializableFunction<DataT, ?>> extractInputFns, - ImmutableList<KeyedCombineFnWithContext<K, ?, ?, ?>> keyedCombineFns, - ImmutableList<TupleTag<?>> outputTags) { - @SuppressWarnings({"unchecked", "rawtypes"}) - List<SerializableFunction<DataT, Object>> castedExtractInputFns = - (List) extractInputFns; - this.extractInputFns = castedExtractInputFns; - - @SuppressWarnings({"unchecked", "rawtypes"}) - List<KeyedCombineFnWithContext<K, Object, Object, Object>> castedKeyedCombineFns = - (List) keyedCombineFns; - this.keyedCombineFns = castedKeyedCombineFns; - this.outputTags = outputTags; - this.combineFnCount = this.keyedCombineFns.size(); - } - - /** - * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional - * {@link PerKeyCombineFn}. - */ - public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - PerKeyCombineFn<K, InputT, ?, OutputT> perKeyCombineFn, - TupleTag<OutputT> outputTag) { - checkUniqueness(outputTags, outputTag); - return new ComposedKeyedCombineFnWithContext<>( - ImmutableList.<SerializableFunction<DataT, ?>>builder() - .addAll(extractInputFns) - .add(extractInputFn) - .build(), - ImmutableList.<KeyedCombineFnWithContext<K, ?, ?, ?>>builder() - .addAll(keyedCombineFns) - .add(CombineFnUtil.toFnWithContext(perKeyCombineFn)) - .build(), - ImmutableList.<TupleTag<?>>builder() - .addAll(outputTags) - .add(outputTag) - .build()); - } - - /** - * Returns a {@link ComposedKeyedCombineFnWithContext} with an additional - * {@link GlobalCombineFn}. - */ - public <InputT, OutputT> ComposedKeyedCombineFnWithContext<DataT, K> with( - SimpleFunction<DataT, InputT> extractInputFn, - GlobalCombineFn<InputT, ?, OutputT> perKeyCombineFn, - TupleTag<OutputT> outputTag) { - return with(extractInputFn, perKeyCombineFn.<K>asKeyedFn(), outputTag); - } - - @Override - public Object[] createAccumulator(K key, Context c) { - Object[] accumsArray = new Object[combineFnCount]; - for (int i = 0; i < combineFnCount; ++i) { - accumsArray[i] = keyedCombineFns.get(i).createAccumulator(key, c); - } - return accumsArray; - } - - @Override - public Object[] addInput(K key, Object[] accumulator, DataT value, Context c) { - for (int i = 0; i < combineFnCount; ++i) { - Object input = extractInputFns.get(i).apply(value); - accumulator[i] = keyedCombineFns.get(i).addInput(key, accumulator[i], input, c); - } - return accumulator; - } - - @Override - public Object[] mergeAccumulators(K key, Iterable<Object[]> accumulators, Context c) { - Iterator<Object[]> iter = accumulators.iterator(); - if (!iter.hasNext()) { - return createAccumulator(key, c); - } else { - // Reuses the first accumulator, and overwrites its values. - // It is safe because {@code accum[i]} only depends on - // the i-th component of each accumulator. - Object[] accum = iter.next(); - for (int i = 0; i < combineFnCount; ++i) { - accum[i] = keyedCombineFns.get(i).mergeAccumulators( - key, new ProjectionIterable(accumulators, i), c); - } - return accum; - } - } - - @Override - public CoCombineResult extractOutput(K key, Object[] accumulator, Context c) { - Map<TupleTag<?>, Object> valuesMap = Maps.newHashMap(); - for (int i = 0; i < combineFnCount; ++i) { - valuesMap.put( - outputTags.get(i), - keyedCombineFns.get(i).extractOutput(key, accumulator[i], c)); - } - return new CoCombineResult(valuesMap); - } - - @Override - public Object[] compact(K key, Object[] accumulator, Context c) { - for (int i = 0; i < combineFnCount; ++i) { - accumulator[i] = keyedCombineFns.get(i).compact(key, accumulator[i], c); - } - return accumulator; - } - - @Override - public Coder<Object[]> getAccumulatorCoder( - CoderRegistry registry, Coder<K> keyCoder, Coder<DataT> dataCoder) - throws CannotProvideCoderException { - List<Coder<Object>> coders = Lists.newArrayList(); - for (int i = 0; i < combineFnCount; ++i) { - Coder<Object> inputCoder = - registry.getDefaultOutputCoder(extractInputFns.get(i), dataCoder); - coders.add(keyedCombineFns.get(i).getAccumulatorCoder( - registry, keyCoder, inputCoder)); - } - return new ComposedAccumulatorCoder(coders); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); - CombineFns.populateDisplayData(builder, keyedCombineFns); - } - } - ///////////////////////////////////////////////////////////////////////////// private static class ProjectionIterable implements Iterable<Object> { http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java index cd0600a..9ae19f8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java @@ -17,20 +17,15 @@ */ package org.apache.beam.sdk.transforms; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollectionView; /** * This class contains combine functions that have access to {@code PipelineOptions} and side inputs * through {@code CombineWithContext.Context}. * - * <p>{@link CombineFnWithContext} and {@link KeyedCombineFnWithContext} are for users to extend. + * <p>{@link CombineFnWithContext} is for users to extend. */ public class CombineWithContext { @@ -116,170 +111,23 @@ public class CombineWithContext { return accumulator; } - @Override - public OutputT defaultValue() { - throw new UnsupportedOperationException( - "Override this function to provide the default value."); - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Override - public <K> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> asKeyedFn() { - // The key, an object, is never even looked at. - return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() { - @Override - public AccumT createAccumulator(K key, Context c) { - return CombineFnWithContext.this.createAccumulator(c); - } - - @Override - public AccumT addInput(K key, AccumT accumulator, InputT input, Context c) { - return CombineFnWithContext.this.addInput(accumulator, input, c); - } - - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) { - return CombineFnWithContext.this.mergeAccumulators(accumulators, c); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, Context c) { - return CombineFnWithContext.this.extractOutput(accumulator, c); - } - - @Override - public AccumT compact(K key, AccumT accumulator, Context c) { - return CombineFnWithContext.this.compact(accumulator, c); - } - - @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return CombineFnWithContext.this.getAccumulatorCoder(registry, inputCoder); - } - - @Override - public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return CombineFnWithContext.this.getDefaultOutputCoder(registry, inputCoder); - } - - @Override - public CombineFnWithContext<InputT, AccumT, OutputT> forKey(K key, Coder<K> keyCoder) { - return CombineFnWithContext.this; - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - builder.delegate(CombineFnWithContext.this); - } - }; - } - } - - /** - * A keyed combine function that has access to {@code PipelineOptions} and side inputs through - * {@code CombineWithContext.Context}. - * - * <p>See the equivalent {@link KeyedCombineFn} for details about keyed combine functions. - */ - public abstract static class KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> - extends CombineFnBase.AbstractPerKeyCombineFn<K, InputT, AccumT, OutputT> - implements RequiresContextInternal { - /** - * Returns a new, mutable accumulator value representing the accumulation of zero input values. - * - * <p>It is equivalent to {@link KeyedCombineFn#createAccumulator}, - * but it has additional access to {@code CombineWithContext.Context}. - */ - public abstract AccumT createAccumulator(K key, Context c); - /** - * Adds the given input value to the given accumulator, returning the new accumulator value. - * - * <p>It is equivalent to {@link KeyedCombineFn#addInput}, but it has additional access to - * {@code CombineWithContext.Context}. + * Applies this {@code CombineFnWithContext} to a collection of input values to produce a + * combined output value. */ - public abstract AccumT addInput(K key, AccumT accumulator, InputT value, Context c); - - /** - * Returns an accumulator representing the accumulation of all the - * input values accumulated in the merging accumulators. - * - * <p>It is equivalent to {@link KeyedCombineFn#mergeAccumulators}, - * but it has additional access to {@code CombineWithContext.Context}.. - */ - public abstract AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c); - - /** - * Returns the output value that is the result of combining all - * the input values represented by the given accumulator. - * - * <p>It is equivalent to {@link KeyedCombineFn#extractOutput}, but it has additional access to - * {@code CombineWithContext.Context}. - */ - public abstract OutputT extractOutput(K key, AccumT accumulator, Context c); - - /** - * Returns an accumulator that represents the same logical value as the - * input accumulator, but may have a more compact representation. - * - * <p>It is equivalent to {@link KeyedCombineFn#compact}, but it has additional access to - * {@code CombineWithContext.Context}. - */ - public AccumT compact(K key, AccumT accumulator, Context c) { - return accumulator; - } - - /** - * Applies this {@code KeyedCombineFnWithContext} to a key and a collection - * of input values to produce a combined output value. - */ - public OutputT apply(K key, Iterable<? extends InputT> inputs, Context c) { - AccumT accum = createAccumulator(key, c); + public OutputT apply(Iterable<? extends InputT> inputs, Context c) { + AccumT accum = createAccumulator(c); for (InputT input : inputs) { - accum = addInput(key, accum, input, c); + accum = addInput(accum, input, c); } - return extractOutput(key, accum, c); + return extractOutput(accum, c); } @Override - public CombineFnWithContext<InputT, AccumT, OutputT> forKey( - final K key, final Coder<K> keyCoder) { - return new CombineFnWithContext<InputT, AccumT, OutputT>() { - @Override - public AccumT createAccumulator(Context c) { - return KeyedCombineFnWithContext.this.createAccumulator(key, c); - } - - @Override - public AccumT addInput(AccumT accumulator, InputT input, Context c) { - return KeyedCombineFnWithContext.this.addInput(key, accumulator, input, c); - } - - @Override - public AccumT mergeAccumulators(Iterable<AccumT> accumulators, Context c) { - return KeyedCombineFnWithContext.this.mergeAccumulators(key, accumulators, c); - } - - @Override - public OutputT extractOutput(AccumT accumulator, Context c) { - return KeyedCombineFnWithContext.this.extractOutput(key, accumulator, c); - } - - @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder) - throws CannotProvideCoderException { - return KeyedCombineFnWithContext.this.getAccumulatorCoder(registry, keyCoder, inputCoder); - } - - @Override - public Coder<OutputT> getDefaultOutputCoder( - CoderRegistry registry, Coder<InputT> inputCoder) throws CannotProvideCoderException { - return KeyedCombineFnWithContext.this.getDefaultOutputCoder( - registry, keyCoder, inputCoder); - } - }; + public OutputT defaultValue() { + throw new UnsupportedOperationException( + "Override this function to provide the default value."); } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index 47be9b9..e42c0b2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -234,7 +234,7 @@ public class Top { public static <K, V, ComparatorT extends Comparator<V> & Serializable> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> perKey(int count, ComparatorT compareFn) { - return Combine.perKey(new TopCombineFn<>(count, compareFn).<K>asKeyedFn()); + return Combine.perKey(new TopCombineFn<>(count, compareFn)); } /** @@ -280,7 +280,7 @@ public class Top { public static <K, V extends Comparable<V>> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> smallestPerKey(int count) { - return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>()).<K>asKeyedFn()); + return Combine.perKey(new TopCombineFn<>(count, new Smallest<V>())); } /** @@ -326,7 +326,7 @@ public class Top { public static <K, V extends Comparable<V>> PerKey<K, V, List<V>> largestPerKey(int count) { - return Combine.perKey(new TopCombineFn<>(count, new Largest<V>()).<K>asKeyedFn()); + return Combine.perKey(new TopCombineFn<>(count, new Largest<V>())); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java index 0495ad6..b3b8918 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java @@ -205,7 +205,7 @@ public class View { * PCollection<KV<K, V>> input = ... * CombineFn<V, OutputT> yourCombineFn = ... * PCollectionView<Map<K, OutputT>> output = input - * .apply(Combine.perKey(yourCombineFn.<K>asKeyedFn())) + * .apply(Combine.perKey(yourCombineFn)) * .apply(View.<K, OutputT>asMap()); * }</pre> *
