Move StateTag and friends to runners/core-java

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07d93276
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07d93276
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07d93276

Branch: refs/heads/master
Commit: 07d93276e7862e1c238e75854a7faeb15b2d5d60
Parents: c42a19b
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Feb 3 19:51:18 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 7 11:44:34 2017 -0800

----------------------------------------------------------------------
 .../translation/utils/ApexStateInternals.java   |   4 +-
 .../utils/ApexStateInternalsTest.java           |   4 +-
 .../runners/core/InMemoryStateInternals.java    |   3 +-
 .../runners/core/MergingActiveWindowSet.java    |   2 -
 .../beam/runners/core/MergingStateAccessor.java |   1 -
 .../apache/beam/runners/core/NonEmptyPanes.java |   2 -
 .../beam/runners/core/PaneInfoTracker.java      |   2 -
 .../runners/core/ReduceFnContextFactory.java    |   1 -
 .../beam/runners/core/SideInputHandler.java     |   2 -
 .../beam/runners/core/SimpleDoFnRunner.java     |   1 -
 .../beam/runners/core/SplittableParDo.java      |   2 -
 .../apache/beam/runners/core/StateAccessor.java |   1 -
 .../beam/runners/core/StateInternals.java       |   1 -
 .../apache/beam/runners/core/StateMerging.java  |   1 -
 .../apache/beam/runners/core/StateTable.java    |   3 +-
 .../org/apache/beam/runners/core/StateTag.java  | 117 ++++++
 .../org/apache/beam/runners/core/StateTags.java | 352 +++++++++++++++++++
 .../beam/runners/core/SystemReduceFn.java       |   2 -
 .../core/TestInMemoryStateInternals.java        |   1 -
 .../apache/beam/runners/core/WatermarkHold.java |   2 -
 .../AfterDelayFromFirstElementStateMachine.java |   4 +-
 .../core/triggers/AfterPaneStateMachine.java    |   4 +-
 .../TriggerStateMachineContextFactory.java      |   2 +-
 .../triggers/TriggerStateMachineRunner.java     |   4 +-
 .../core/InMemoryStateInternalsTest.java        |   2 -
 .../beam/runners/core/ReduceFnTester.java       |   1 -
 .../apache/beam/runners/core/StateTagTest.java  | 172 +++++++++
 .../CopyOnAccessInMemoryStateInternals.java     |   4 +-
 .../direct/StatefulParDoEvaluatorFactory.java   |   4 +-
 .../CopyOnAccessInMemoryStateInternalsTest.java |   4 +-
 .../runners/direct/EvaluationContextTest.java   |   4 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |   4 +-
 .../wrappers/streaming/FlinkStateInternals.java |   2 +-
 .../streaming/FlinkStateInternalsTest.java      |   4 +-
 .../apache/beam/sdk/util/state/StateTag.java    | 111 ------
 .../apache/beam/sdk/util/state/StateTags.java   | 344 ------------------
 .../beam/sdk/util/state/StateTagTest.java       | 172 ---------
 37 files changed, 667 insertions(+), 679 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 544000d..34d993f 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -31,6 +31,8 @@ import java.util.List;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTag.StateBinder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -47,8 +49,6 @@ import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
