Rename CombiningState to GroupingState
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/24c0495a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/24c0495a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/24c0495a Branch: refs/heads/master Commit: 24c0495a22dec9b7c44942794831b284f8caf78c Parents: 0a17645 Author: Kenneth Knowles <k...@google.com> Authored: Mon Apr 3 11:26:30 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu Apr 6 11:57:21 2017 -0700 ---------------------------------------------------------------------- .../utils/ApexStateInternalsTest.java | 6 +-- .../apache/beam/runners/core/StateMerging.java | 4 +- .../beam/runners/core/SystemReduceFn.java | 6 +-- .../AfterDelayFromFirstElementStateMachine.java | 4 +- .../core/InMemoryStateInternalsTest.java | 6 +-- .../CopyOnAccessInMemoryStateInternalsTest.java | 14 +++---- .../FlinkBroadcastStateInternalsTest.java | 6 +-- .../streaming/FlinkStateInternalsTest.java | 6 +-- .../util/state/AccumulatorCombiningState.java | 4 +- .../apache/beam/sdk/util/state/BagState.java | 2 +- .../beam/sdk/util/state/CombiningState.java | 42 -------------------- .../beam/sdk/util/state/GroupingState.java | 42 ++++++++++++++++++++ .../apache/beam/sdk/util/state/SetState.java | 2 +- .../org/apache/beam/sdk/util/state/State.java | 2 +- .../apache/beam/sdk/util/state/StateSpecs.java | 2 +- .../beam/sdk/util/state/WatermarkHoldState.java | 2 +- .../apache/beam/sdk/transforms/ParDoTest.java | 2 +- 17 files changed, 76 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index 3e83a7f..a1494ad 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; @@ -148,7 +148,7 @@ public class ApexStateInternalsTest { @Test public void testCombiningValue() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); @@ -168,7 +168,7 @@ public class ApexStateInternalsTest { @Test public void testCombiningIsEmpty() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); ReadableState<Boolean> readFuture = value.isEmpty(); http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java index e98d098..593d697 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -26,7 +26,7 @@ import java.util.Map; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.State; @@ -159,7 +159,7 @@ public class StateMerging { * Prefetch all combining value state for {@code address} across all merging windows in {@code * context}. */ - public static <K, StateT extends CombiningState<?, ?>, W extends BoundedWindow> void + public static <K, StateT extends GroupingState<?, ?>, W extends BoundedWindow> void prefetchCombiningValues(MergingStateAccessor<K, W> context, StateTag<? super K, StateT> address) { for (StateT state : context.accessInEachMergingWindow(address).values()) { http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index bb7e4a9..0f2f790 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -27,7 +27,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.AppliedCombineFn; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; /** @@ -97,10 +97,10 @@ public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends Bound }; } - private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag; + private StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag; public SystemReduceFn( - StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) { + StateTag<? super K, ? extends GroupingState<InputT, OutputT>> bufferTag) { this.bufferTag = bufferTag; } http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 4444c22..29c29a7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.transforms.Min; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; @@ -169,7 +169,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger @Override public void onElement(OnElementContext c) throws Exception { - CombiningState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG); + GroupingState<Instant, Instant> delayUntilState = c.state().access(DELAYED_UNTIL_TAG); Instant oldDelayUntil = delayUntilState.read(); // Since processing time can only advance, resulting in target wake-up times we would http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index 1da946f..5f90084 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -37,7 +37,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.SetState; @@ -378,7 +378,7 @@ public class InMemoryStateInternalsTest { @Test public void testCombiningValue() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); @@ -398,7 +398,7 @@ public class InMemoryStateInternalsTest { @Test public void testCombiningIsEmpty() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); ReadableState<Boolean> readFuture = value.isEmpty(); http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index c7409bb..59c0a37 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -47,7 +47,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.MapState; import org.apache.beam.sdk.util.state.SetState; import org.apache.beam.sdk.util.state.ValueState; @@ -232,7 +232,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag = StateTags.combiningValue("summer", sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn); - CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); + GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), equalTo(0L)); underlyingValue.add(1L); @@ -240,14 +240,14 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); - CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); + GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(1L)); copyOnAccessState.add(4L); assertThat(copyOnAccessState.read(), equalTo(5L)); assertThat(underlyingValue.read(), equalTo(1L)); - CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag); + GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); } @@ -265,7 +265,7 @@ public class CopyOnAccessInMemoryStateInternalsTest { sumLongFn.getAccumulatorCoder( reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)), sumLongFn); - CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); + GroupingState<Long, Long> underlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), equalTo(0L)); underlyingValue.add(1L); @@ -273,14 +273,14 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> internals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying); - CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); + GroupingState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag); assertThat(copyOnAccessState.read(), equalTo(1L)); copyOnAccessState.add(4L); assertThat(copyOnAccessState.read(), equalTo(5L)); assertThat(underlyingValue.read(), equalTo(1L)); - CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag); + GroupingState<Long, Long> reReadUnderlyingValue = underlying.state(namespace, stateTag); assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read())); } http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java index db02cb3..f4e3ea8 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkBroadcastStateInternalsTest.java @@ -34,7 +34,7 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.ValueState; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -169,7 +169,7 @@ public class FlinkBroadcastStateInternalsTest { @Test public void testCombiningValue() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); @@ -189,7 +189,7 @@ public class FlinkBroadcastStateInternalsTest { @Test public void testCombiningIsEmpty() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); ReadableState<Boolean> readFuture = value.isEmpty(); http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 7839cf3..27747dd 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.GroupingState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; @@ -199,7 +199,7 @@ public class FlinkStateInternalsTest { @Test public void testCombiningValue() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); // State instances are cached, but depend on the namespace. assertEquals(value, underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR)); @@ -219,7 +219,7 @@ public class FlinkStateInternalsTest { @Test public void testCombiningIsEmpty() throws Exception { - CombiningState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); + GroupingState<Integer, Integer> value = underTest.state(NAMESPACE_1, SUM_INTEGER_ADDR); assertThat(value.isEmpty().read(), Matchers.is(true)); ReadableState<Boolean> readFuture = value.isEmpty(); http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java index 8dd1678..6b120f9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/AccumulatorCombiningState.java @@ -21,14 +21,14 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; /** * State for a single value that is managed by a {@link CombineFn}. This is an internal extension - * to {@link CombiningState} that includes the {@code AccumT} type. + * to {@link GroupingState} that includes the {@code AccumT} type. * * @param <InputT> the type of values added to the state * @param <AccumT> the type of accumulator * @param <OutputT> the type of value extracted from the state */ public interface AccumulatorCombiningState<InputT, AccumT, OutputT> - extends CombiningState<InputT, OutputT> { + extends GroupingState<InputT, OutputT> { /** * Read the merged accumulator for this combining value. It is implied that reading the http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java index c7e6d13..e0eebe5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java @@ -22,7 +22,7 @@ package org.apache.beam.sdk.util.state; * * @param <T> The type of elements in the bag. */ -public interface BagState<T> extends CombiningState<T, Iterable<T>> { +public interface BagState<T> extends GroupingState<T, Iterable<T>> { @Override BagState<T> readLater(); } http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java deleted file mode 100644 index 1155262..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single - * {@code OutputT} value. - * - * @param <InputT> the type of values added to the state - * @param <OutputT> the type of value extracted from the state - */ -public interface CombiningState<InputT, OutputT> extends ReadableState<OutputT>, State { - /** - * Add a value to the buffer. - */ - void add(InputT value); - - /** - * Return true if this state is empty. - */ - ReadableState<Boolean> isEmpty(); - - @Override - CombiningState<InputT, OutputT> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java new file mode 100644 index 0000000..bd7a8d9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single + * {@code OutputT} value. + * + * @param <InputT> the type of values added to the state + * @param <OutputT> the type of value extracted from the state + */ +public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State { + /** + * Add a value to the buffer. + */ + void add(InputT value); + + /** + * Return true if this state is empty. + */ + ReadableState<Boolean> isEmpty(); + + @Override + GroupingState<InputT, OutputT> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java index 93058b2..5c907d5 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java @@ -23,7 +23,7 @@ package org.apache.beam.sdk.util.state; * * @param <T> The type of elements in the set. */ -public interface SetState<T> extends CombiningState<T, Iterable<T>> { +public interface SetState<T> extends GroupingState<T, Iterable<T>> { /** * Returns true if this set contains the specified element. */ http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java index 973cb9c..3a49f01 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java @@ -21,7 +21,7 @@ package org.apache.beam.sdk.util.state; * Base interface for all state locations. * * <p>Specific types of state add appropriate accessors for reading and writing values, see - * {@link ValueState}, {@link BagState}, and {@link CombiningState}. + * {@link ValueState}, {@link BagState}, and {@link GroupingState}. */ public interface State { http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index 6a8c80b..db4b7de 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -445,7 +445,7 @@ public class StateSpecs { @Override public void finishSpecifying() { if (getAccumCoder() == null) { - throw new IllegalStateException("Unable to infer a coder for CombiningState and no" + throw new IllegalStateException("Unable to infer a coder for GroupingState and no" + " Coder was specified. Please set a coder by either invoking" + " StateSpecs.combiningValue(Coder<AccumT> accumCoder," + " CombineFn<InputT, AccumT, OutputT> combineFn)" http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java index 415cc6e..20fa05f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java @@ -31,7 +31,7 @@ import org.joda.time.Instant; */ @Experimental(Kind.STATE) public interface WatermarkHoldState<W extends BoundedWindow> - extends CombiningState<Instant, Instant> { + extends GroupingState<Instant, Instant> { /** * Return the {@link OutputTimeFn} which will be used to determine a watermark hold time given * an element timestamp, and to combine watermarks from windows which are about to be merged. http://git-wip-us.apache.org/repos/asf/beam/blob/24c0495a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 4249a77..cc67ac2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -2495,7 +2495,7 @@ public class ParDoTest implements Serializable { }; thrown.expect(RuntimeException.class); - thrown.expectMessage("Unable to infer a coder for CombiningState and no Coder was specified."); + thrown.expectMessage("Unable to infer a coder for GroupingState and no Coder was specified."); pipeline .apply(Create.of(KV.of("hello", 3), KV.of("hello", 6), KV.of("hello", 7)))