Move StateInternals to runners/core-java

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/144b1df8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/144b1df8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/144b1df8

Branch: refs/heads/master
Commit: 144b1df88176f32aec4991423eb225b844ff16c2
Parents: 4e4391c
Author: Kenneth Knowles <k...@google.com>
Authored: Thu Jan 26 21:22:38 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Feb 6 09:26:56 2017 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |  2 +-
 .../operators/ApexParDoOperator.java            |  2 +-
 .../translation/utils/ApexStateInternals.java   |  2 +-
 .../apex/translation/utils/NoOpStepContext.java |  2 +-
 .../beam/runners/core/BaseExecutionContext.java |  1 -
 .../beam/runners/core/ExecutionContext.java     |  1 -
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |  1 -
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |  1 -
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |  1 -
 .../runners/core/InMemoryStateInternals.java    |  1 -
 .../runners/core/MergingActiveWindowSet.java    |  1 -
 .../runners/core/ReduceFnContextFactory.java    |  1 -
 .../beam/runners/core/ReduceFnRunner.java       |  1 -
 .../beam/runners/core/SideInputHandler.java     |  1 -
 .../beam/runners/core/SimpleOldDoFnRunner.java  |  1 -
 .../beam/runners/core/SplittableParDo.java      |  1 -
 .../beam/runners/core/StateInternals.java       | 61 ++++++++++++++++++++
 .../runners/core/StateInternalsFactory.java     |  1 -
 .../beam/runners/core/WindowingInternals.java   |  1 -
 .../TriggerStateMachineContextFactory.java      |  2 +-
 .../core/GroupAlsoByWindowsProperties.java      |  1 -
 .../core/MergingActiveWindowSetTest.java        |  1 -
 .../beam/runners/core/SplittableParDoTest.java  |  1 -
 .../triggers/TriggerStateMachineTester.java     |  2 +-
 .../CopyOnAccessInMemoryStateInternals.java     |  2 +-
 ...littableProcessElementsEvaluatorFactory.java |  2 +-
 .../StatefulParDoEvaluatorFactoryTest.java      |  2 +-
 .../functions/FlinkProcessContextBase.java      |  2 +-
 .../wrappers/streaming/DoFnOperator.java        |  2 +-
 .../wrappers/streaming/FlinkStateInternals.java |  2 +-
 .../wrappers/streaming/WindowDoFnOperator.java  |  2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   |  4 +-
 .../spark/translation/SparkProcessContext.java  |  2 +-
 .../spark/translation/TranslationUtils.java     |  2 +-
 .../beam/sdk/util/state/StateInternals.java     | 57 ------------------
 .../beam/fn/harness/fake/FakeStepContext.java   |  2 +-
 36 files changed, 78 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 7891b34..274e807 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -45,6 +45,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
@@ -62,7 +63,6 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 13db40a..7f2512a 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -46,6 +46,7 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.OldDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.ListCoder;
@@ -57,7 +58,6 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 334353d..c9f2e39 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.Context;
@@ -45,7 +46,6 @@ import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTag.StateBinder;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
index feae46e..ad4de97 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java
@@ -20,11 +20,11 @@ package org.apache.beam.runners.apex.translation.utils;
 import java.io.IOException;
 import java.io.Serializable;
 import org.apache.beam.runners.core.ExecutionContext;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