index d6a4515..3e83a7f 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternalsTest.java
@@ -27,6 +27,8 @@ import java.util.Arrays;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.transforms.Sum;
@@ -37,8 +39,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.hamcrest.Matchers;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index 4779954..6a181f3 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.StateTag.StateBinder;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
@@ -37,8 +38,6 @@ import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
index 806591d..b4e864c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -38,8 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
index e670bd6..e948650 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingStateAccessor.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateTag;
 
 /**
  * Interface for accessing persistent state while windows are merging.

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
index d9b8cd4..aa033ce 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/NonEmptyPanes.java
@@ -24,8 +24,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 
 /**
  * Tracks which windows have non-empty panes. Specifically, which windows have 
new elements since

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
index c43f846..4cf4d67 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -26,8 +26,6 @@ import 
org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index b1a544f..66a6ef8 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 16324c1..24f326d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -32,8 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.PCollectionView;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 7a89389..2b93ca0 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 25517f6..544bfa0 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -51,8 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
index a45d865..87353f2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateAccessor.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateTag;
 
 /**
  * Interface for accessing a {@link StateTag} in the current context.

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
index 7490c20..e6440bf 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
@@ -22,7 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateTag;
 
 /**
  * {@code StateInternals} describes the functionality a runner needs to 
provide for the

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
index 2c65cd9..c533f83 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateMerging.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
index 48ebea3..d2511c9 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTable.java
@@ -21,10 +21,9 @@ import com.google.common.collect.HashBasedTable;
 import com.google.common.collect.Table;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.StateTag.StateBinder;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
 
 /**
  * Table mapping {@code StateNamespace} and {@code StateTag<?>} to a {@code 
State} instance.

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
new file mode 100644
index 0000000..a3d703f
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTag.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+
+/**
+ * An address and specification for a persistent state cell. This includes a 
unique identifier for
+ * the location, the information necessary to encode the value, and details 
about the intended
+ * access pattern.
+ *
+ * <p>State can be thought of as a sparse table, with each {@code StateTag} 
defining a column
+ * that has cells of type {@code StateT}.
+ *
+ * <p>Currently, this can only be used in a step immediately following a 
{@link GroupByKey}.
+ *
+ * @param <K> The type of key that must be used with the state tag. 
Contravariant: methods should
+ *            accept values of type {@code StateTag<? super K, StateT>}.
+ * @param <StateT> The type of state being tagged.
+ */
+@Experimental(Kind.STATE)
+public interface StateTag<K, StateT extends State> extends Serializable {
+
+  /** Append the textual representation of this tag to the given {@link Appendable}. */
+  void appendTo(Appendable sb) throws IOException;
+
+  /**
+   * An identifier for the state cell that this tag references.
+   */
+  String getId();
+
+  /**
+   * The specification for the state stored in the referenced cell.
+   */
+  StateSpec<K, StateT> getSpec();
+
+  /**
+   * Bind this state tag. See {@link StateSpec#bind}.
+   *
+   * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} 
for now.
+   */
+  @Deprecated
+  StateT bind(StateBinder<? extends K> binder);
+
+  /**
+   * Visitor for binding a {@link StateSpec} to the associated {@link 
State}.
+   *
+   * @param <K> the type of key this binder embodies.
+   * @deprecated for migration only; runners should reference the top level 
{@link org.apache.beam.sdk.util.state.StateBinder}
+   * and move towards {@link StateSpec} rather than {@link StateTag}.
+   */
+  @Deprecated
+  public interface StateBinder<K> {
+    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, 
Coder<T> coder);
+
+    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> 
elemCoder);
+
+    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, 
OutputT> bindCombiningValue(
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+        Coder<AccumT> accumCoder,
+        CombineFn<InputT, AccumT, OutputT> combineFn);
+
+    <InputT, AccumT, OutputT>
+    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+        Coder<AccumT> accumCoder,
+        KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
+
+    <InputT, AccumT, OutputT>
+    AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+        Coder<AccumT> accumCoder,
+        KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
+            combineFn);
+
+    /**
+     * Bind to a watermark {@link StateSpec}.
+     *
+     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark 
hold timestamps added to
+     * the returned {@link WatermarkHoldState} are to be combined.
+     */
+    <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+        StateTag<? super K, WatermarkHoldState<W>> spec,
+        OutputTimeFn<? super W> outputTimeFn);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
new file mode 100644
index 0000000..cf7c236
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateTags.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
+import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateBinder;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+
+/**
+ * Static utility methods for creating {@link StateTag} instances.
+ */
+@Experimental(Kind.STATE)
+public class StateTags {
+
+  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
+
+  static {
+    STANDARD_REGISTRY.registerStandardCoders();
+  }
+
+  /** @deprecated for migration purposes only */
+  @Deprecated
+  private static <K> StateBinder<K> adaptTagBinder(final 
StateTag.StateBinder<K> binder) {
+    return new StateBinder<K>() {
+      @Override
+      public <T> ValueState<T> bindValue(
+          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) 
{
+        return binder.bindValue(tagForSpec(id, spec), coder);
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> 
elemCoder) {
+        return binder.bindBag(tagForSpec(id, spec), elemCoder);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              CombineFn<InputT, AccumT, OutputT> combineFn) {
+        return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, 
combineFn);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        return binder.bindKeyedCombiningValue(tagForSpec(id, spec), 
accumCoder, combineFn);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
+        return binder.bindKeyedCombiningValueWithContext(
+            tagForSpec(id, spec), accumCoder, combineFn);
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          String id,
+          StateSpec<? super K, WatermarkHoldState<W>> spec,
+          OutputTimeFn<? super W> outputTimeFn) {
+        return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
+      }
+    };
+  }
+
+  private enum StateKind {
+    SYSTEM('s'),
+    USER('u');
+
+    private char prefix;
+
+    StateKind(char prefix) {
+      this.prefix = prefix;
+    }
+  }
+
+  private StateTags() { }
+
+  private interface SystemStateTag<K, StateT extends State> {
+    StateTag<K, StateT> asKind(StateKind kind);
+  }
+
+  /** Create a state tag for the given id and spec. */
+  public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
+      String id, StateSpec<K, StateT> spec) {
+    return new SimpleStateTag<>(new StructuredId(id), spec);
+  }
+
+  /**
+   * Create a simple state tag for values of type {@code T}.
+   */
+  public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> 
valueCoder) {
+    return new SimpleStateTag<>(new StructuredId(id), 
StateSpecs.value(valueCoder));
+  }
+
+  /**
+   * Create a state tag for values that are merged by a {@link CombineFn}, which automatically
+   * combines multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningValue(
+          String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    StructuredId structuredId = new StructuredId(id);
+    return new SimpleStateTag<>(
+        structuredId, StateSpecs.combiningValue(accumCoder, combineFn));
+  }
+
+  /**
+   * Create a state tag for values that are merged by a {@link KeyedCombineFn}, which
+   * automatically combines multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <K, InputT, AccumT, OutputT>
+      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> keyedCombiningValue(
+          String id,
+          Coder<AccumT> accumCoder,
+          KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
+    StructuredId structuredId = new StructuredId(id);
+    return new SimpleStateTag<>(
+        structuredId, StateSpecs.keyedCombiningValue(accumCoder, combineFn));
+  }
+
+  /**
+   * Create a state tag for values that are merged by a {@link KeyedCombineFnWithContext}, which
+   * automatically combines multiple {@code InputT}s into a single {@code OutputT}.
+   */
+  public static <K, InputT, AccumT, OutputT>
+      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+          keyedCombiningValueWithContext(
+              String id,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
+    StructuredId structuredId = new StructuredId(id);
+    return new SimpleStateTag<>(
+        structuredId, StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
+  }
+
+  /**
+   * Create a state tag for values that are merged by a {@link CombineFn}, which automatically
+   * combines multiple {@code InputT}s into a single {@code OutputT}.
+   *
+   * <p>Unlike {@code combiningValue}, this takes the {@code Coder<InputT>} and the
+   * {@code Coder<AccumT>} is determined from it. It should only be used to initialize static
+   * values.
+   */
+  public static <InputT, AccumT, OutputT>
+      StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
+          combiningValueFromInputInternal(
+              String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
+    StructuredId structuredId = new StructuredId(id);
+    return new SimpleStateTag<>(
+        structuredId, StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
+  }
+
+  /**
+   * Create a state tag for a bag: state that is optimized for adding values frequently and
+   * only occasionally retrieving all the values that have been added.
+   */
+  public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
+    StructuredId structuredId = new StructuredId(id);
+    return new SimpleStateTag<>(structuredId, StateSpecs.bag(elemCoder));
+  }
+
+  /**
+   * Create a state tag for holding the watermark; the tag's spec is obtained from
+   * {@code StateSpecs.watermarkStateInternal(outputTimeFn)}.
+   */
+  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
+      watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
+    StructuredId structuredId = new StructuredId(id);
+    return new SimpleStateTag<>(
+        structuredId, StateSpecs.watermarkStateInternal(outputTimeFn));
+  }
+
+  /**
+   * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
+   * collide with any user tags.
+   *
+   * @throws IllegalArgumentException if {@code tag} does not implement {@code SystemStateTag}
+   */
+  public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
+      StateTag<K, StateT> tag) {
+    if (tag instanceof SystemStateTag) {
+      // The instanceof test on the line above is what justifies this cast.
+      @SuppressWarnings("unchecked")
+      SystemStateTag<K, StateT> systemCapable = (SystemStateTag<K, StateT>) tag;
+      return systemCapable.asKind(StateKind.SYSTEM);
+    }
+    throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
+  }
+
+  /**
+   * Derive a bag tag from a combining tag. The result has the same id as {@code combiningTag}
+   * and a spec produced by {@code StateSpecs.convertToBagSpecInternal} from the combining
+   * tag's spec.
+   */
+  public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
+      convertToBagTagInternal(
+          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
+    StructuredId bagId = new StructuredId(combiningTag.getId());
+    return new SimpleStateTag<>(
+        bagId, StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
+  }
+
+  /**
+   * An id composed of a {@link StateKind} and a raw string id. Two instances are equal when
+   * both the kind and the raw id are equal.
+   */
+  private static class StructuredId implements Serializable {
+    private final StateKind kind;
+    private final String rawId;
+
+    /** Creates an id of kind {@code USER} for {@code rawId}. */
+    private StructuredId(String rawId) {
+      this(StateKind.USER, rawId);
+    }
+
+    private StructuredId(StateKind kind, String rawId) {
+      this.kind = kind;
+      this.rawId = rawId;
+    }
+
+    /** Returns a new id that keeps this raw id but carries the given {@code kind}. */
+    public StructuredId asKind(StateKind kind) {
+      return new StructuredId(kind, rawId);
+    }
+
+    /** Appends the kind's prefix character, then the raw id. */
+    public void appendTo(Appendable sb) throws IOException {
+      // Continue on whatever append(char) returns, exactly as a chained call would.
+      Appendable afterPrefix = sb.append(kind.prefix);
+      afterPrefix.append(rawId);
+    }
+
+    /** Returns the raw id, without any kind prefix. */
+    public String getRawId() {
+      return rawId;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("id", rawId)
+          .add("kind", kind)
+          .toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj instanceof StructuredId) {
+        StructuredId other = (StructuredId) obj;
+        return Objects.equals(kind, other.kind) && Objects.equals(rawId, other.rawId);
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(kind, rawId);
+    }
+  }
+
+  /**
+   * A basic {@link StateTag} implementation backed by a {@code StructuredId} and a
+   * {@link StateSpec}.
+   *
+   * <p>Note: {@link #getId()} exposes only the raw id, and {@link #equals} / {@link #hashCode}
+   * are computed from {@link #getId()} and {@link #getSpec()}, so the {@link StateKind} of the
+   * id does not take part in equality.
+   */
+  private static class SimpleStateTag<K, StateT extends State>
+      implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
+
+    private final StateSpec<K, StateT> spec;
+    private final StructuredId id;
+
+    public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
+      this.id = id;
+      this.spec = spec;
+    }
+
+    /** Binds the spec under the raw id, using {@code binder} adapted by {@code adaptTagBinder}. */
+    @Override
+    @Deprecated
+    public StateT bind(StateTag.StateBinder<? extends K> binder) {
+      String rawId = this.id.getRawId();
+      return spec.bind(rawId, adaptTagBinder(binder));
+    }
+
+    /** Returns the raw id, without the kind prefix. */
+    @Override
+    public String getId() {
+      return id.getRawId();
+    }
+
+    @Override
+    public StateSpec<K, StateT> getSpec() {
+      return spec;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("id", id)
+          .toString();
+    }
+
+    /** Delegates to the structured id, which writes the kind prefix followed by the raw id. */
+    @Override
+    public void appendTo(Appendable sb) throws IOException {
+      id.appendTo(sb);
+    }
+
+    /** Returns a new tag with the same spec whose id has been converted to {@code kind}. */
+    @Override
+    public StateTag<K, StateT> asKind(StateKind kind) {
+      StructuredId convertedId = id.asKind(kind);
+      return new SimpleStateTag<>(convertedId, spec);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other instanceof SimpleStateTag) {
+        SimpleStateTag<?, ?> that = (SimpleStateTag<?, ?>) other;
+        return Objects.equals(this.getId(), that.getId())
+            && Objects.equals(this.getSpec(), that.getSpec());
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getClass(), this.getId(), this.getSpec());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
index 4a876d1..bb7e4a9 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java
@@ -29,8 +29,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 
 /**
  * {@link ReduceFn} implementing the default reduction behaviors of {@link 
GroupByKey}.

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
index ed852a5..0321a33 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/TestInMemoryStateInternals.java
@@ -21,7 +21,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 99e15f0..d3c4bc7 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -30,8 +30,6 @@ import 
org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 7cc9130..b720644 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -25,6 +25,8 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.MergingStateAccessor;
 import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.InstantCoder;
@@ -34,8 +36,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.format.PeriodFormat;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index b5265ba..1dd5b65 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -21,13 +21,13 @@ import java.util.Objects;
 import org.apache.beam.runners.core.MergingStateAccessor;
 import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateMerging;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 
 /**
  * {@link TriggerStateMachine}s that fire based on properties of the elements 
in the current pane.

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index 3bdfc82..315110d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.StateAccessor;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
 import 
org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo;
 import org.apache.beam.sdk.coders.Coder;
@@ -38,7 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateTag;
 import org.joda.time.Instant;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index d3adaa4..542439f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -26,11 +26,11 @@ import java.util.Collection;
 import java.util.Map;
 import org.apache.beam.runners.core.MergingStateAccessor;
 import org.apache.beam.runners.core.StateAccessor;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.BitSetCoder;
 import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.joda.time.Instant;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
index ca0a8e5..8ea9abc 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryStateInternalsTest.java
@@ -32,8 +32,6 @@ import 
org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.hamcrest.Matchers;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index bea0c39..dab2bf9 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -71,7 +71,6 @@ import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
new file mode 100644
index 0000000..9a04628
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StateTagTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Max;
+import org.apache.beam.sdk.transforms.Min;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.util.CombineFnUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for the equality behavior of {@link StateTag}s created through {@link StateTags}.
+ */
+@RunWith(JUnit4.class)
+public class StateTagTest {
+  /** Value tags: equal for the same id and coder, unequal if either differs. */
+  @Test
+  public void testValueEquality() {
+    StateTag<?, ?> fooVarIntA = StateTags.value("foo", VarIntCoder.of());
+    StateTag<?, ?> fooVarIntB = StateTags.value("foo", VarIntCoder.of());
+    StateTag<?, ?> fooBigEndianInt = StateTags.value("foo", BigEndianIntegerCoder.of());
+    StateTag<?, ?> barVarIntTag = StateTags.value("bar", VarIntCoder.of());
+
+    // Same id, same coder.
+    assertEquals(fooVarIntA, fooVarIntB);
+    // Same id, different coder.
+    assertNotEquals(fooVarIntA, fooBigEndianInt);
+    // Different id, same coder.
+    assertNotEquals(fooVarIntA, barVarIntTag);
+  }
+
+  /** Bag tags: equal for the same id and coder, unequal if either differs. */
+  @Test
+  public void testBagEquality() {
+    StateTag<?, ?> fooVarIntA = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<?, ?> fooVarIntB = StateTags.bag("foo", VarIntCoder.of());
+    StateTag<?, ?> fooBigEndianInt = StateTags.bag("foo", BigEndianIntegerCoder.of());
+    StateTag<?, ?> barVarIntTag = StateTags.bag("bar", VarIntCoder.of());
+
+    // Same id, same coder.
+    assertEquals(fooVarIntA, fooVarIntB);
+    // Same id, different coder.
+    assertNotEquals(fooVarIntA, fooBigEndianInt);
+    // Different id, same coder.
+    assertNotEquals(fooVarIntA, barVarIntTag);
+  }
+
+  /** Watermark tags: equality follows the id; a different output time fn keeps them equal. */
+  @Test
+  public void testWatermarkBagEquality() {
+    StateTag<?, ?> fooEarliestA = StateTags.watermarkStateInternal(
+        "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    StateTag<?, ?> fooEarliestB = StateTags.watermarkStateInternal(
+        "foo", OutputTimeFns.outputAtEarliestInputTimestamp());
+    StateTag<?, ?> barEarliest = StateTags.watermarkStateInternal(
+        "bar", OutputTimeFns.outputAtEarliestInputTimestamp());
+
+    StateTag<?, ?> barLatest = StateTags.watermarkStateInternal(
+        "bar", OutputTimeFns.outputAtLatestInputTimestamp());
+
+    // Same id, same fn.
+    assertEquals(fooEarliestA, fooEarliestB);
+    // Different id, same fn.
+    assertNotEquals(fooEarliestA, barEarliest);
+    // Same id, different fn: still expected to be equal.
+    assertEquals(barEarliest, barLatest);
+  }
+
+  /** Combining tags built from an input coder, and the bag tags converted from them. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Test
+  public void testCombiningValueEquality() {
+    Combine.BinaryCombineIntegerFn maxCombine = Max.ofIntegers();
+    Coder<Integer> varIntInput = VarIntCoder.of();
+    Coder<Integer> bigEndianInput = BigEndianIntegerCoder.of();
+    Combine.BinaryCombineIntegerFn minCombine = Min.ofIntegers();
+
+    StateTag<?, ?> fooVarIntMaxA =
+        StateTags.combiningValueFromInputInternal("foo", varIntInput, maxCombine);
+    StateTag<?, ?> fooVarIntMaxB =
+        StateTags.combiningValueFromInputInternal("foo", varIntInput, maxCombine);
+    StateTag<?, ?> fooVarIntMin =
+        StateTags.combiningValueFromInputInternal("foo", varIntInput, minCombine);
+
+    StateTag<?, ?> fooBigEndianMax =
+        StateTags.combiningValueFromInputInternal("foo", bigEndianInput, maxCombine);
+    StateTag<?, ?> barVarIntMax =
+        StateTags.combiningValueFromInputInternal("bar", varIntInput, maxCombine);
+
+    // Same id, input coder and combineFn.
+    assertEquals(fooVarIntMaxA, fooVarIntMaxB);
+    assertEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooVarIntMaxA),
+        StateTags.convertToBagTagInternal((StateTag) fooVarIntMaxB));
+
+    // Different combineFn, but we treat them as equal since we only serialize the bits.
+    assertEquals(fooVarIntMaxA, fooVarIntMin);
+    assertEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooVarIntMaxA),
+        StateTags.convertToBagTagInternal((StateTag) fooVarIntMin));
+
+    // Different input coder.
+    assertNotEquals(fooVarIntMaxA, fooBigEndianMax);
+    assertNotEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooVarIntMaxA),
+        StateTags.convertToBagTagInternal((StateTag) fooBigEndianMax));
+
+    // Different ids.
+    assertNotEquals(fooVarIntMaxA, barVarIntMax);
+    assertNotEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooVarIntMaxA),
+        StateTags.convertToBagTagInternal((StateTag) barVarIntMax));
+  }
+
+  /** Keyed-with-context combining tags, and the bag tags converted from them. */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  @Test
+  public void testCombiningValueWithContextEquality() {
+    CoderRegistry coderRegistry = new CoderRegistry();
+    coderRegistry.registerStandardCoders();
+
+    Combine.BinaryCombineIntegerFn maxCombine = Max.ofIntegers();
+    Combine.BinaryCombineIntegerFn minCombine = Min.ofIntegers();
+
+    Coder<int[]> varIntAccum = maxCombine.getAccumulatorCoder(coderRegistry, VarIntCoder.of());
+    Coder<int[]> bigEndianAccum =
+        minCombine.getAccumulatorCoder(coderRegistry, BigEndianIntegerCoder.of());
+
+    StateTag<?, ?> fooAccum1MaxA = StateTags.keyedCombiningValueWithContext(
+        "foo", varIntAccum, CombineFnUtil.toFnWithContext(maxCombine).<String>asKeyedFn());
+    StateTag<?, ?> fooAccum1MaxB = StateTags.keyedCombiningValueWithContext(
+        "foo", varIntAccum, CombineFnUtil.toFnWithContext(maxCombine).asKeyedFn());
+    StateTag<?, ?> fooAccum1Min = StateTags.keyedCombiningValueWithContext(
+        "foo", varIntAccum, CombineFnUtil.toFnWithContext(minCombine).asKeyedFn());
+
+    StateTag<?, ?> fooAccum2Max = StateTags.keyedCombiningValueWithContext(
+        "foo", bigEndianAccum, CombineFnUtil.toFnWithContext(maxCombine).asKeyedFn());
+    StateTag<?, ?> barAccum1Max = StateTags.keyedCombiningValueWithContext(
+        "bar", varIntAccum, CombineFnUtil.toFnWithContext(maxCombine).asKeyedFn());
+
+    // Same id, accumulator coder and combineFn.
+    assertEquals(fooAccum1MaxA, fooAccum1MaxB);
+    assertEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooAccum1MaxA),
+        StateTags.convertToBagTagInternal((StateTag) fooAccum1MaxB));
+    // Different combineFn, but we treat them as equal since we only serialize the bits.
+    assertEquals(fooAccum1MaxA, fooAccum1Min);
+    assertEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooAccum1MaxA),
+        StateTags.convertToBagTagInternal((StateTag) fooAccum1Min));
+
+    // Different accumulator coder.
+    assertNotEquals(fooAccum1MaxA, fooAccum2Max);
+    assertNotEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooAccum1MaxA),
+        StateTags.convertToBagTagInternal((StateTag) fooAccum2Max));
+
+    // Different ids.
+    assertNotEquals(fooAccum1MaxA, barAccum1Max);
+    assertNotEquals(
+        StateTags.convertToBagTagInternal((StateTag) fooAccum1MaxA),
+        StateTags.convertToBagTagInternal((StateTag) barAccum1Max));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index 4b59bf9..47c0251 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -34,6 +34,8 @@ import 
org.apache.beam.runners.core.InMemoryStateInternals.InMemoryWatermarkHold
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateTable;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTag.StateBinder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
@@ -46,8 +48,6 @@ import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTag.StateBinder;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index b3a2156..0ad40ac 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -30,6 +30,8 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -45,8 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
index 25cd252..c8eb66e 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternalsTest.java
@@ -31,6 +31,8 @@ import static org.mockito.Mockito.verify;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -45,8 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 6dd2ea4..d6f2263 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -32,6 +32,8 @@ import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
@@ -59,8 +61,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index cc5ee74..ac7d2bd 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -38,6 +38,8 @@ import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
@@ -59,8 +61,6 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateSpec;
 import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
