Move StateTag and friends to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07d93276 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07d93276 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07d93276 Branch: refs/heads/master Commit: 07d93276e7862e1c238e75854a7faeb15b2d5d60 Parents: c42a19b Author: Kenneth Knowles <k...@google.com> Authored: Fri Feb 3 19:51:18 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 7 11:44:34 2017 -0800 ---------------------------------------------------------------------- .../translation/utils/ApexStateInternals.java | 4 +- .../utils/ApexStateInternalsTest.java | 4 +- .../runners/core/InMemoryStateInternals.java | 3 +- .../runners/core/MergingActiveWindowSet.java | 2 - .../beam/runners/core/MergingStateAccessor.java | 1 - .../apache/beam/runners/core/NonEmptyPanes.java | 2 - .../beam/runners/core/PaneInfoTracker.java | 2 - .../runners/core/ReduceFnContextFactory.java | 1 - .../beam/runners/core/SideInputHandler.java | 2 - .../beam/runners/core/SimpleDoFnRunner.java | 1 - .../beam/runners/core/SplittableParDo.java | 2 - .../apache/beam/runners/core/StateAccessor.java | 1 - .../beam/runners/core/StateInternals.java | 1 - .../apache/beam/runners/core/StateMerging.java | 1 - .../apache/beam/runners/core/StateTable.java | 3 +- .../org/apache/beam/runners/core/StateTag.java | 117 ++++++ .../org/apache/beam/runners/core/StateTags.java | 352 +++++++++++++++++++ .../beam/runners/core/SystemReduceFn.java | 2 - .../core/TestInMemoryStateInternals.java | 1 - .../apache/beam/runners/core/WatermarkHold.java | 2 - .../AfterDelayFromFirstElementStateMachine.java | 4 +- .../core/triggers/AfterPaneStateMachine.java | 4 +- .../TriggerStateMachineContextFactory.java | 2 +- .../triggers/TriggerStateMachineRunner.java | 4 +- .../core/InMemoryStateInternalsTest.java | 2 - .../beam/runners/core/ReduceFnTester.java | 1 - .../apache/beam/runners/core/StateTagTest.java | 172 +++++++++ 
.../CopyOnAccessInMemoryStateInternals.java | 4 +- .../direct/StatefulParDoEvaluatorFactory.java | 4 +- .../CopyOnAccessInMemoryStateInternalsTest.java | 4 +- .../runners/direct/EvaluationContextTest.java | 4 +- .../StatefulParDoEvaluatorFactoryTest.java | 4 +- .../wrappers/streaming/FlinkStateInternals.java | 2 +- .../streaming/FlinkStateInternalsTest.java | 4 +- .../apache/beam/sdk/util/state/StateTag.java | 111 ------ .../apache/beam/sdk/util/state/StateTags.java | 344 ------------------ .../beam/sdk/util/state/StateTagTest.java | 172 --------- 37 files changed, 667 insertions(+), 679 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index 544000d..34d993f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -31,6 +31,8 @@ import java.util.List; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTag.StateBinder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.InstantCoder; @@ -47,8 +49,6 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import 
org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java index d6a4515..3e83a7f 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java @@ -27,6 +27,8 @@ import java.util.Arrays; import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.transforms.Sum; @@ -37,8 +39,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.hamcrest.Matchers; 
http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index 4779954..6a181f3 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; import javax.annotation.Nullable; +import org.apache.beam.runners.core.StateTag.StateBinder; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; @@ -37,8 +38,6 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java index 806591d..b4e864c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java @@ -38,8 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; /** http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java index e670bd6..e948650 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateTag; /** * Interface for accessing persistent state while windows are merging. 
http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java index d9b8cd4..aa033ce 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java @@ -24,8 +24,6 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; /** * Tracks which windows have non-empty panes. Specifically, which windows have new elements since http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java index c43f846..4cf4d67 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java @@ -26,8 +26,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import 
org.apache.beam.sdk.util.state.ValueState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index b1a544f..66a6ef8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 16324c1..24f326d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -32,8 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.StateTag; -import 
org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 7a89389..2b93ca0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 25517f6..544bfa0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -51,8 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateTag; 
-import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java index a45d865..87353f2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateTag; /** * Interface for accessing a {@link StateTag} in the current context. 
http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java index 7490c20..e6440bf 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateTag; /** * {@code StateInternals} describes the functionality a runner needs to provide for the http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java index 2c65cd9..c533f83 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; 
http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java index 48ebea3..d2511c9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java @@ -21,10 +21,9 @@ import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; import java.util.Map; import java.util.Set; +import org.apache.beam.runners.core.StateTag.StateBinder; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; /** * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code State} instance. http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java new file mode 100644 index 0000000..a3d703f --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.IOException; +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; + +/** + * An address and specification for a persistent state cell. This includes a unique identifier for + * the location, the information necessary to encode the value, and details about the intended + * access pattern. + * + * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column + * that has cells of type {@code StateT}. 
+ * + * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}. + * + * @param <K> The type of key that must be used with the state tag. Contravariant: methods should + * accept values of type {@code KeyedStateTag<? super K, StateT>}. + * @param <StateT> The type of state being tagged. + */ +@Experimental(Kind.STATE) +public interface StateTag<K, StateT extends State> extends Serializable { + + /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */ + void appendTo(Appendable sb) throws IOException; + + /** + * An identifier for the state cell that this tag references. + */ + String getId(); + + /** + * The specification for the state stored in the referenced cell. + */ + StateSpec<K, StateT> getSpec(); + + /** + * Bind this state tag. See {@link StateSpec#bind}. + * + * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now. + */ + @Deprecated + StateT bind(StateBinder<? extends K> binder); + + /** + * Visitor for binding a {@link StateSpec} and to the associated {@link State}. + * + * @param <K> the type of key this binder embodies. + * @deprecated for migration only; runners should reference the top level {@link StateBinder} + * and move towards {@link StateSpec} rather than {@link StateTag}. + */ + @Deprecated + public interface StateBinder<K> { + <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder); + + <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder); + + <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn); + + <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + StateTag<? 
super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); + + <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> + combineFn); + + /** + * Bind to a watermark {@link StateSpec}. + * + * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to + * the returned {@link WatermarkHoldState} are to be combined. + */ + <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + StateTag<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? super W> outputTimeFn); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java new file mode 100644 index 0000000..cf7c236 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java @@ -0,0 +1,352 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.base.MoreObjects; +import java.io.IOException; +import java.io.Serializable; +import java.util.Objects; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateBinder; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.util.state.WatermarkHoldState; + +/** + * Static utility methods for creating {@link StateTag} instances. 
+ */ +@Experimental(Kind.STATE) +public class StateTags { + + private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); + + static { + STANDARD_REGISTRY.registerStandardCoders(); + } + + /** @deprecated for migration purposes only */ + @Deprecated + private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) { + return new StateBinder<K>() { + @Override + public <T> ValueState<T> bindValue( + String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) { + return binder.bindValue(tagForSpec(id, spec), coder); + } + + @Override + public <T> BagState<T> bindBag( + String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) { + return binder.bindBag(tagForSpec(id, spec), elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn) { + return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? 
super K, InputT, AccumT, OutputT> combineFn) { + return binder.bindKeyedCombiningValueWithContext( + tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + String id, + StateSpec<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? super W> outputTimeFn) { + return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn); + } + }; + } + + private enum StateKind { + SYSTEM('s'), + USER('u'); + + private char prefix; + + StateKind(char prefix) { + this.prefix = prefix; + } + } + + private StateTags() { } + + private interface SystemStateTag<K, StateT extends State> { + StateTag<K, StateT> asKind(StateKind kind); + } + + /** Create a state tag for the given id and spec. */ + public static <K, StateT extends State> StateTag<K, StateT> tagForSpec( + String id, StateSpec<K, StateT> spec) { + return new SimpleStateTag<>(new StructuredId(id), spec); + } + + /** + * Create a simple state tag for values of type {@code T}. + */ + public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) { + return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder)); + } + + /** + * Create a state tag for values that use a {@link CombineFn} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. + */ + public static <InputT, AccumT, OutputT> + StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> + combiningValue( + String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn)); + } + + /** + * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. 
+ */ + public static <K, InputT, AccumT, + OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + keyedCombiningValue(String id, Coder<AccumT> accumCoder, + KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn)); + } + + /** + * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically + * merge multiple {@code InputT}s into a single {@code OutputT}. + */ + public static <K, InputT, AccumT, OutputT> + StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> + keyedCombiningValueWithContext( + String id, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn)); + } + + /** + * Create a state tag for values that use a {@link CombineFn} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. + * + * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and + * should only be used to initialize static values. + */ + public static <InputT, AccumT, OutputT> + StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> + combiningValueFromInputInternal( + String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn)); + } + + /** + * Create a state tag that is optimized for adding values frequently, and + * occasionally retrieving all the values that have been added. + */ + public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) { + return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder)); + } + + /** + * Create a state tag for holding the watermark. 
+ */ + public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>> + watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) { + return new SimpleStateTag<>( + new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn)); + } + + /** + * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to + * collide with any user tags. + */ + public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal( + StateTag<K, StateT> tag) { + if (!(tag instanceof SystemStateTag)) { + throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag); + } + // Checked above + @SuppressWarnings("unchecked") + SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag; + return typedTag.asKind(StateKind.SYSTEM); + } + + public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>> + convertToBagTagInternal( + StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) { + return new SimpleStateTag<>( + new StructuredId(combiningTag.getId()), + StateSpecs.convertToBagSpecInternal(combiningTag.getSpec())); + } + + private static class StructuredId implements Serializable { + private final StateKind kind; + private final String rawId; + + private StructuredId(String rawId) { + this(StateKind.USER, rawId); + } + + private StructuredId(StateKind kind, String rawId) { + this.kind = kind; + this.rawId = rawId; + } + + public StructuredId asKind(StateKind kind) { + return new StructuredId(kind, rawId); + } + + public void appendTo(Appendable sb) throws IOException { + sb.append(kind.prefix).append(rawId); + } + + public String getRawId() { + return rawId; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("id", rawId) + .add("kind", kind) + .toString(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof 
StructuredId)) { + return false; + } + + StructuredId that = (StructuredId) obj; + return Objects.equals(this.kind, that.kind) + && Objects.equals(this.rawId, that.rawId); + } + + @Override + public int hashCode() { + return Objects.hash(kind, rawId); + } + } + + /** + * A basic {@link StateTag} implementation that manages the structured ids. + */ + private static class SimpleStateTag<K, StateT extends State> + implements StateTag<K, StateT>, SystemStateTag<K, StateT> { + + private final StateSpec<K, StateT> spec; + private final StructuredId id; + + public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) { + this.id = id; + this.spec = spec; + } + + @Override + @Deprecated + public StateT bind(StateTag.StateBinder<? extends K> binder) { + return spec.bind( + this.id.getRawId(), adaptTagBinder(binder)); + } + + @Override + public String getId() { + return id.getRawId(); + } + + @Override + public StateSpec<K, StateT> getSpec() { + return spec; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("id", id) + .toString(); + } + + @Override + public void appendTo(Appendable sb) throws IOException { + id.appendTo(sb); + } + + @Override + public StateTag<K, StateT> asKind(StateKind kind) { + return new SimpleStateTag<>(id.asKind(kind), spec); + } + + @Override + public boolean equals(Object other) { + if (!(other instanceof SimpleStateTag)) { + return false; + } + + SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other; + return Objects.equals(this.getId(), otherTag.getId()) + && Objects.equals(this.getSpec(), otherTag.getSpec()); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), this.getId(), this.getSpec()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java index 4a876d1..bb7e4a9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -29,8 +29,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; /** * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}. http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java index ed852a5..0321a33 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java @@ -21,7 +21,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 99e15f0..d3c4bc7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -30,8 +30,6 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Duration; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 7cc9130..b720644 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -25,6 +25,8 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.MergingStateAccessor; import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateMerging; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import 
org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; @@ -34,8 +36,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index b5265ba..1dd5b65 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -21,13 +21,13 @@ import java.util.Objects; import org.apache.beam.runners.core.MergingStateAccessor; import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateMerging; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; /** * {@link TriggerStateMachine}s that fire based on properties of the elements in the 
current pane. http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java index 3bdfc82..315110d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateAccessor; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo; import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo; import org.apache.beam.sdk.coders.Coder; @@ -38,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateTag; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index d3adaa4..542439f 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -26,11 +26,11 @@ import java.util.Collection; import java.util.Map; import org.apache.beam.runners.core.MergingStateAccessor; import org.apache.beam.runners.core.StateAccessor; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.BitSetCoder; import org.apache.beam.sdk.util.Timers; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java index ca0a8e5..8ea9abc 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java @@ -32,8 +32,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.hamcrest.Matchers; 
http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index bea0c39..dab2bf9 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -71,7 +71,6 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java new file mode 100644 index 0000000..9a04628 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Max; +import org.apache.beam.sdk.transforms.Min; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.util.CombineFnUtil; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link StateTag}. 
+ */ +@RunWith(JUnit4.class) +public class StateTagTest { + @Test + public void testValueEquality() { + StateTag<?, ?> fooVarInt1 = StateTags.value("foo", VarIntCoder.of()); + StateTag<?, ?> fooVarInt2 = StateTags.value("foo", VarIntCoder.of()); + StateTag<?, ?> fooBigEndian = StateTags.value("foo", BigEndianIntegerCoder.of()); + StateTag<?, ?> barVarInt = StateTags.value("bar", VarIntCoder.of()); + + assertEquals(fooVarInt1, fooVarInt2); + assertNotEquals(fooVarInt1, fooBigEndian); + assertNotEquals(fooVarInt1, barVarInt); + } + + @Test + public void testBagEquality() { + StateTag<?, ?> fooVarInt1 = StateTags.bag("foo", VarIntCoder.of()); + StateTag<?, ?> fooVarInt2 = StateTags.bag("foo", VarIntCoder.of()); + StateTag<?, ?> fooBigEndian = StateTags.bag("foo", BigEndianIntegerCoder.of()); + StateTag<?, ?> barVarInt = StateTags.bag("bar", VarIntCoder.of()); + + assertEquals(fooVarInt1, fooVarInt2); + assertNotEquals(fooVarInt1, fooBigEndian); + assertNotEquals(fooVarInt1, barVarInt); + } + + @Test + public void testWatermarkBagEquality() { + StateTag<?, ?> foo1 = StateTags.watermarkStateInternal( + "foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + StateTag<?, ?> foo2 = StateTags.watermarkStateInternal( + "foo", OutputTimeFns.outputAtEarliestInputTimestamp()); + StateTag<?, ?> bar = StateTags.watermarkStateInternal( + "bar", OutputTimeFns.outputAtEarliestInputTimestamp()); + + StateTag<?, ?> bar2 = StateTags.watermarkStateInternal( + "bar", OutputTimeFns.outputAtLatestInputTimestamp()); + + // Same id, same fn. + assertEquals(foo1, foo2); + // Different id, same fn. + assertNotEquals(foo1, bar); + // Same id, different fn. 
+ assertEquals(bar, bar2); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testCombiningValueEquality() { + Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers(); + Coder<Integer> input1 = VarIntCoder.of(); + Coder<Integer> input2 = BigEndianIntegerCoder.of(); + Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers(); + + StateTag<?, ?> fooCoder1Max1 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn); + StateTag<?, ?> fooCoder1Max2 = StateTags.combiningValueFromInputInternal("foo", input1, maxFn); + StateTag<?, ?> fooCoder1Min = StateTags.combiningValueFromInputInternal("foo", input1, minFn); + + StateTag<?, ?> fooCoder2Max = StateTags.combiningValueFromInputInternal("foo", input2, maxFn); + StateTag<?, ?> barCoder1Max = StateTags.combiningValueFromInputInternal("bar", input1, maxFn); + + // Same name, coder and combineFn + assertEquals(fooCoder1Max1, fooCoder1Max2); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max2)); + + // Different combineFn, but we treat them as equal since we only serialize the bits. + assertEquals(fooCoder1Max1, fooCoder1Min); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Min)); + + // Different input coder. + assertNotEquals(fooCoder1Max1, fooCoder2Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder2Max)); + + // These StateTags have different IDs. 
+ assertNotEquals(fooCoder1Max1, barCoder1Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Test + public void testCombiningValueWithContextEquality() { + CoderRegistry registry = new CoderRegistry(); + registry.registerStandardCoders(); + + Combine.BinaryCombineIntegerFn maxFn = Max.ofIntegers(); + Combine.BinaryCombineIntegerFn minFn = Min.ofIntegers(); + + Coder<int[]> accum1 = maxFn.getAccumulatorCoder(registry, VarIntCoder.of()); + Coder<int[]> accum2 = minFn.getAccumulatorCoder(registry, BigEndianIntegerCoder.of()); + + StateTag<?, ?> fooCoder1Max1 = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).<String>asKeyedFn()); + StateTag<?, ?> fooCoder1Max2 = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag<?, ?> fooCoder1Min = StateTags.keyedCombiningValueWithContext( + "foo", accum1, CombineFnUtil.toFnWithContext(minFn).asKeyedFn()); + + StateTag<?, ?> fooCoder2Max = StateTags.keyedCombiningValueWithContext( + "foo", accum2, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + StateTag<?, ?> barCoder1Max = StateTags.keyedCombiningValueWithContext( + "bar", accum1, CombineFnUtil.toFnWithContext(maxFn).asKeyedFn()); + + // Same name, coder and combineFn + assertEquals(fooCoder1Max1, fooCoder1Max2); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max2)); + // Different combineFn, but we treat them as equal since we only serialize the bits. + assertEquals(fooCoder1Max1, fooCoder1Min); + assertEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder1Min)); + + // Different input coder. 
+ assertNotEquals(fooCoder1Max1, fooCoder2Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) fooCoder2Max)); + + // These StateTags have different IDs. + assertNotEquals(fooCoder1Max1, barCoder1Max); + assertNotEquals( + StateTags.convertToBagTagInternal((StateTag) fooCoder1Max1), + StateTags.convertToBagTagInternal((StateTag) barCoder1Max)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index 4b59bf9..47c0251 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -34,6 +34,8 @@ import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryWatermarkHold import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateTable; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTag.StateBinder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; @@ -46,8 +48,6 @@ import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTag.StateBinder; import 
org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index b3a2156..0ad40ac 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -30,6 +30,8 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -45,8 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java index 25cd252..c8eb66e 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java @@ -31,6 +31,8 @@ import static org.mockito.Mockito.verify; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -45,8 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index 6dd2ea4..d6f2263 100644 --- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -32,6 +32,8 @@ import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; @@ -59,8 +61,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index cc5ee74..ac7d2bd 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -38,6 +38,8 @@ import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateInternals; import 
org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; @@ -59,8 +61,6 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java index 5731a38..eaededb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import 
org.apache.beam.sdk.coders.CoderException; @@ -40,7 +41,6 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.flink.api.common.state.ListStateDescriptor; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 6a086a7..465dad3 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -27,6 +27,8 @@ import java.util.Arrays; import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaceForTest; +import org.apache.beam.runners.core.StateTag; +import org.apache.beam.runners.core.StateTags; import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkStateInternals; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -40,8 +42,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState; import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.CombiningState; import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateTag; -import 
org.apache.beam.sdk.util.state.StateTags; import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.flink.api.common.ExecutionConfig; http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java deleted file mode 100644 index feca927..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util.state; - -import java.io.IOException; -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; - -/** - * An address and specification for a persistent state cell. This includes a unique identifier for - * the location, the information necessary to encode the value, and details about the intended - * access pattern. - * - * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column - * that has cells of type {@code StateT}. - * - * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}. - * - * @param <K> The type of key that must be used with the state tag. Contravariant: methods should - * accept values of type {@code KeyedStateTag<? super K, StateT>}. - * @param <StateT> The type of state being tagged. - */ -@Experimental(Kind.STATE) -public interface StateTag<K, StateT extends State> extends Serializable { - - /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */ - void appendTo(Appendable sb) throws IOException; - - /** - * An identifier for the state cell that this tag references. - */ - String getId(); - - /** - * The specification for the state stored in the referenced cell. - */ - StateSpec<K, StateT> getSpec(); - - /** - * Bind this state tag. See {@link StateSpec#bind}. - * - * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now. - */ - @Deprecated - StateT bind(StateBinder<? 
extends K> binder); - - /** - * Visitor for binding a {@link StateSpec} and to the associated {@link State}. - * - * @param <K> the type of key this binder embodies. - * @deprecated for migration only; runners should reference the top level {@link StateBinder} - * and move towards {@link StateSpec} rather than {@link StateTag}. - */ - @Deprecated - public interface StateBinder<K> { - <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder); - - <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder); - - <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - CombineFn<InputT, AccumT, OutputT> combineFn); - - <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn); - - <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> - combineFn); - - /** - * Bind to a watermark {@link StateSpec}. - * - * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to - * the returned {@link WatermarkHoldState} are to be combined. - */ - <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - StateTag<? super K, WatermarkHoldState<W>> spec, - OutputTimeFn<? 
super W> outputTimeFn); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java deleted file mode 100644 index acb1f08..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java +++ /dev/null @@ -1,344 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util.state; - -import com.google.common.base.MoreObjects; -import java.io.IOException; -import java.io.Serializable; -import java.util.Objects; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; - -/** - * Static utility methods for creating {@link StateTag} instances. - */ -@Experimental(Kind.STATE) -public class StateTags { - - private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry(); - - static { - STANDARD_REGISTRY.registerStandardCoders(); - } - - /** @deprecated for migration purposes only */ - @Deprecated - private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) { - return new StateBinder<K>() { - @Override - public <T> ValueState<T> bindValue( - String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) { - return binder.bindValue(tagForSpec(id, spec), coder); - } - - @Override - public <T> BagState<T> bindBag( - String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) { - return binder.bindBag(tagForSpec(id, spec), elemCoder); - } - - @Override - public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( - String id, - StateSpec<? 
super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - CombineFn<InputT, AccumT, OutputT> combineFn) { - return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return binder.bindKeyedCombiningValueWithContext( - tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - String id, - StateSpec<? super K, WatermarkHoldState<W>> spec, - OutputTimeFn<? super W> outputTimeFn) { - return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn); - } - }; - } - - private enum StateKind { - SYSTEM('s'), - USER('u'); - - private char prefix; - - StateKind(char prefix) { - this.prefix = prefix; - } - } - - private StateTags() { } - - private interface SystemStateTag<K, StateT extends State> { - StateTag<K, StateT> asKind(StateKind kind); - } - - /** Create a state tag for the given id and spec. */ - public static <K, StateT extends State> StateTag<K, StateT> tagForSpec( - String id, StateSpec<K, StateT> spec) { - return new SimpleStateTag<>(new StructuredId(id), spec); - } - - /** - * Create a simple state tag for values of type {@code T}. 
- */ - public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) { - return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder)); - } - - /** - * Create a state tag for values that use a {@link CombineFn} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <InputT, AccumT, OutputT> - StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> - combiningValue( - String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn)); - } - - /** - * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <K, InputT, AccumT, - OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> - keyedCombiningValue(String id, Coder<AccumT> accumCoder, - KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) { - return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn)); - } - - /** - * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically - * merge multiple {@code InputT}s into a single {@code OutputT}. - */ - public static <K, InputT, AccumT, OutputT> - StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> - keyedCombiningValueWithContext( - String id, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) { - return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn)); - } - - /** - * Create a state tag for values that use a {@link CombineFn} to automatically merge - * multiple {@code InputT}s into a single {@code OutputT}. 
- * - * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and - * should only be used to initialize static values. - */ - public static <InputT, AccumT, OutputT> - StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> - combiningValueFromInputInternal( - String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn)); - } - - /** - * Create a state tag that is optimized for adding values frequently, and - * occasionally retrieving all the values that have been added. - */ - public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) { - return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder)); - } - - /** - * Create a state tag for holding the watermark. - */ - public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>> - watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) { - return new SimpleStateTag<>( - new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn)); - } - - /** - * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to - * collide with any user tags. - */ - public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal( - StateTag<K, StateT> tag) { - if (!(tag instanceof SystemStateTag)) { - throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag); - } - // Checked above - @SuppressWarnings("unchecked") - SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag; - return typedTag.asKind(StateKind.SYSTEM); - } - - public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>> - convertToBagTagInternal( - StateTag<? 
super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) { - return new SimpleStateTag<>( - new StructuredId(combiningTag.getId()), - StateSpecs.convertToBagSpecInternal(combiningTag.getSpec())); - } - - private static class StructuredId implements Serializable { - private final StateKind kind; - private final String rawId; - - private StructuredId(String rawId) { - this(StateKind.USER, rawId); - } - - private StructuredId(StateKind kind, String rawId) { - this.kind = kind; - this.rawId = rawId; - } - - public StructuredId asKind(StateKind kind) { - return new StructuredId(kind, rawId); - } - - public void appendTo(Appendable sb) throws IOException { - sb.append(kind.prefix).append(rawId); - } - - public String getRawId() { - return rawId; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("id", rawId) - .add("kind", kind) - .toString(); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - - if (!(obj instanceof StructuredId)) { - return false; - } - - StructuredId that = (StructuredId) obj; - return Objects.equals(this.kind, that.kind) - && Objects.equals(this.rawId, that.rawId); - } - - @Override - public int hashCode() { - return Objects.hash(kind, rawId); - } - } - - /** - * A basic {@link StateTag} implementation that manages the structured ids. - */ - private static class SimpleStateTag<K, StateT extends State> - implements StateTag<K, StateT>, SystemStateTag<K, StateT> { - - private final StateSpec<K, StateT> spec; - private final StructuredId id; - - public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) { - this.id = id; - this.spec = spec; - } - - @Override - @Deprecated - public StateT bind(StateTag.StateBinder<? 
extends K> binder) { - return spec.bind( - this.id.getRawId(), adaptTagBinder(binder)); - } - - @Override - public String getId() { - return id.getRawId(); - } - - @Override - public StateSpec<K, StateT> getSpec() { - return spec; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("id", id) - .toString(); - } - - @Override - public void appendTo(Appendable sb) throws IOException { - id.appendTo(sb); - } - - @Override - public StateTag<K, StateT> asKind(StateKind kind) { - return new SimpleStateTag<>(id.asKind(kind), spec); - } - - @Override - public boolean equals(Object other) { - if (!(other instanceof SimpleStateTag)) { - return false; - } - - SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other; - return Objects.equals(this.getId(), otherTag.getId()) - && Objects.equals(this.getSpec(), otherTag.getSpec()); - } - - @Override - public int hashCode() { - return Objects.hash(getClass(), this.getId(), this.getSpec()); - } - } -}