http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java index 30b302c..b16aadc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/AppliedCombineFn.java @@ -23,13 +23,12 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.values.PCollectionView; /** - * A {@link KeyedCombineFnWithContext} with a fixed accumulator coder. This is created from a - * specific application of the {@link KeyedCombineFnWithContext}. + * A {@link GlobalCombineFn} with a fixed accumulator coder. This is created from a + * specific application of the {@link GlobalCombineFn}. * * <p>Because the {@code AccumT} may reference {@code InputT}, the specific {@code Coder<AccumT>} * may depend on the {@code Coder<InputT>}. @@ -41,14 +40,14 @@ import org.apache.beam.sdk.values.PCollectionView; */ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializable { - private final PerKeyCombineFn<K, InputT, AccumT, OutputT> fn; + private final GlobalCombineFn<InputT, AccumT, OutputT> fn; private final Coder<AccumT> accumulatorCoder; private final Iterable<PCollectionView<?>> sideInputViews; private final KvCoder<K, InputT> kvCoder; private final WindowingStrategy<?, ?> windowingStrategy; - private AppliedCombineFn(PerKeyCombineFn<K, InputT, AccumT, OutputT> fn, + private AppliedCombineFn(GlobalCombineFn<InputT, AccumT, OutputT> fn, Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews, KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) { this.fn = fn; @@ -60,41 +59,41 @@ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializabl public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> withAccumulatorCoder( - PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn, + GlobalCombineFn<? super InputT, AccumT, OutputT> fn, Coder<AccumT> accumCoder) { return withAccumulatorCoder(fn, accumCoder, null, null, null); } public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> withAccumulatorCoder( - PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn, + GlobalCombineFn<? super InputT, AccumT, OutputT> fn, Coder<AccumT> accumCoder, Iterable<PCollectionView<?>> sideInputViews, KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) { // Casting down the K and InputT is safe because they're only used as inputs. @SuppressWarnings("unchecked") - PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn = - (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn); + GlobalCombineFn<InputT, AccumT, OutputT> clonedFn = + (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn); return create(clonedFn, accumCoder, sideInputViews, kvCoder, windowingStrategy); } @VisibleForTesting public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> - withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn, + withInputCoder(GlobalCombineFn<? super InputT, AccumT, OutputT> fn, CoderRegistry registry, KvCoder<K, InputT> kvCoder) { return withInputCoder(fn, registry, kvCoder, null, null); } public static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> - withInputCoder(PerKeyCombineFn<? super K, ? super InputT, AccumT, OutputT> fn, + withInputCoder(GlobalCombineFn<? super InputT, AccumT, OutputT> fn, CoderRegistry registry, KvCoder<K, InputT> kvCoder, Iterable<PCollectionView<?>> sideInputViews, WindowingStrategy<?, ?> windowingStrategy) { // Casting down the K and InputT is safe because they're only used as inputs. @SuppressWarnings("unchecked") - PerKeyCombineFn<K, InputT, AccumT, OutputT> clonedFn = - (PerKeyCombineFn<K, InputT, AccumT, OutputT>) SerializableUtils.clone(fn); + GlobalCombineFn<InputT, AccumT, OutputT> clonedFn = + (GlobalCombineFn<InputT, AccumT, OutputT>) SerializableUtils.clone(fn); try { - Coder<AccumT> accumulatorCoder = clonedFn.getAccumulatorCoder( - registry, kvCoder.getKeyCoder(), kvCoder.getValueCoder()); + Coder<AccumT> accumulatorCoder = + clonedFn.getAccumulatorCoder(registry, kvCoder.getValueCoder()); return create(clonedFn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy); } catch (CannotProvideCoderException e) { throw new IllegalStateException("Could not determine coder for accumulator", e); @@ -102,14 +101,14 @@ public class AppliedCombineFn<K, InputT, AccumT, OutputT> implements Serializabl } private static <K, InputT, AccumT, OutputT> AppliedCombineFn<K, InputT, AccumT, OutputT> create( - PerKeyCombineFn<K, InputT, AccumT, OutputT> fn, + GlobalCombineFn<InputT, AccumT, OutputT> fn, Coder<AccumT> accumulatorCoder, Iterable<PCollectionView<?>> sideInputViews, KvCoder<K, InputT> kvCoder, WindowingStrategy<?, ?> windowingStrategy) { return new AppliedCombineFn<>( fn, accumulatorCoder, sideInputViews, kvCoder, windowingStrategy); } - public PerKeyCombineFn<K, InputT, AccumT, OutputT> getFn() { + public GlobalCombineFn<InputT, AccumT, OutputT> getFn() { return fn; }
http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java index a9a0178..a394180 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java @@ -24,12 +24,9 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.state.StateContext; @@ -37,20 +34,21 @@ import org.apache.beam.sdk.util.state.StateContext; * Static utility methods that create combine function instances. */ public class CombineFnUtil { + /** - * Returns the partial application of the {@link KeyedCombineFnWithContext} to a specific - * context to produce a {@link KeyedCombineFn}. + * Returns the partial application of the {@link CombineFnWithContext} to a specific context + * to produce a {@link CombineFn}. * - * <p>The returned {@link KeyedCombineFn} cannot be serialized. + * <p>The returned {@link CombineFn} cannot be serialized. */ - public static <K, InputT, AccumT, OutputT> KeyedCombineFn<K, InputT, AccumT, OutputT> - bindContext( - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn, + public static <K, InputT, AccumT, OutputT> CombineFn<InputT, AccumT, OutputT> bindContext( + CombineFnWithContext<InputT, AccumT, OutputT> combineFn, StateContext<?> stateContext) { Context context = CombineContextFactory.createFromStateContext(stateContext); - return new NonSerializableBoundedKeyedCombineFn<>(combineFn, context); + return new NonSerializableBoundedCombineFn<>(combineFn, context); } + /** * Return a {@link CombineFnWithContext} from the given {@link GlobalCombineFn}. */ @@ -110,100 +108,55 @@ public class CombineFnUtil { } } - /** - * Return a {@link KeyedCombineFnWithContext} from the given {@link PerKeyCombineFn}. - */ - public static <K, InputT, AccumT, OutputT> KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> - toFnWithContext(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { - if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { - @SuppressWarnings("unchecked") - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext = - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn; - return keyedCombineFnWithContext; - } else { - @SuppressWarnings("unchecked") - final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn = - (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn; - return new KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>() { - @Override - public AccumT createAccumulator(K key, Context c) { - return keyedCombineFn.createAccumulator(key); - } - @Override - public AccumT addInput(K key, AccumT accumulator, InputT value, Context c) { - return keyedCombineFn.addInput(key, accumulator, value); - } - @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators, Context c) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - @Override - public OutputT extractOutput(K key, AccumT accumulator, Context c) { - return keyedCombineFn.extractOutput(key, accumulator); - } - @Override - public AccumT compact(K key, AccumT accumulator, Context c) { - return keyedCombineFn.compact(key, accumulator); - } - @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return keyedCombineFn.getAccumulatorCoder(registry, keyCoder, inputCoder); - } - @Override - public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return keyedCombineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder); - } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - keyedCombineFn.populateDisplayData(builder); - } - }; - } - } - - private static class NonSerializableBoundedKeyedCombineFn<K, InputT, AccumT, OutputT> - extends KeyedCombineFn<K, InputT, AccumT, OutputT> { - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn; + private static class NonSerializableBoundedCombineFn<InputT, AccumT, OutputT> + extends CombineFn<InputT, AccumT, OutputT> { + private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn; private final Context context; - private NonSerializableBoundedKeyedCombineFn( - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn, - Context context) { + private NonSerializableBoundedCombineFn( + CombineFnWithContext<InputT, AccumT, OutputT> combineFn, Context context) { this.combineFn = combineFn; this.context = context; } + @Override - public AccumT createAccumulator(K key) { - return combineFn.createAccumulator(key, context); + public AccumT createAccumulator() { + return combineFn.createAccumulator(context); } + @Override - public AccumT addInput(K key, AccumT accumulator, InputT value) { - return combineFn.addInput(key, accumulator, value, context); + public AccumT addInput(AccumT accumulator, InputT value) { + return combineFn.addInput(accumulator, value, context); } + @Override - public AccumT mergeAccumulators(K key, Iterable<AccumT> accumulators) { - return combineFn.mergeAccumulators(key, accumulators, context); + public AccumT mergeAccumulators(Iterable<AccumT> accumulators) { + return combineFn.mergeAccumulators(accumulators, context); } + @Override - public OutputT extractOutput(K key, AccumT accumulator) { - return combineFn.extractOutput(key, accumulator, context); + public OutputT extractOutput(AccumT accumulator) { + return combineFn.extractOutput(accumulator, context); } + @Override - public AccumT compact(K key, AccumT accumulator) { - return combineFn.compact(key, accumulator, context); + public AccumT compact(AccumT accumulator) { + return combineFn.compact(accumulator, context); } + @Override - public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return combineFn.getAccumulatorCoder(registry, keyCoder, inputCoder); + public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry, Coder<InputT> inputCoder) + throws CannotProvideCoderException { + return combineFn.getAccumulatorCoder(registry, inputCoder); } + @Override - public Coder<OutputT> getDefaultOutputCoder(CoderRegistry registry, Coder<K> keyCoder, - Coder<InputT> inputCoder) throws CannotProvideCoderException { - return combineFn.getDefaultOutputCoder(registry, keyCoder, inputCoder); + public Coder<OutputT> getDefaultOutputCoder( + CoderRegistry registry, Coder<InputT> inputCoder) + throws CannotProvideCoderException { + return combineFn.getDefaultOutputCoder(registry, inputCoder); } + @Override public void populateDisplayData(DisplayData.Builder builder) { combineFn.populateDisplayData(builder); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java index f9ab115..6fe37a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -45,20 +45,11 @@ public interface StateBinder<K> { Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn); - <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombining( - String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); - - <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext( - String id, - StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> - combineFn); + <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( + String id, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn); /** * Bind to a watermark {@link StateSpec}. http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index 8fa5bb0..a057a0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -27,8 +27,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; @@ -77,52 +76,16 @@ public class StateSpecs { } /** - * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge + * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge * multiple {@code InputT}s into a single {@code OutputT}. */ - public static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining( - KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); - } - - /** - * Create a state spec for values that use a {@link KeyedCombineFn} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining( - Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + public static <InputT, AccumT, OutputT> + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining( + Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. " - + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead."); - return keyedCombiningInternal(accumCoder, combineFn); - } - - /** - * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically - * merge multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> - keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); - } - - /** - * Create a state spec for values that use a {@link KeyedCombineFnWithContext} to automatically - * merge multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> - keyedCombiningWithContext( - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - checkArgument(accumCoder != null, - "accumCoder should not be null. Consider using " - + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead."); - return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>( - accumCoder, combineFn); + + "Consider using combining(CombineFn<> combineFn) instead."); + return combiningInternal(accumCoder, combineFn); } /** @@ -155,10 +118,10 @@ public class StateSpecs { return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); } - private static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal( - Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn); + private static <InputT, AccumT, OutputT> + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal( + Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); } /** @@ -216,17 +179,17 @@ public class StateSpecs { public static <K, InputT, AccumT, OutputT> StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal( StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) { - if (combiningSpec instanceof KeyedCombiningStateSpec) { + if (combiningSpec instanceof CombiningStateSpec) { // Checked above; conversion to a bag spec depends on the provided spec being one of those // created via the factory methods in this class. @SuppressWarnings("unchecked") - KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec = - (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; + CombiningStateSpec<InputT, AccumT, OutputT> typedSpec = + (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec; return typedSpec.asBagSpec(); - } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) { + } else if (combiningSpec instanceof CombiningWithContextStateSpec) { @SuppressWarnings("unchecked") - KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec = - (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; + CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec = + (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec; return typedSpec.asBagSpec(); } else { throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec); @@ -297,7 +260,6 @@ public class StateSpecs { * <p>Includes the {@link CombineFn} and the coder for the accumulator type. */ private static class CombiningStateSpec<InputT, AccumT, OutputT> - extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT> implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> { @Nullable @@ -307,14 +269,14 @@ public class StateSpecs { private CombiningStateSpec( @Nullable Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - super(accumCoder, combineFn.asKeyedFn()); this.combineFn = combineFn; this.accumCoder = accumCoder; } @Override - protected Coder<AccumT> getAccumCoder() { - return accumCoder; + public CombiningState<InputT, AccumT, OutputT> bind( + String id, StateBinder<? extends Object> visitor) { + return visitor.bindCombining(id, this, accumCoder, combineFn); } @SuppressWarnings("unchecked") @@ -326,51 +288,14 @@ public class StateSpecs { } } } - } - - /** - * A specification for a state cell that is combined according to a - * {@link KeyedCombineFnWithContext}. - * - * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type. - */ - private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> - implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> { - - @Nullable - private Coder<AccumT> accumCoder; - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn; - - protected KeyedCombiningWithContextStateSpec( - @Nullable Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - this.combineFn = combineFn; - this.accumCoder = accumCoder; - } - - @Override - public CombiningState<InputT, AccumT, OutputT> bind( - String id, StateBinder<? extends K> visitor) { - return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn); - } - - @SuppressWarnings("unchecked") - @Override - public void offerCoders(Coder[] coders) { - if (this.accumCoder == null) { - if (coders[2] != null) { - this.accumCoder = (Coder<AccumT>) coders[2]; - } - } - } @Override public void finishSpecifying() { if (accumCoder == null) { throw new IllegalStateException("Unable to infer a coder for" - + " KeyedCombiningWithContextState and no Coder was specified." + + " CombiningState and no Coder was specified." + " Please set a coder by either invoking" - + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder," - + " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)" + + " StateSpecs.combining(Coder<AccumT> accumCoder," + + " CombineFn<InputT, AccumT, OutputT> combineFn)" + " or by registering the coder in the Pipeline's CoderRegistry."); } } @@ -381,12 +306,12 @@ public class StateSpecs { return true; } - if (!(obj instanceof KeyedCombiningWithContextStateSpec)) { + if (!(obj instanceof CombiningStateSpec)) { return false; } - KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that = - (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj; + CombiningStateSpec<?, ?, ?> that = + (CombiningStateSpec<?, ?, ?>) obj; return Objects.equals(this.accumCoder, that.accumCoder); } @@ -401,32 +326,28 @@ public class StateSpecs { } /** - * A specification for a state cell that is combined according to a {@link KeyedCombineFn}. + * A specification for a state cell that is combined according to a {@link + * CombineFnWithContext}. * - * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type. + * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type. */ - private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> - implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> { + private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT> + implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> { - @Nullable - private Coder<AccumT> accumCoder; - private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; + @Nullable private Coder<AccumT> accumCoder; + private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn; - protected KeyedCombiningStateSpec( + private CombiningWithContextStateSpec( @Nullable Coder<AccumT> accumCoder, - KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + this.combineFn = combineFn; this.accumCoder = accumCoder; } - protected Coder<AccumT> getAccumCoder() { - return accumCoder; - } - @Override public CombiningState<InputT, AccumT, OutputT> bind( - String id, StateBinder<? extends K> visitor) { - return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn); + String id, StateBinder<? extends Object> visitor) { + return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn); } @SuppressWarnings("unchecked") @@ -439,13 +360,16 @@ public class StateSpecs { } } - @Override public void finishSpecifying() { - if (getAccumCoder() == null) { - throw new IllegalStateException("Unable to infer a coder for GroupingState and no" - + " Coder was specified. Please set a coder by either invoking" - + " StateSpecs.combining(Coder<AccumT> accumCoder," - + " CombineFn<InputT, AccumT, OutputT> combineFn)" - + " or by registering the coder in the Pipeline's CoderRegistry."); + @Override + public void finishSpecifying() { + if (accumCoder == null) { + throw new IllegalStateException( + "Unable to infer a coder for" + + " CombiningWithContextState and no Coder was specified." + + " Please set a coder by either invoking" + + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder," + + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)" + + " or by registering the coder in the Pipeline's CoderRegistry."); } } @@ -455,12 +379,11 @@ public class StateSpecs { return true; } - if (!(obj instanceof CombiningStateSpec)) { + if (!(obj instanceof CombiningWithContextStateSpec)) { return false; } - KeyedCombiningStateSpec<?, ?, ?, ?> that = - (KeyedCombiningStateSpec<?, ?, ?, ?>) obj; + CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj; return Objects.equals(this.accumCoder, that.accumCoder); } http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java index 13c5f16..dcb8fdc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineFnsTest.java @@ -40,7 +40,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.Combine.BinaryCombineFn; import org.apache.beam.sdk.transforms.CombineFns.CoCombineResult; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -79,7 +79,7 @@ public class CombineFnsTest { expectedException.expectMessage("it is already present in the composition"); TupleTag<Integer> tag = new TupleTag<Integer>(); - CombineFns.composeKeyed() + CombineFns.compose() .with(new GetIntegerFunction(), Max.ofIntegers(), tag) .with(new GetIntegerFunction(), Min.ofIntegers(), tag); } @@ -93,23 +93,6 @@ public class CombineFnsTest { CombineFns.compose() .with( new GetUserStringFunction(), - new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()), - tag) - .with( - new GetUserStringFunction(), - new ConcatStringWithContext(null /* view */).forKey("G", StringUtf8Coder.of()), - tag); - } - - @Test - public void testDuplicatedTagsWithContextKeyed() { - expectedException.expect(IllegalArgumentException.class); - expectedException.expectMessage("it is already present in the composition"); - - TupleTag<UserString> tag = new TupleTag<UserString>(); - CombineFns.composeKeyed() - .with( - new GetUserStringFunction(), new ConcatStringWithContext(null /* view */), tag) .with( @@ -153,17 +136,15 @@ public class CombineFnsTest { .apply( "ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); - PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput - .apply(Combine.perKey(CombineFns.composeKeyed() - .with( - new GetIntegerFunction(), - Max.ofIntegers().<String>asKeyedFn(), - maxIntTag) - .with( - new GetUserStringFunction(), - new ConcatString().<String>asKeyedFn(), - concatStringTag))) - .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); + PCollection<KV<String, KV<Integer, String>>> combinePerKey = + perKeyInput + .apply( + Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey( + CombineFns.compose() + .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag) + .with(new GetUserStringFunction(), new ConcatString(), concatStringTag))) + .apply( + "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); PAssert.that(combineGlobally).containsInAnyOrder( KV.of("global", KV.of(13, "111134"))); PAssert.that(combinePerKey).containsInAnyOrder( @@ -205,7 +186,7 @@ public class CombineFnsTest { maxIntTag) .with( new GetUserStringFunction(), - new ConcatStringWithContext(view).forKey("G", StringUtf8Coder.of()), + new ConcatStringWithContext(view), concatStringTag)) .withoutDefaults() .withSideInputs(ImmutableList.of(view))) @@ -213,23 +194,24 @@ public class CombineFnsTest { .apply( "ExtractGloballyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); - PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput - .apply(Combine.perKey(CombineFns.composeKeyed() - .with( - new GetIntegerFunction(), - Max.ofIntegers().<String>asKeyedFn(), - maxIntTag) - .with( - new GetUserStringFunction(), - new ConcatStringWithContext(view), - concatStringTag)) - .withSideInputs(ImmutableList.of(view))) - .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); + PCollection<KV<String, KV<Integer, String>>> combinePerKey = + perKeyInput + .apply( + Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey( + CombineFns.compose() + .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag) + .with( + new GetUserStringFunction(), + new ConcatStringWithContext(view), + concatStringTag)) + .withSideInputs(ImmutableList.of(view))) + .apply( + "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); PAssert.that(combineGlobally).containsInAnyOrder( - KV.of("global", KV.of(13, "111134GI"))); + KV.of("global", KV.of(13, "111134I"))); PAssert.that(combinePerKey).containsInAnyOrder( - KV.of("a", KV.of(4, "114Ia")), - KV.of("b", KV.of(13, "113Ib"))); + KV.of("a", KV.of(4, "114I")), + KV.of("b", KV.of(13, "113I"))); p.run(); } @@ -256,17 +238,16 @@ public class CombineFnsTest { TupleTag<Integer> maxIntTag = new TupleTag<Integer>(); TupleTag<UserString> concatStringTag = new TupleTag<UserString>(); - PCollection<KV<String, KV<Integer, String>>> combinePerKey = perKeyInput - .apply(Combine.perKey(CombineFns.composeKeyed() - .with( - new GetIntegerFunction(), - Max.ofIntegers().<String>asKeyedFn(), - maxIntTag) - .with( - new GetUserStringFunction(), - new OutputNullString().<String>asKeyedFn(), - concatStringTag))) - .apply("ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); + PCollection<KV<String, KV<Integer, String>>> combinePerKey = + perKeyInput + .apply( + Combine.<String, KV<Integer, UserString>, CoCombineResult>perKey( + CombineFns.compose() + .with(new GetIntegerFunction(), Max.ofIntegers(), maxIntTag) + .with( + new GetUserStringFunction(), new OutputNullString(), concatStringTag))) + .apply( + "ExtractPerKeyResult", ParDo.of(new ExtractResultDoFn(maxIntTag, concatStringTag))); PAssert.that(combinePerKey).containsInAnyOrder( KV.of("a", KV.of(4, (String) null)), KV.of("b", KV.of(13, (String) null))); @@ -407,7 +388,7 @@ public class CombineFnsTest { } private static class ConcatStringWithContext - extends KeyedCombineFnWithContext<String, UserString, UserString, UserString> { + extends CombineFnWithContext<UserString, UserString, UserString> { private final PCollectionView<String> view; private ConcatStringWithContext(PCollectionView<String> view) { @@ -415,22 +396,22 @@ public class CombineFnsTest { } @Override - public UserString createAccumulator(String key, CombineWithContext.Context c) { - return UserString.of(key + c.sideInput(view)); + public UserString createAccumulator(CombineWithContext.Context c) { + return UserString.of(c.sideInput(view)); } @Override public UserString addInput( - String key, UserString accumulator, UserString input, CombineWithContext.Context c) { - assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view))); + UserString accumulator, UserString input, CombineWithContext.Context c) { + assertThat(accumulator.strValue, Matchers.startsWith(c.sideInput(view))); accumulator.strValue += input.strValue; return accumulator; } @Override public UserString mergeAccumulators( - String key, Iterable<UserString> accumulators, CombineWithContext.Context c) { - String keyPrefix = key + c.sideInput(view); + Iterable<UserString> accumulators, CombineWithContext.Context c) { + String keyPrefix = c.sideInput(view); String all = keyPrefix; for (UserString accumulator : accumulators) { assertThat(accumulator.strValue, Matchers.startsWith(keyPrefix)); @@ -441,9 +422,8 @@ public class CombineFnsTest { } @Override - public UserString extractOutput( - String key, UserString accumulator, CombineWithContext.Context c) { - assertThat(accumulator.strValue, Matchers.startsWith(key + c.sideInput(view))); + public UserString extractOutput(UserString accumulator, CombineWithContext.Context c) { + assertThat(accumulator.strValue, Matchers.startsWith(c.sideInput(view))); char[] chars = accumulator.strValue.toCharArray(); Arrays.sort(chars); return UserString.of(new String(chars)); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index a5f3df2..82c2504 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -18,7 +18,6 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import static org.apache.beam.sdk.TestUtils.checkCombineFn; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; @@ -58,9 +57,10 @@ import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.ValidatesRunner; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.CombineTest.TestCombineFn.Accumulator; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.transforms.windowing.AfterPane; @@ -127,7 +127,7 @@ public class CombineTest implements Serializable { // Java 8 will infer. PCollection<KV<String, String>> sumPerKey = input - .apply(Combine.perKey(new TestKeyedCombineFn())); + .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(globalSum); PAssert.that(sumPerKey).containsInAnyOrder(perKeyCombines); @@ -147,13 +147,13 @@ public class CombineTest implements Serializable { PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton()); // Java 8 will infer. - PCollection<KV<String, String>> combinePerKey = perKeyInput - .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView)) - .withSideInputs(Arrays.asList(globallySumView))); + PCollection<KV<String, String>> combinePerKey = + perKeyInput.apply( + Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) + .withSideInputs(Arrays.asList(globallySumView))); PCollection<String> combineGlobally = globallyInput - .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView) - .forKey("G", StringUtf8Coder.of())) + .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() .withSideInputs(Arrays.asList(globallySumView))); @@ -168,7 +168,7 @@ public class CombineTest implements Serializable { @Category(ValidatesRunner.class) @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombine() { - runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114a"), KV.of("b", "113b"))); + runTestSimpleCombine(TABLE, 20, Arrays.asList(KV.of("a", "114"), KV.of("b", "113"))); } @Test @@ -176,8 +176,8 @@ public class CombineTest implements Serializable { @SuppressWarnings({"rawtypes", "unchecked"}) public void testSimpleCombineWithContext() { runTestSimpleCombineWithContext(TABLE, 20, - Arrays.asList(KV.of("a", "01124a"), KV.of("b", "01123b")), - new String[] {"01111234G"}); + Arrays.asList(KV.of("a", "01124"), KV.of("b", "01123")), + new String[] {"01111234"}); } @Test @@ -260,14 +260,14 @@ public class CombineTest implements Serializable { .apply(Combine.globally(new SumInts()).withoutDefaults()); PCollection<KV<String, String>> sumPerKey = input - .apply(Combine.perKey(new TestKeyedCombineFn())); + .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(2, 5, 13); PAssert.that(sumPerKey).containsInAnyOrder( - KV.of("a", "11a"), - KV.of("a", "4a"), - KV.of("b", "1b"), - KV.of("b", "13b")); + KV.of("a", "11"), + KV.of("a", "4"), + KV.of("b", "1"), + KV.of("b", "13")); pipeline.run(); } @@ -286,23 +286,23 @@ public class CombineTest implements Serializable { PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton()); - PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput - .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView)) - .withSideInputs(Arrays.asList(globallySumView))); + PCollection<KV<String, String>> combinePerKeyWithContext = + perKeyInput.apply( + Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) + .withSideInputs(Arrays.asList(globallySumView))); PCollection<String> combineGloballyWithContext = globallyInput - .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView) - .forKey("G", StringUtf8Coder.of())) + .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() .withSideInputs(Arrays.asList(globallySumView))); PAssert.that(sum).containsInAnyOrder(2, 5, 13); PAssert.that(combinePerKeyWithContext).containsInAnyOrder( - KV.of("a", "112a"), - KV.of("a", "45a"), - KV.of("b", "15b"), - KV.of("b", "1133b")); - PAssert.that(combineGloballyWithContext).containsInAnyOrder("112G", "145G", "1133G"); + KV.of("a", "112"), + KV.of("a", "45"), + KV.of("b", "15"), + KV.of("b", "1133")); + PAssert.that(combineGloballyWithContext).containsInAnyOrder("112", "145", "1133"); pipeline.run(); } @@ -321,28 +321,28 @@ public class CombineTest implements Serializable { PCollectionView<Integer> globallySumView = sum.apply(View.<Integer>asSingleton()); - PCollection<KV<String, String>> combinePerKeyWithContext = perKeyInput - .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallySumView)) - .withSideInputs(Arrays.asList(globallySumView))); + PCollection<KV<String, String>> combinePerKeyWithContext = + perKeyInput.apply( + Combine.<String, Integer, String>perKey(new TestCombineFnWithContext(globallySumView)) + .withSideInputs(Arrays.asList(globallySumView))); PCollection<String> combineGloballyWithContext = globallyInput - .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallySumView) - .forKey("G", StringUtf8Coder.of())) + .apply(Combine.globally(new TestCombineFnWithContext(globallySumView)) .withoutDefaults() .withSideInputs(Arrays.asList(globallySumView))); PAssert.that(sum).containsInAnyOrder(1, 2, 1, 4, 5, 14, 13); PAssert.that(combinePerKeyWithContext).containsInAnyOrder( - KV.of("a", "11a"), - KV.of("a", "112a"), - KV.of("a", "11a"), - KV.of("a", "44a"), - KV.of("a", "45a"), - KV.of("b", "15b"), - KV.of("b", "11134b"), - KV.of("b", "1133b")); + KV.of("a", "11"), + KV.of("a", "112"), + KV.of("a", "11"), + KV.of("a", "44"), + KV.of("a", "45"), + KV.of("b", "15"), + KV.of("b", "11134"), + KV.of("b", "1133")); PAssert.that(combineGloballyWithContext).containsInAnyOrder( - "11G", "112G", "11G", "44G", "145G", "11134G", "1133G"); + "11", "112", "11", "44", "145", "11134", "1133"); pipeline.run(); } @@ -392,13 +392,13 @@ public class CombineTest implements Serializable { .apply(Combine.globally(new SumInts()).withoutDefaults()); PCollection<KV<String, String>> sumPerKey = input - .apply(Combine.perKey(new TestKeyedCombineFn())); + .apply(Combine.<String, Integer, String>perKey(new TestCombineFn())); PAssert.that(sum).containsInAnyOrder(7, 13); PAssert.that(sumPerKey).containsInAnyOrder( - KV.of("a", "114a"), - KV.of("b", "1b"), - KV.of("b", "13b")); + KV.of("a", "114"), + KV.of("b", "1"), + KV.of("b", "13")); pipeline.run(); } @@ -419,26 +419,29 @@ public class CombineTest implements Serializable { PCollectionView<Integer> globallyFixedWindowsView = fixedWindowsSum.apply(View.<Integer>asSingleton().withDefaultValue(0)); - PCollection<KV<String, String>> sessionsCombinePerKey = perKeyInput - .apply("PerKey Input Sessions", - Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5)))) - .apply(Combine.perKey(new TestKeyedCombineFnWithContext(globallyFixedWindowsView)) - .withSideInputs(Arrays.asList(globallyFixedWindowsView))); + PCollection<KV<String, String>> sessionsCombinePerKey = + perKeyInput + .apply( + "PerKey Input Sessions", + Window.<KV<String, Integer>>into(Sessions.withGapDuration(Duration.millis(5)))) + .apply( + Combine.<String, Integer, String>perKey( + new TestCombineFnWithContext(globallyFixedWindowsView)) + .withSideInputs(Arrays.asList(globallyFixedWindowsView))); PCollection<String> sessionsCombineGlobally = globallyInput .apply("Globally Input Sessions", Window.<Integer>into(Sessions.withGapDuration(Duration.millis(5)))) - .apply(Combine.globally(new TestKeyedCombineFnWithContext(globallyFixedWindowsView) - .forKey("G", StringUtf8Coder.of())) + .apply(Combine.globally(new TestCombineFnWithContext(globallyFixedWindowsView)) .withoutDefaults() .withSideInputs(Arrays.asList(globallyFixedWindowsView))); PAssert.that(fixedWindowsSum).containsInAnyOrder(2, 4, 1, 13); PAssert.that(sessionsCombinePerKey).containsInAnyOrder( - KV.of("a", "1114a"), - KV.of("b", "11b"), - KV.of("b", "013b")); - PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114G", "013G"); + KV.of("a", "1114"), + KV.of("b", "11"), + KV.of("b", "013")); + PAssert.that(sessionsCombineGlobally).containsInAnyOrder("11114", "013"); pipeline.run(); } @@ -502,16 +505,15 @@ public class CombineTest implements Serializable { public void testHotKeyCombining() { PCollection<KV<String, Integer>> input = copy(createInput(pipeline, TABLE), 10); - KeyedCombineFn<String, Integer, ?, Double> mean = - new MeanInts().<String>asKeyedFn(); + CombineFn<Integer, ?, Double> mean = new MeanInts(); PCollection<KV<String, Double>> coldMean = input.apply("ColdMean", - Combine.perKey(mean).withHotKeyFanout(0)); + Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(0)); PCollection<KV<String, Double>> warmMean = input.apply("WarmMean", - Combine.perKey(mean).withHotKeyFanout(hotKeyFanout)); + Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(hotKeyFanout)); PCollection<KV<String, Double>> hotMean = input.apply("HotMean", - Combine.perKey(mean).withHotKeyFanout(5)); + Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(5)); PCollection<KV<String, Double>> splitMean = input.apply("SplitMean", - Combine.perKey(mean).withHotKeyFanout(splitHotKeyFanout)); + Combine.<String, Integer, Double>perKey(mean).withHotKeyFanout(splitHotKeyFanout)); List<KV<String, Double>> expected = Arrays.asList(KV.of("a", 2.0), KV.of("b", 7.0)); PAssert.that(coldMean).containsInAnyOrder(expected); @@ -678,10 +680,10 @@ public class CombineTest implements Serializable { assertEquals( "Combine.GloballyAsSingletonView", Combine.globally(new SumInts()).asSingletonView().getName()); - assertEquals("Combine.perKey(TestKeyed)", Combine.perKey(new TestKeyedCombineFn()).getName()); + assertEquals("Combine.perKey(Test)", Combine.perKey(new TestCombineFn()).getName()); assertEquals( - "Combine.perKeyWithFanout(TestKeyed)", - Combine.perKey(new TestKeyedCombineFn()).withHotKeyFanout(10).getName()); + "Combine.perKeyWithFanout(Test)", + Combine.perKey(new TestCombineFn()).withHotKeyFanout(10).getName()); } @Test @@ -908,14 +910,10 @@ public class CombineTest implements Serializable { } /** - * A KeyedCombineFn that exercises the full generality of [Keyed]CombineFn. - * - * <p>The net result of applying this CombineFn is a sorted list of all - * characters occurring in the key and the decimal representations of - * each value. + * A {@link CombineFn} that results in a sorted list of all characters occurring in the key and + * the decimal representations of each value. */ - public static class TestKeyedCombineFn - extends KeyedCombineFn<String, Integer, TestKeyedCombineFn.Accumulator, String> { + public static class TestCombineFn extends CombineFn<Integer, TestCombineFn.Accumulator, String> { // Not serializable. static class Accumulator { @@ -943,20 +941,18 @@ public class CombineTest implements Serializable { @Override public Coder<Accumulator> getAccumulatorCoder( - CoderRegistry registry, Coder<String> keyCoder, Coder<Integer> inputCoder) { + CoderRegistry registry, Coder<Integer> inputCoder) { return Accumulator.getCoder(); } @Override - public Accumulator createAccumulator(String key) { - return new Accumulator(key); + public Accumulator createAccumulator() { + return new Accumulator(""); } @Override - public Accumulator addInput(String key, Accumulator accumulator, Integer value) { - checkNotNull(key); + public Accumulator addInput(Accumulator accumulator, Integer value) { try { - assertThat(accumulator.value, Matchers.startsWith(key)); return new Accumulator(accumulator.value + String.valueOf(value)); } finally { accumulator.value = "cleared in addInput"; @@ -964,19 +960,17 @@ public class CombineTest implements Serializable { } @Override - public Accumulator mergeAccumulators(String key, Iterable<Accumulator> accumulators) { - String all = key; + public Accumulator mergeAccumulators(Iterable<Accumulator> accumulators) { + String all = ""; for (Accumulator accumulator : accumulators) { - assertThat(accumulator.value, Matchers.startsWith(key)); - all += accumulator.value.substring(key.length()); + all += accumulator.value; accumulator.value = "cleared in mergeAccumulators"; } return new Accumulator(all); } @Override - public String extractOutput(String key, Accumulator accumulator) { - assertThat(accumulator.value, Matchers.startsWith(key)); + public String extractOutput(Accumulator accumulator) { char[] chars = accumulator.value.toCharArray(); Arrays.sort(chars); return new String(chars); @@ -984,38 +978,33 @@ public class CombineTest implements Serializable { } /** - * A {@link KeyedCombineFnWithContext} that exercises the full generality - * of [Keyed]CombineFnWithContext. - * - * <p>The net result of applying this CombineFn is a sorted list of all - * characters occurring in the key and the decimal representations of - * main and side inputs values. + * A {@link CombineFnWithContext} that produces a sorted list of all characters occurring in the + * key and the decimal representations of main and side inputs values. */ - public class TestKeyedCombineFnWithContext - extends KeyedCombineFnWithContext<String, Integer, TestKeyedCombineFn.Accumulator, String> { + public class TestCombineFnWithContext extends CombineFnWithContext<Integer, Accumulator, String> { private final PCollectionView<Integer> view; - public TestKeyedCombineFnWithContext(PCollectionView<Integer> view) { + public TestCombineFnWithContext(PCollectionView<Integer> view) { this.view = view; } @Override - public Coder<TestKeyedCombineFn.Accumulator> getAccumulatorCoder( - CoderRegistry registry, Coder<String> keyCoder, Coder<Integer> inputCoder) { - return TestKeyedCombineFn.Accumulator.getCoder(); + public Coder<TestCombineFn.Accumulator> getAccumulatorCoder( + CoderRegistry registry, Coder<Integer> inputCoder) { + return TestCombineFn.Accumulator.getCoder(); } @Override - public TestKeyedCombineFn.Accumulator createAccumulator(String key, Context c) { - return new TestKeyedCombineFn.Accumulator(key + c.sideInput(view).toString()); + public TestCombineFn.Accumulator createAccumulator(Context c) { + return new TestCombineFn.Accumulator(c.sideInput(view).toString()); } @Override - public TestKeyedCombineFn.Accumulator addInput( - String key, TestKeyedCombineFn.Accumulator accumulator, Integer value, Context c) { + public TestCombineFn.Accumulator addInput( + TestCombineFn.Accumulator accumulator, Integer value, Context c) { try { - assertThat(accumulator.value, Matchers.startsWith(key + c.sideInput(view).toString())); - return new TestKeyedCombineFn.Accumulator(accumulator.value + String.valueOf(value)); + assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString())); + return new TestCombineFn.Accumulator(accumulator.value + String.valueOf(value)); } finally { accumulator.value = "cleared in addInput"; } @@ -1023,21 +1012,21 @@ public class CombineTest implements Serializable { } @Override - public TestKeyedCombineFn.Accumulator mergeAccumulators( - String key, Iterable<TestKeyedCombineFn.Accumulator> accumulators, Context c) { - String keyPrefix = key + c.sideInput(view).toString(); - String all = keyPrefix; - for (TestKeyedCombineFn.Accumulator accumulator : accumulators) { - assertThat(accumulator.value, Matchers.startsWith(keyPrefix)); - all += accumulator.value.substring(keyPrefix.length()); + public TestCombineFn.Accumulator mergeAccumulators( + Iterable<TestCombineFn.Accumulator> accumulators, Context c) { + String prefix = c.sideInput(view).toString(); + String all = prefix; + for (TestCombineFn.Accumulator accumulator : accumulators) { + assertThat(accumulator.value, Matchers.startsWith(prefix)); + all += accumulator.value.substring(prefix.length()); accumulator.value = "cleared in mergeAccumulators"; } - return new TestKeyedCombineFn.Accumulator(all); + return new TestCombineFn.Accumulator(all); } @Override - public String extractOutput(String key, TestKeyedCombineFn.Accumulator accumulator, Context c) { - assertThat(accumulator.value, Matchers.startsWith(key + c.sideInput(view).toString())); + public String extractOutput(TestCombineFn.Accumulator accumulator, Context c) { + assertThat(accumulator.value, Matchers.startsWith(c.sideInput(view).toString())); char[] chars = accumulator.value.toCharArray(); Arrays.sort(chars); return new String(chars); http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 1a976f2..52b2f5e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -2501,7 +2501,7 @@ public class ParDoTest implements Serializable { }; thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified."); + thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified."); pipeline .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7))) http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java index 867fe0a..b3fa2c6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ViewTest.java @@ -1122,7 +1122,7 @@ public class ViewTest implements Serializable { final PCollectionView<Map<String, Integer>> view = pipeline.apply("CreateSideInput", Create.of(KV.of("a", 1), KV.of("a", 20), KV.of("b", 3))) - .apply("SumIntegers", Combine.perKey(Sum.ofIntegers().<String>asKeyedFn())) + .apply("SumIntegers", Combine.<String, Integer, Integer>perKey(Sum.ofIntegers())) .apply(View.<String, Integer>asMap()); PCollection<KV<String, Integer>> output = http://git-wip-us.apache.org/repos/asf/beam/blob/7e04924e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java index 36a90e9..798e8dc 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/CombineFnUtilTest.java @@ -29,7 +29,6 @@ import java.io.ObjectOutputStream; import java.util.List; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.util.state.StateContexts; import org.junit.Before; @@ -48,12 +47,12 @@ public class CombineFnUtilTest { @Rule public ExpectedException expectedException = ExpectedException.none(); - KeyedCombineFnWithContext<Integer, Integer, Integer, Integer> mockCombineFn; + CombineFnWithContext<Integer, Integer, Integer> mockCombineFn; @SuppressWarnings("unchecked") @Before public void setUp() { - mockCombineFn = mock(KeyedCombineFnWithContext.class, withSettings().serializable()); + mockCombineFn = mock(CombineFnWithContext.class, withSettings().serializable()); } @Test @@ -72,10 +71,6 @@ public class CombineFnUtilTest { CombineFnWithContext<Integer, int[], Integer> fnWithContext = CombineFnUtil.toFnWithContext(Sum.ofIntegers()); assertTrue(fnWithContext == CombineFnUtil.toFnWithContext(fnWithContext)); - - KeyedCombineFnWithContext<Object, Integer, int[], Integer> keyedFnWithContext = - CombineFnUtil.toFnWithContext(Sum.ofIntegers().asKeyedFn()); - assertTrue(keyedFnWithContext == CombineFnUtil.toFnWithContext(keyedFnWithContext)); } @Test @@ -89,14 +84,5 @@ public class CombineFnUtilTest { accum = fnWithContext.addInput(accum, i, nullContext); } assertEquals(10, fnWithContext.extractOutput(accum, nullContext).intValue()); - - KeyedCombineFnWithContext<String, Integer, int[], Integer> keyedFnWithContext = - CombineFnUtil.toFnWithContext(Sum.ofIntegers().<String>asKeyedFn()); - String key = "key"; - accum = keyedFnWithContext.createAccumulator(key, nullContext); - for (Integer i : inputs) { - accum = keyedFnWithContext.addInput(key, accum, i, nullContext); - } - assertEquals(10, keyedFnWithContext.extractOutput(key, accum, nullContext).intValue()); } }