index 5731a38..eaededb 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -40,7 +41,6 @@ import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.flink.api.common.state.ListStateDescriptor;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
index 6a086a7..465dad3 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java
@@ -27,6 +27,8 @@ import java.util.Arrays;
 import org.apache.beam.runners.core.StateMerging;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaceForTest;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkStateInternals;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -40,8 +42,6 @@ import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
 import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.CombiningState;
 import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
 import org.apache.beam.sdk.util.state.ValueState;
 import org.apache.beam.sdk.util.state.WatermarkHoldState;
 import org.apache.flink.api.common.ExecutionConfig;

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
deleted file mode 100644
index feca927..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTag.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-
-/**
- * An address and specification for a persistent state cell. This includes a unique identifier for
- * the location, the information necessary to encode the value, and details about the intended
- * access pattern.
- *
- * <p>State can be thought of as a sparse table, with each {@code StateTag} defining a column
- * that has cells of type {@code StateT}.
- *
- * <p>Currently, this can only be used in a step immediately following a {@link GroupByKey}.
- *
- * @param <K> The type of key that must be used with the state tag. Contravariant: methods should
- *            accept values of type {@code KeyedStateTag<? super K, StateT>}.
- * @param <StateT> The type of state being tagged.
- */
-@Experimental(Kind.STATE)
-public interface StateTag<K, StateT extends State> extends Serializable {
-
-  /** Append the UTF-8 encoding of this tag to the given {@link Appendable}. */
-  void appendTo(Appendable sb) throws IOException;
-
-  /**
-   * An identifier for the state cell that this tag references.
-   */
-  String getId();
-
-  /**
-   * The specification for the state stored in the referenced cell.
-   */
-  StateSpec<K, StateT> getSpec();
-
-  /**
-   * Bind this state tag. See {@link StateSpec#bind}.
-   *
-   * @deprecated Use the {@link StateSpec#bind} method via {@link #getSpec} for now.
-   */
-  @Deprecated
-  StateT bind(StateBinder<? extends K> binder);
-
-  /**
-   * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
-   *
-   * @param <K> the type of key this binder embodies.
-   * @deprecated for migration only; runners should reference the top level {@link StateBinder}
-   * and move towards {@link StateSpec} rather than {@link StateTag}.
-   */
-  @Deprecated
-  public interface StateBinder<K> {
-    <T> ValueState<T> bindValue(StateTag<? super K, ValueState<T>> spec, Coder<T> coder);
-
-    <T> BagState<T> bindBag(StateTag<? super K, BagState<T>> spec, Coder<T> elemCoder);
-
-    <InputT, AccumT, OutputT> AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
-        Coder<AccumT> accumCoder,
-        CombineFn<InputT, AccumT, OutputT> combineFn);
-
-    <InputT, AccumT, OutputT>
-    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn);
-
-    <InputT, AccumT, OutputT>
-    AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
-        Coder<AccumT> accumCoder,
-        KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT>
-            combineFn);
-
-    /**
-     * Bind to a watermark {@link StateSpec}.
-     *
-     * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold timestamps added to
-     * the returned {@link WatermarkHoldState} are to be combined.
-     */
-    <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-        StateTag<? super K, WatermarkHoldState<W>> spec,
-        OutputTimeFn<? super W> outputTimeFn);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/07d93276/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
deleted file mode 100644
index acb1f08..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import com.google.common.base.MoreObjects;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Objects;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-
-/**
- * Static utility methods for creating {@link StateTag} instances.
- */
-@Experimental(Kind.STATE)
-public class StateTags {
-
-  private static final CoderRegistry STANDARD_REGISTRY = new CoderRegistry();
-
-  static {
-    STANDARD_REGISTRY.registerStandardCoders();
-  }
-
-  /** @deprecated for migration purposes only */
-  @Deprecated
-  private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) {
-    return new StateBinder<K>() {
-      @Override
-      public <T> ValueState<T> bindValue(
-          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) {
-        return binder.bindValue(tagForSpec(id, spec), coder);
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) {
-        return binder.bindBag(tagForSpec(id, spec), elemCoder);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue(
-              String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              CombineFn<InputT, AccumT, OutputT> combineFn) {
-        return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-              String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-              String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) {
-        return binder.bindKeyedCombiningValueWithContext(
-            tagForSpec(id, spec), accumCoder, combineFn);
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          String id,
-          StateSpec<? super K, WatermarkHoldState<W>> spec,
-          OutputTimeFn<? super W> outputTimeFn) {
-        return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
-      }
-    };
-  }
-
-  private enum StateKind {
-    SYSTEM('s'),
-    USER('u');
-
-    private char prefix;
-
-    StateKind(char prefix) {
-      this.prefix = prefix;
-    }
-  }
-
-  private StateTags() { }
-
-  private interface SystemStateTag<K, StateT extends State> {
-    StateTag<K, StateT> asKind(StateKind kind);
-  }
-
-  /** Create a state tag for the given id and spec. */
-  public static <K, StateT extends State> StateTag<K, StateT> tagForSpec(
-      String id, StateSpec<K, StateT> spec) {
-    return new SimpleStateTag<>(new StructuredId(id), spec);
-  }
-
-  /**
-   * Create a simple state tag for values of type {@code T}.
-   */
-  public static <T> StateTag<Object, ValueState<T>> value(String id, Coder<T> valueCoder) {
-    return new SimpleStateTag<>(new StructuredId(id), StateSpecs.value(valueCoder));
-  }
-
-  /**
-   * Create a state tag for values that use a {@link CombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <InputT, AccumT, OutputT>
-    StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-    combiningValue(
-      String id, Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.combiningValue(accumCoder, combineFn));
-  }
-
-  /**
-   * Create a state tag for values that use a {@link KeyedCombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <K, InputT, AccumT,
-      OutputT> StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      keyedCombiningValue(String id, Coder<AccumT> accumCoder,
-          KeyedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
-    return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningValue(accumCoder, combineFn));
-  }
-
-  /**
-   * Create a state tag for values that use a {@link KeyedCombineFnWithContext} to automatically
-   * merge multiple {@code InputT}s into a single {@code OutputT}.
-   */
-  public static <K, InputT, AccumT, OutputT>
-      StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      keyedCombiningValueWithContext(
-          String id,
-          Coder<AccumT> accumCoder,
-          KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> combineFn) {
-    return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.keyedCombiningValueWithContext(accumCoder, combineFn));
-  }
-
-  /**
-   * Create a state tag for values that use a {@link CombineFn} to automatically merge
-   * multiple {@code InputT}s into a single {@code OutputT}.
-   *
-   * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and
-   * should only be used to initialize static values.
-   */
-  public static <InputT, AccumT, OutputT>
-      StateTag<Object, AccumulatorCombiningState<InputT, AccumT, OutputT>>
-      combiningValueFromInputInternal(
-          String id, Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) {
-    return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.combiningValueFromInputInternal(inputCoder, combineFn));
-  }
-
-  /**
-   * Create a state tag that is optimized for adding values frequently, and
-   * occasionally retrieving all the values that have been added.
-   */
-  public static <T> StateTag<Object, BagState<T>> bag(String id, Coder<T> elemCoder) {
-    return new SimpleStateTag<>(new StructuredId(id), StateSpecs.bag(elemCoder));
-  }
-
-  /**
-   * Create a state tag for holding the watermark.
-   */
-  public static <W extends BoundedWindow> StateTag<Object, WatermarkHoldState<W>>
-      watermarkStateInternal(String id, OutputTimeFn<? super W> outputTimeFn) {
-    return new SimpleStateTag<>(
-        new StructuredId(id), StateSpecs.watermarkStateInternal(outputTimeFn));
-  }
-
-  /**
-   * Convert an arbitrary {@link StateTag} to a system-internal tag that is guaranteed not to
-   * collide with any user tags.
-   */
-  public static <K, StateT extends State> StateTag<K, StateT> makeSystemTagInternal(
-      StateTag<K, StateT> tag) {
-    if (!(tag instanceof SystemStateTag)) {
-      throw new IllegalArgumentException("Expected subclass of SimpleStateTag, got " + tag);
-    }
-    // Checked above
-    @SuppressWarnings("unchecked")
-    SystemStateTag<K, StateT> typedTag = (SystemStateTag<K, StateT>) tag;
-    return typedTag.asKind(StateKind.SYSTEM);
-  }
-
-  public static <K, InputT, AccumT, OutputT> StateTag<Object, BagState<AccumT>>
-      convertToBagTagInternal(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> combiningTag) {
-    return new SimpleStateTag<>(
-        new StructuredId(combiningTag.getId()),
-        StateSpecs.convertToBagSpecInternal(combiningTag.getSpec()));
-  }
-
-  private static class StructuredId implements Serializable {
-    private final StateKind kind;
-    private final String rawId;
-
-    private StructuredId(String rawId) {
-      this(StateKind.USER, rawId);
-    }
-
-    private StructuredId(StateKind kind, String rawId) {
-      this.kind = kind;
-      this.rawId = rawId;
-    }
-
-    public StructuredId asKind(StateKind kind) {
-      return new StructuredId(kind, rawId);
-    }
-
-    public void appendTo(Appendable sb) throws IOException {
-      sb.append(kind.prefix).append(rawId);
-    }
-
-    public String getRawId() {
-      return rawId;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("id", rawId)
-          .add("kind", kind)
-          .toString();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-
-      if (!(obj instanceof StructuredId)) {
-        return false;
-      }
-
-      StructuredId that = (StructuredId) obj;
-      return Objects.equals(this.kind, that.kind)
-          && Objects.equals(this.rawId, that.rawId);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(kind, rawId);
-    }
-  }
-
-  /**
-   * A basic {@link StateTag} implementation that manages the structured ids.
-   */
-  private static class SimpleStateTag<K, StateT extends State>
-      implements StateTag<K, StateT>, SystemStateTag<K, StateT> {
-
-    private final StateSpec<K, StateT> spec;
-    private final StructuredId id;
-
-    public SimpleStateTag(StructuredId id, StateSpec<K, StateT> spec) {
-      this.id = id;
-      this.spec = spec;
-    }
-
-    @Override
-    @Deprecated
-    public StateT bind(StateTag.StateBinder<? extends K> binder) {
-      return spec.bind(
-          this.id.getRawId(), adaptTagBinder(binder));
-    }
-
-    @Override
-    public String getId() {
-      return id.getRawId();
-    }
-
-    @Override
-    public StateSpec<K, StateT> getSpec() {
-      return spec;
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("id", id)
-          .toString();
-    }
-
-    @Override
-    public void appendTo(Appendable sb) throws IOException {
-      id.appendTo(sb);
-    }
-
-    @Override
-    public StateTag<K, StateT> asKind(StateKind kind) {
-      return new SimpleStateTag<>(id.asKind(kind), spec);
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (!(other instanceof SimpleStateTag)) {
-        return false;
-      }
-
-      SimpleStateTag<?, ?> otherTag = (SimpleStateTag<?, ?>) other;
-      return Objects.equals(this.getId(), otherTag.getId())
-          && Objects.equals(this.getSpec(), otherTag.getSpec());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass(), this.getId(), this.getSpec());
-    }
-  }
-}

Reply via email to