index eec913c..0f23fea 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
index f6bcc3d..40c0798 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index 4cde7da..5b2e051 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.joda.time.Instant;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 49b010e..ac0b1ab 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;
 
 /**

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index b4310ec..7b65a0b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
index a867b43..cf03750 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java
@@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTag.StateBinder;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
index 720377a..e3480fa 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder;
 import org.apache.beam.sdk.coders.SetCoder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 6f8715e..cc0f560 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -38,7 +38,6 @@ import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateAccessor;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 50b1192..091adad 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index ae7f759..6a7d03d 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTags;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index 6b2fbb2..c21ed77 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -42,7 +42,6 @@ import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 30ee7cb..39d7c56 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
new file mode 100644
index 0000000..0b3bd62
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+
+/**
+ * {@code StateInternals} describes the functionality a runner needs to provide for the
+ * State API to be supported.
+ *
+ * <p>The SDK will only use this after elements have been partitioned by key. For instance, after a
+ * {@link GroupByKey} operation. The runner implementation must ensure that any writes using
+ * {@link StateInternals} are implicitly scoped to the key being processed and the specific step
+ * accessing state.
+ *
+ * <p>The runner implementation must also ensure that any writes to the associated state objects
+ * are persisted together with the completion status of the processing that produced these
+ * writes.
+ *
+ * <p>This is a low-level API intended for use by the Beam SDK. It should not be
+ * used directly, and is highly likely to change.
+ */
+@Experimental(Kind.STATE)
+public interface StateInternals<K> {
+
+  /** The key for this {@link StateInternals}. */
+  K getKey();
+
+  /**
+   * Return the state associated with {@code address} in the specified {@code namespace}.
+   */
+  <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address);
+
+  /**
+   * Return the state associated with {@code address} in the specified {@code namespace}
+   * with the {@link StateContext}.
+   */
+  <T extends State> T state(
+      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c);
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
index 7ba3ab7..ea7d742 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core;
 import java.io.Serializable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.util.state.StateInternals;
 
 /**
  * A factory for providing {@link StateInternals} for a particular key.

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
index c033765..8dc0bfc 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
index e3df4ee..2ac9065 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.ActiveWindowSet;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo;
 import org.apache.beam.sdk.coders.Coder;
@@ -35,7 +36,6 @@ import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.state.MergingStateAccessor;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
index 423a674..6c7c4e0 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java
@@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
index 6cccdae..95d6977 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java
@@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 4de07d1..689d2e2 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
index 5148ae6..cd6d394 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
@@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.MergingActiveWindowSet;
 import org.apache.beam.runners.core.NonMergingActiveWindowSet;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TestInMemoryStateInternals;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -48,7 +49,6 @@ import org.apache.beam.sdk.util.Timers;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import 
org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
index afb38c2..e809823 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java
@@ -31,6 +31,7 @@ import 
org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryStateBinder;
 import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryValue;
 import 
org.apache.beam.runners.core.InMemoryStateInternals.InMemoryWatermarkHold;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateTable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
@@ -44,7 +45,6 @@ import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.StateTag.StateBinder;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 85a8d92..7ffcab4 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.core.KeyedWorkItem;
 import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.TimerInternalsFactory;
@@ -34,7 +35,6 @@ import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
index 94ffbd3..62fc102 100644
--- 
a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
+++ 
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo;
@@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
 import org.apache.beam.sdk.util.state.StateSpec;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
index a656d4a..9b83eb4 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.beam.runners.core.OldDoFn;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.WindowingInternals;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -35,7 +36,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.functions.RuntimeContext;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 9fd83c6..c66f026 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -36,6 +36,7 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
@@ -54,7 +55,6 @@ import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.flink.api.common.ExecutionConfig;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
index 27e3dc6..c992d40 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -38,7 +39,6 @@ import org.apache.beam.sdk.util.state.ReadableState;
 import org.apache.beam.sdk.util.state.State;
 import org.apache.beam.sdk.util.state.StateContext;
 import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateTag;
 import org.apache.beam.sdk.util.state.ValueState;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index aa3429e..f1eee84 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -42,6 +42,7 @@ import org.apache.beam.runners.core.ExecutionContext;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
@@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 7452a11..b651fab 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -27,6 +27,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.InMemoryTimerInternals;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
@@ -40,7 +41,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -48,8 +48,6 @@ import org.apache.spark.Accumulator;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.joda.time.Instant;
 
-
-
 /**
  * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
  * for the Spark runner.

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 486bc16..4f8a1a5 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -25,13 +25,13 @@ import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 638410d..4dcc705 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.spark.SparkRunner;
 import org.apache.beam.runners.spark.util.SideInputBroadcast;
@@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java
deleted file mode 100644
index 0385c0b..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.GroupByKey;
-
-/**
- * {@code StateInternals} describes the functionality a runner needs to provide for the
- * State API to be supported.
- *
- * <p>The SDK will only use this after elements have been partitioned by key. For instance, after a
- * {@link GroupByKey} operation. The runner implementation must ensure that any writes using
- * {@link StateInternals} are implicitly scoped to the key being processed and the specific step
- * accessing state.
- *
- * <p>The runner implementation must also ensure that any writes to the associated state objects
- * are persisted together with the completion status of the processing that produced these
- * writes.
- *
- * <p>This is a low-level API intended for use by the Beam SDK. It should not be
- * used directly, and is highly likely to change.
- */
-@Experimental(Kind.STATE)
-public interface StateInternals<K> {
-
-  /** The key for this {@link StateInternals}. */
-  K getKey();
-
-  /**
-   * Return the state associated with {@code address} in the specified {@code namespace}.
-   */
-  <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address);
-
-  /**
-   * Return the state associated with {@code address} in the specified {@code namespace}
-   * with the {@link StateContext}.
-   */
-  <T extends State> T state(
-      StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c);
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
index c2b6223..6403e96 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java
@@ -20,11 +20,11 @@ package org.apache.beam.fn.harness.fake;
 
 import java.io.IOException;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.values.TupleTag;
 
 /**

Reply via email to