Rename combiningValue to combining in StateSpecs
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/33259d05 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/33259d05 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/33259d05 Branch: refs/heads/master Commit: 33259d05e7ddfca7d27974394ea9e94b2b4b985c Parents: ef480a3 Author: Kenneth Knowles <k...@google.com> Authored: Mon Apr 3 11:37:50 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Apr 6 11:57:21 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/runners/core/StateTags.java | 14 +-- .../src/main/resources/beam/findbugs-filter.xml | 2 +- .../beam/sdk/transforms/GroupIntoBatches.java | 2 +- .../apache/beam/sdk/util/state/StateBinder.java | 6 +- .../apache/beam/sdk/util/state/StateSpecs.java | 90 ++++++++++---------- .../apache/beam/sdk/transforms/ParDoTest.java | 18 ++-- 6 files changed, 66 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java index 4893919..77ae8f5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -84,7 +84,7 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindCombiningValue( + CombiningState<InputT, AccumT, OutputT> bindCombining( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, @@ -94,7 +94,7 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombining( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, @@ -104,7 +104,7 @@ public class StateTags { @Override public <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, @@ -162,7 +162,7 @@ public class StateTags { combiningValue( String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn)); + new StructuredId(id), StateSpecs.combining(accumCoder, combineFn)); } /** @@ -174,7 +174,7 @@ public class StateTags { keyedCombiningValue(String id, Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn)); + new StructuredId(id), StateSpecs.keyedCombining(accumCoder, combineFn)); } /** @@ -188,7 +188,7 @@ public class StateTags { Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn)); + new StructuredId(id), StateSpecs.keyedCombiningWithContext(accumCoder, combineFn)); } /** @@ -203,7 +203,7 @@ public class StateTags { combiningValueFromInputInternal( String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn)); + new StructuredId(id), StateSpecs.combiningFromInputInternal(inputCoder, combineFn)); } /** http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 2799b00..1c0f301 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -380,7 +380,7 @@ <!--[BEAM-420] Non-transient non-serializable instance field in serializable class--> </Match> <Match> - <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningValueStateSpec"/> + <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/> <Method name="equals"/> <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/> <!--[BEAM-421] Class doesn't override equals in superclass--> http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 26f1c98..2462b1c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -132,7 +132,7 @@ public class GroupIntoBatches<K, InputT> this.allowedLateness = allowedLateness; this.batchSpec = StateSpecs.bag(inputValueCoder); this.numElementsInBatchSpec = - StateSpecs.combiningValue( + StateSpecs.combining( VarLongCoder.of(), new Combine.CombineFn<Long, Long, Long>() { http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java index 98f7238..64841fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -39,21 +39,21 @@ public interface StateBinder<K> { String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder); - <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( + <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombining( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningWithContext( String id, StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index 974e11d..30a7a6d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -62,9 +62,9 @@ public class StateSpecs { * {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue( + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining( CombineFn<InputT, AccumT, OutputT> combineFn) { - return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn); + return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn); } /** @@ -72,12 +72,12 @@ public class StateSpecs { * {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue( + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combining( Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. " - + "Consider using combiningValue(CombineFn<> combineFn) instead."); - return combiningValueInternal(accumCoder, combineFn); + + "Consider using combining(CombineFn<> combineFn) instead."); + return combiningInternal(accumCoder, combineFn); } /** @@ -85,9 +85,9 @@ public class StateSpecs { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining( KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); + return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); } /** @@ -95,12 +95,12 @@ public class StateSpecs { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombining( Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. " - + "Consider using keyedCombiningValue(KeyedCombineFn<> combineFn) instead."); - return keyedCombiningValueInternal(accumCoder, combineFn); + + "Consider using keyedCombining(KeyedCombineFn<> combineFn) instead."); + return keyedCombiningInternal(accumCoder, combineFn); } /** @@ -109,8 +109,8 @@ public class StateSpecs { */ public static <K, InputT, AccumT, OutputT> StateSpec<K, CombiningState<InputT, AccumT, OutputT>> - keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); + keyedCombiningWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { + return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); } /** @@ -119,13 +119,13 @@ public class StateSpecs { */ public static <K, InputT, AccumT, OutputT> StateSpec<K, CombiningState<InputT, AccumT, OutputT>> - keyedCombiningValueWithContext( + keyedCombiningWithContext( Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. Consider using " - + "keyedCombiningValueWithContext(KeyedCombineFnWithContext<> combineFn) instead."); - return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>( + + "keyedCombiningWithContext(KeyedCombineFnWithContext<> combineFn) instead."); + return new KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>( accumCoder, combineFn); } @@ -138,11 +138,11 @@ public class StateSpecs { */ public static <InputT, AccumT, OutputT> StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> - combiningValueFromInputInternal( + combiningFromInputInternal( Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { try { Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); - return combiningValueInternal(accumCoder, combineFn); + return combiningInternal(accumCoder, combineFn); } catch (CannotProvideCoderException e) { throw new IllegalArgumentException( "Unable to determine accumulator coder for " @@ -154,15 +154,15 @@ public class StateSpecs { } private static <InputT, AccumT, OutputT> - StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal( + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningInternal( Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); + return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); } private static <K, InputT, AccumT, OutputT> - StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal( + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningInternal( Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn); + return new KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn); } /** @@ -220,17 +220,17 @@ public class StateSpecs { public static <K, InputT, AccumT, OutputT> StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal( StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) { - if (combiningSpec instanceof KeyedCombiningValueStateSpec) { + if (combiningSpec instanceof KeyedCombiningStateSpec) { // Checked above; conversion to a bag spec depends on the provided spec being one of those // created via the factory methods in this class. @SuppressWarnings("unchecked") - KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> typedSpec = - (KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; + KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> typedSpec = + (KeyedCombiningStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; return typedSpec.asBagSpec(); - } else if (combiningSpec instanceof KeyedCombiningValueWithContextStateSpec) { + } else if (combiningSpec instanceof KeyedCombiningWithContextStateSpec) { @SuppressWarnings("unchecked") - KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec = - (KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; + KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> typedSpec = + (KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT>) combiningSpec; return typedSpec.asBagSpec(); } else { throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec); @@ -300,15 +300,15 @@ public class StateSpecs { * * <p>Includes the {@link CombineFn} and the coder for the accumulator type. */ - private static class CombiningValueStateSpec<InputT, AccumT, OutputT> - extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT> + private static class CombiningStateSpec<InputT, AccumT, OutputT> + extends KeyedCombiningStateSpec<Object, InputT, AccumT, OutputT> implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; private final CombineFn<InputT, AccumT, OutputT> combineFn; - private CombiningValueStateSpec( + private CombiningStateSpec( @Nullable Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { super(accumCoder, combineFn.asKeyedFn()); @@ -338,14 +338,14 @@ public class StateSpecs { * * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type. */ - private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> + private static class KeyedCombiningWithContextStateSpec<K, InputT, AccumT, OutputT> implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn; - protected KeyedCombiningValueWithContextStateSpec( + protected KeyedCombiningWithContextStateSpec( @Nullable Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { this.combineFn = combineFn; @@ -355,7 +355,7 @@ public class StateSpecs { @Override public CombiningState<InputT, AccumT, OutputT> bind( String id, StateBinder<? extends K> visitor) { - return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn); + return visitor.bindKeyedCombiningWithContext(id, this, accumCoder, combineFn); } @SuppressWarnings("unchecked") @@ -371,9 +371,9 @@ public class StateSpecs { @Override public void finishSpecifying() { if (accumCoder == null) { throw new IllegalStateException("Unable to infer a coder for" - + " KeyedCombiningValueWithContextState and no Coder was specified." + + " KeyedCombiningWithContextState and no Coder was specified." + " Please set a coder by either invoking" - + " StateSpecs.keyedCombiningValue(Coder<AccumT> accumCoder," + + " StateSpecs.keyedCombining(Coder<AccumT> accumCoder," + " KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn)" + " or by registering the coder in the Pipeline's CoderRegistry."); } @@ -385,12 +385,12 @@ public class StateSpecs { return true; } - if (!(obj instanceof KeyedCombiningValueWithContextStateSpec)) { + if (!(obj instanceof KeyedCombiningWithContextStateSpec)) { return false; } - KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?> that = - (KeyedCombiningValueWithContextStateSpec<?, ?, ?, ?>) obj; + KeyedCombiningWithContextStateSpec<?, ?, ?, ?> that = + (KeyedCombiningWithContextStateSpec<?, ?, ?, ?>) obj; return Objects.equals(this.accumCoder, that.accumCoder); } @@ -409,14 +409,14 @@ public class StateSpecs { * * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type. */ - private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> + private static class KeyedCombiningStateSpec<K, InputT, AccumT, OutputT> implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; - protected KeyedCombiningValueStateSpec( + protected KeyedCombiningStateSpec( @Nullable Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { this.keyedCombineFn = keyedCombineFn; @@ -430,7 +430,7 @@ public class StateSpecs { @Override public CombiningState<InputT, AccumT, OutputT> bind( String id, StateBinder<? extends K> visitor) { - return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn); + return visitor.bindKeyedCombining(id, this, getAccumCoder(), keyedCombineFn); } @SuppressWarnings("unchecked") @@ -447,7 +447,7 @@ public class StateSpecs { if (getAccumCoder() == null) { throw new IllegalStateException("Unable to infer a coder for GroupingState and no" + " Coder was specified. Please set a coder by either invoking" - + " StateSpecs.combiningValue(Coder<AccumT> accumCoder," + + " StateSpecs.combining(Coder<AccumT> accumCoder," + " CombineFn<InputT, AccumT, OutputT> combineFn)" + " or by registering the coder in the Pipeline's CoderRegistry."); } @@ -459,12 +459,12 @@ public class StateSpecs { return true; } - if (!(obj instanceof CombiningValueStateSpec)) { + if (!(obj instanceof CombiningStateSpec)) { return false; } - KeyedCombiningValueStateSpec<?, ?, ?, ?> that = - (KeyedCombiningValueStateSpec<?, ?, ?, ?>) obj; + KeyedCombiningStateSpec<?, ?, ?, ?> that = + (KeyedCombiningStateSpec<?, ?, ?, ?>) obj; return Objects.equals(this.accumCoder, that.accumCoder); } http://git-wip-us.apache.org/repos/asf/beam/blob/33259d05/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index d9b7b54..e305da1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -2087,7 +2087,7 @@ public class ParDoTest implements Serializable { StateSpecs.set(VarIntCoder.of()); @StateId(countStateId) private final StateSpec<Object, CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement @@ -2131,7 +2131,7 @@ public class ParDoTest implements Serializable { @StateId(countStateId) private final StateSpec<Object, CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement @@ -2174,7 +2174,7 @@ public class ParDoTest implements Serializable { @StateId(countStateId) private final StateSpec<Object, CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement @@ -2216,7 +2216,7 @@ public class ParDoTest implements Serializable { StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()); @StateId(countStateId) private final StateSpec<Object, CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement @@ -2262,7 +2262,7 @@ public class ParDoTest implements Serializable { private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map(); @StateId(countStateId) private final StateSpec<Object, CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement @@ -2308,7 +2308,7 @@ public class ParDoTest implements Serializable { private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map(); @StateId(countStateId) private final StateSpec<Object, CombiningState<Integer, int[], Integer>> - countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), + countState = StateSpecs.combiningFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement @@ -2354,7 +2354,7 @@ public class ParDoTest implements Serializable { private final StateSpec< Object, CombiningState<Double, CountSum<Double>, Double>> combiningState = - StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of()); + StateSpecs.combining(new Mean.CountSumCoder<Double>(), Mean.<Double>of()); @ProcessElement public void processElement( @@ -2394,7 +2394,7 @@ public class ParDoTest implements Serializable { private final StateSpec< Object, CombiningState<Integer, MyInteger, Integer>> combiningState = - StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() { + StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() { @Override public MyInteger createAccumulator() { return new MyInteger(0); @@ -2456,7 +2456,7 @@ public class ParDoTest implements Serializable { private final StateSpec< Object, CombiningState<Integer, MyInteger, Integer>> combiningState = - StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() { + StateSpecs.combining(new Combine.CombineFn<Integer, MyInteger, Integer>() { @Override public MyInteger createAccumulator() { return new MyInteger(0);