http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index 095ca2a..26f1c98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -29,8 +29,8 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; import org.apache.beam.sdk.util.state.ValueState; @@ -115,7 +115,7 @@ public class GroupIntoBatches<K, InputT> private final StateSpec<Object, BagState<InputT>> batchSpec; @StateId(NUM_ELEMENTS_IN_BATCH_ID) - private final StateSpec<Object, AccumulatorCombiningState<Long, Long, Long>> + private final StateSpec<Object, CombiningState<Long, Long, Long>> numElementsInBatchSpec; @StateId(KEY_ID) @@ -171,7 +171,7 @@ public class GroupIntoBatches<K, InputT> @TimerId(END_OF_WINDOW_ID) Timer timer, @StateId(BATCH_ID) BagState<InputT> batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) - AccumulatorCombiningState<Long, Long, Long> numElementsInBatch, + CombiningState<Long, Long, Long> numElementsInBatch, @StateId(KEY_ID) ValueState<K> key, ProcessContext c, BoundedWindow window) { @@ -203,7 +203,7 @@ public class GroupIntoBatches<K, InputT> @StateId(KEY_ID) ValueState<K> key, @StateId(BATCH_ID) BagState<InputT> batch, @StateId(NUM_ELEMENTS_IN_BATCH_ID) - AccumulatorCombiningState<Long, Long, Long> numElementsInBatch, + CombiningState<Long, Long, Long> numElementsInBatch, BoundedWindow window) { LOGGER.debug( "*** END OF WINDOW *** for timer timestamp {} in windows {}", @@ -215,7 +215,7 @@ public class GroupIntoBatches<K, InputT> Context c, ValueState<K> key, BagState<InputT> batch, - AccumulatorCombiningState<Long, Long, Long> numElementsInBatch) { + CombiningState<Long, Long, Long> numElementsInBatch) { Iterable<InputT> values = batch.read(); // when the timer fires, batch state might be empty if (Iterables.size(values) > 0) {
http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java deleted file mode 100644 index 6b120f9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * State for a single value that is managed by a {@link CombineFn}. This is an internal extension - * to {@link GroupingState} that includes the {@code AccumT} type. - * - * @param <InputT> the type of values added to the state - * @param <AccumT> the type of accumulator - * @param <OutputT> the type of value extracted from the state - */ -public interface AccumulatorCombiningState<InputT, AccumT, OutputT> - extends GroupingState<InputT, OutputT> { - - /** - * Read the merged accumulator for this combining value. It is implied that reading the - * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for - * this. - */ - AccumT getAccum(); - - /** - * Add an accumulator to this combining value. Depending on implementation this may immediately - * merge it with the previous accumulator, or may buffer this accumulator for a future merge. - */ - void addAccum(AccumT accum); - - /** - * Merge the given accumulators according to the underlying combiner. - */ - AccumT mergeAccumulators(Iterable<AccumT> accumulators); - - @Override - AccumulatorCombiningState<InputT, AccumT, OutputT> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java new file mode 100644 index 0000000..80e4dc9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * State for a single value that is managed by a {@link CombineFn}. This is an internal extension + * to {@link GroupingState} that includes the {@code AccumT} type. + * + * @param <InputT> the type of values added to the state + * @param <AccumT> the type of accumulator + * @param <OutputT> the type of value extracted from the state + */ +public interface CombiningState<InputT, AccumT, OutputT> + extends GroupingState<InputT, OutputT> { + + /** + * Read the merged accumulator for this combining value. It is implied that reading the + * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for + * this. + */ + AccumT getAccum(); + + /** + * Add an accumulator to this combining value. Depending on implementation this may immediately + * merge it with the previous accumulator, or may buffer this accumulator for a future merge. + */ + void addAccum(AccumT accum); + + /** + * Merge the given accumulators according to the underlying combiner. + */ + AccumT mergeAccumulators(Iterable<AccumT> accumulators); + + @Override + CombiningState<InputT, AccumT, OutputT> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java index fbfb475..98f7238 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java @@ -39,23 +39,23 @@ public interface StateBinder<K> { String id, StateSpec<? super K, MapState<KeyT, ValueT>> spec, Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder); - <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningValue( String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.CombineFn<InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> spec, Coder<AccumT> accumCoder, CombineWithContext.KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn); http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index db4b7de..974e11d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -62,7 +62,7 @@ public class StateSpecs { * {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue( + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue( CombineFn<InputT, AccumT, OutputT> combineFn) { return new CombiningValueStateSpec<InputT, AccumT, OutputT>(null, combineFn); } @@ -72,7 +72,7 @@ public class StateSpecs { * {@code InputT}s into a single {@code OutputT}. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue( + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValue( Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. " @@ -85,7 +85,7 @@ public class StateSpecs { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); } @@ -95,7 +95,7 @@ public class StateSpecs { * multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValue( Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { checkArgument(accumCoder != null, "accumCoder should not be null. " @@ -108,7 +108,7 @@ public class StateSpecs { * merge multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueWithContext(KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { return new KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT>(null, combineFn); } @@ -118,7 +118,7 @@ public class StateSpecs { * merge multiple {@code InputT}s into a single {@code OutputT}. */ public static <K, InputT, AccumT, OutputT> - StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueWithContext( Coder<AccumT> accumCoder, KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { @@ -137,7 +137,7 @@ public class StateSpecs { * only be used to initialize static values. */ public static <InputT, AccumT, OutputT> - StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueFromInputInternal( Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { try { @@ -154,13 +154,13 @@ public class StateSpecs { } private static <InputT, AccumT, OutputT> - StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValueInternal( + StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> combiningValueInternal( Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { return new CombiningValueStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); } private static <K, InputT, AccumT, OutputT> - StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal( + StateSpec<K, CombiningState<InputT, AccumT, OutputT>> keyedCombiningValueInternal( Coder<AccumT> accumCoder, KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { return new KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT>(accumCoder, combineFn); } @@ -219,7 +219,7 @@ public class StateSpecs { public static <K, InputT, AccumT, OutputT> StateSpec<Object, BagState<AccumT>> convertToBagSpecInternal( - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningSpec) { + StateSpec<? super K, CombiningState<InputT, AccumT, OutputT>> combiningSpec) { if (combiningSpec instanceof KeyedCombiningValueStateSpec) { // Checked above; conversion to a bag spec depends on the provided spec being one of those // created via the factory methods in this class. @@ -302,7 +302,7 @@ public class StateSpecs { */ private static class CombiningValueStateSpec<InputT, AccumT, OutputT> extends KeyedCombiningValueStateSpec<Object, InputT, AccumT, OutputT> - implements StateSpec<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> { + implements StateSpec<Object, CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; @@ -339,7 +339,7 @@ public class StateSpecs { * <p>Includes the {@link KeyedCombineFnWithContext} and the coder for the accumulator type. */ private static class KeyedCombiningValueWithContextStateSpec<K, InputT, AccumT, OutputT> - implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> { + implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; @@ -353,7 +353,7 @@ public class StateSpecs { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> bind( + public CombiningState<InputT, AccumT, OutputT> bind( String id, StateBinder<? extends K> visitor) { return visitor.bindKeyedCombiningValueWithContext(id, this, accumCoder, combineFn); } @@ -410,7 +410,7 @@ public class StateSpecs { * <p>Includes the {@link KeyedCombineFn} and the coder for the accumulator type. */ private static class KeyedCombiningValueStateSpec<K, InputT, AccumT, OutputT> - implements StateSpec<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> { + implements StateSpec<K, CombiningState<InputT, AccumT, OutputT>> { @Nullable private Coder<AccumT> accumCoder; @@ -428,7 +428,7 @@ public class StateSpecs { } @Override - public AccumulatorCombiningState<InputT, AccumT, OutputT> bind( + public CombiningState<InputT, AccumT, OutputT> bind( String id, StateBinder<? extends K> visitor) { return visitor.bindKeyedCombiningValue(id, this, getAccumCoder(), keyedCombineFn); } http://git-wip-us.apache.org/repos/asf/beam/blob/ef480a37/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index cc67ac2..d9b7b54 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -77,6 +77,7 @@ import org.apache.beam.sdk.testing.UsesTimersInParDo; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.OnTimer; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; +import org.apache.beam.sdk.transforms.Mean.CountSum; import org.apache.beam.sdk.transforms.ParDo.SingleOutput; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -92,8 +93,8 @@ import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; import org.apache.beam.sdk.util.common.ElementByteSizeObserver; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.StateSpec; @@ -2085,7 +2086,7 @@ public class ParDoTest implements Serializable { private final StateSpec<Object, SetState<Integer>> setState = StateSpecs.set(VarIntCoder.of()); @StateId(countStateId) - private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private final StateSpec<Object, CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2093,7 +2094,7 @@ public class ParDoTest implements Serializable { public void processElement( ProcessContext c, @StateId(stateId) SetState<Integer> state, - @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> + @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { state.add(c.element().getValue()); count.add(1); @@ -2129,7 +2130,7 @@ public class ParDoTest implements Serializable { private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set(); @StateId(countStateId) - private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private final StateSpec<Object, CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2137,7 +2138,7 @@ public class ParDoTest implements Serializable { public void processElement( ProcessContext c, @StateId(stateId) SetState<MyInteger> state, - @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) { + @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { state.add(new MyInteger(c.element().getValue())); count.add(1); if (count.read() >= 4) { @@ -2172,7 +2173,7 @@ public class ParDoTest implements Serializable { private final StateSpec<Object, SetState<MyInteger>> setState = StateSpecs.set(); @StateId(countStateId) - private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private final StateSpec<Object, CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @@ -2180,7 +2181,7 @@ public class ParDoTest implements Serializable { public void processElement( ProcessContext c, @StateId(stateId) SetState<MyInteger> state, - @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> count) { + @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { state.add(new MyInteger(c.element().getValue())); count.add(1); if (count.read() >= 4) { @@ -2214,14 +2215,14 @@ public class ParDoTest implements Serializable { private final StateSpec<Object, MapState<String, Integer>> mapState = StateSpecs.map(StringUtf8Coder.of(), VarIntCoder.of()); @StateId(countStateId) - private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private final StateSpec<Object, CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) MapState<String, Integer> state, - @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> + @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { KV<String, Integer> value = c.element().getValue(); state.put(value.getKey(), value.getValue()); @@ -2260,14 +2261,14 @@ public class ParDoTest implements Serializable { @StateId(stateId) private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map(); @StateId(countStateId) - private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private final StateSpec<Object, CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state, - @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> + @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { KV<String, Integer> value = c.element().getValue(); state.put(value.getKey(), new MyInteger(value.getValue())); @@ -2306,14 +2307,14 @@ public class ParDoTest implements Serializable { @StateId(stateId) private final StateSpec<Object, MapState<String, MyInteger>> mapState = StateSpecs.map(); @StateId(countStateId) - private final StateSpec<Object, AccumulatorCombiningState<Integer, int[], Integer>> + private final StateSpec<Object, CombiningState<Integer, int[], Integer>> countState = StateSpecs.combiningValueFromInputInternal(VarIntCoder.of(), Sum.ofIntegers()); @ProcessElement public void processElement( ProcessContext c, @StateId(stateId) MapState<String, MyInteger> state, - @StateId(countStateId) AccumulatorCombiningState<Integer, int[], Integer> + @StateId(countStateId) CombiningState<Integer, int[], Integer> count) { KV<String, Integer> value = c.element().getValue(); state.put(value.getKey(), new MyInteger(value.getValue())); @@ -2351,7 +2352,7 @@ public class ParDoTest implements Serializable { @StateId(stateId) private final StateSpec< - Object, AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double>> + Object, CombiningState<Double, CountSum<Double>, Double>> combiningState = StateSpecs.combiningValue(new Mean.CountSumCoder<Double>(), Mean.<Double>of()); @@ -2359,7 +2360,7 @@ public class ParDoTest implements Serializable { public void processElement( ProcessContext c, @StateId(stateId) - AccumulatorCombiningState<Double, Mean.CountSum<Double>, Double> state) { + CombiningState<Double, CountSum<Double>, Double> state) { state.add(c.element().getValue()); Double currentValue = state.read(); if (Math.abs(currentValue - 0.5) < EPSILON) { @@ -2391,7 +2392,7 @@ public class ParDoTest implements Serializable { @StateId(stateId) private final StateSpec< - Object, AccumulatorCombiningState<Integer, MyInteger, Integer>> + Object, CombiningState<Integer, MyInteger, Integer>> combiningState = StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() { @Override @@ -2423,7 +2424,7 @@ public class ParDoTest implements Serializable { public void processElement( ProcessContext c, @StateId(stateId) - AccumulatorCombiningState<Integer, MyInteger, Integer> state) { + CombiningState<Integer, MyInteger, Integer> state) { state.add(c.element().getValue()); Integer currentValue = state.read(); if (currentValue == EXPECTED_SUM) { @@ -2453,7 +2454,7 @@ public class ParDoTest implements Serializable { @StateId(stateId) private final StateSpec< - Object, AccumulatorCombiningState<Integer, MyInteger, Integer>> + Object, CombiningState<Integer, MyInteger, Integer>> combiningState = StateSpecs.combiningValue(new Combine.CombineFn<Integer, MyInteger, Integer>() { @Override @@ -2485,7 +2486,7 @@ public class ParDoTest implements Serializable { public void processElement( ProcessContext c, @StateId(stateId) - AccumulatorCombiningState<Integer, MyInteger, Integer> state) { + CombiningState<Integer, MyInteger, Integer> state) { state.add(c.element().getValue()); Integer currentValue = state.read(); if (currentValue == EXPECTED_SUM) {