Move StateTag adapter code to StateTags

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c42a19b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c42a19b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c42a19b1

Branch: refs/heads/master
Commit: c42a19b16fc70b48ecdd1cdd23148d16bb41fd7b
Parents: d39cec4
Author: Kenneth Knowles <k...@google.com>
Authored: Fri Feb 3 19:48:38 2017 -0800
Committer: Kenneth Knowles <k...@google.com>
Committed: Tue Feb 7 11:42:55 2017 -0800

----------------------------------------------------------------------
 .../apache/beam/sdk/util/state/StateSpecs.java  | 59 --------------------
 .../apache/beam/sdk/util/state/StateTags.java   | 59 +++++++++++++++++++-
 2 files changed, 58 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c42a19b1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index b9c22cf..08c3a12 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -387,63 +387,4 @@ public class StateSpecs {
     }
   }
 
-  /**
-   * @deprecated for migration purposes only
-   */
-  @Deprecated
-  public static <K> StateBinder<K> adaptTagBinder(final 
StateTag.StateBinder<K> binder) {
-    return new StateBinder<K>() {
-      @Override
-      public <T> ValueState<T> bindValue(
-          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) 
{
-        return binder.bindValue(StateTags.tagForSpec(id, spec), coder);
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> 
elemCoder) {
-        return binder.bindBag(StateTags.tagForSpec(id, spec), elemCoder);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
-              String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              CombineFn<InputT, AccumT, OutputT> combineFn) {
-        return binder.bindCombiningValue(StateTags.tagForSpec(id, spec), 
accumCoder, combineFn);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
-              String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return binder.bindKeyedCombiningValue(
-            StateTags.tagForSpec(id, spec), accumCoder, combineFn);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
-              String id,
-              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
-              Coder<AccumT> accumCoder,
-              KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
-        return binder.bindKeyedCombiningValueWithContext(
-            StateTags.tagForSpec(id, spec), accumCoder, combineFn);
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          String id,
-          StateSpec<? super K, WatermarkHoldState<W>> spec,
-          OutputTimeFn<? super W> outputTimeFn) {
-        return binder.bindWatermark(StateTags.tagForSpec(id, spec), 
outputTimeFn);
-      }
-    };
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c42a19b1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
index 4fe3a4f..acb1f08 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java
@@ -43,6 +43,63 @@ public class StateTags {
     STANDARD_REGISTRY.registerStandardCoders();
   }
 
+  /** @deprecated for migration purposes only */
+  @Deprecated
+  private static <K> StateBinder<K> adaptTagBinder(final 
StateTag.StateBinder<K> binder) {
+    return new StateBinder<K>() {
+      @Override
+      public <T> ValueState<T> bindValue(
+          String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) 
{
+        return binder.bindValue(tagForSpec(id, spec), coder);
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          String id, StateSpec<? super K, BagState<T>> spec, Coder<T> 
elemCoder) {
+        return binder.bindBag(tagForSpec(id, spec), elemCoder);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindCombiningValue(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              CombineFn<InputT, AccumT, OutputT> combineFn) {
+        return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, 
combineFn);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValue(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        return binder.bindKeyedCombiningValue(tagForSpec(id, spec), 
accumCoder, combineFn);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+          AccumulatorCombiningState<InputT, AccumT, OutputT> 
bindKeyedCombiningValueWithContext(
+              String id,
+              StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, 
OutputT>> spec,
+              Coder<AccumT> accumCoder,
+              KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> 
combineFn) {
+        return binder.bindKeyedCombiningValueWithContext(
+            tagForSpec(id, spec), accumCoder, combineFn);
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          String id,
+          StateSpec<? super K, WatermarkHoldState<W>> spec,
+          OutputTimeFn<? super W> outputTimeFn) {
+        return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn);
+      }
+    };
+  }
+
   private enum StateKind {
     SYSTEM('s'),
     USER('u');
@@ -238,7 +295,7 @@ public class StateTags {
     @Deprecated
     public StateT bind(StateTag.StateBinder<? extends K> binder) {
       return spec.bind(
-          this.id.getRawId(), StateSpecs.adaptTagBinder(binder));
+          this.id.getRawId(), adaptTagBinder(binder));
     }
 
     @Override

Reply via email to