Move StateTag adapter code to StateTags
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c42a19b1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c42a19b1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c42a19b1 Branch: refs/heads/master Commit: c42a19b16fc70b48ecdd1cdd23148d16bb41fd7b Parents: d39cec4 Author: Kenneth Knowles <k...@google.com> Authored: Fri Feb 3 19:48:38 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Tue Feb 7 11:42:55 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/util/state/StateSpecs.java | 59 -------------------- .../apache/beam/sdk/util/state/StateTags.java | 59 +++++++++++++++++++- 2 files changed, 58 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c42a19b1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java index b9c22cf..08c3a12 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java @@ -387,63 +387,4 @@ public class StateSpecs { } } - /** - * @deprecated for migration purposes only - */ - @Deprecated - public static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) { - return new StateBinder<K>() { - @Override - public <T> ValueState<T> bindValue( - String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) { - return binder.bindValue(StateTags.tagForSpec(id, spec), coder); - } - - @Override - public <T> BagState<T> bindBag( - String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) { - return binder.bindBag(StateTags.tagForSpec(id, spec), elemCoder); - } - - @Override - public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( - String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - CombineFn<InputT, AccumT, OutputT> combineFn) { - return binder.bindCombiningValue(StateTags.tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( - String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { - return binder.bindKeyedCombiningValue( - StateTags.tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <InputT, AccumT, OutputT> - AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( - String id, - StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, - Coder<AccumT> accumCoder, - KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { - return binder.bindKeyedCombiningValueWithContext( - StateTags.tagForSpec(id, spec), accumCoder, combineFn); - } - - @Override - public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( - String id, - StateSpec<? super K, WatermarkHoldState<W>> spec, - OutputTimeFn<? super W> outputTimeFn) { - return binder.bindWatermark(StateTags.tagForSpec(id, spec), outputTimeFn); - } - }; - } } http://git-wip-us.apache.org/repos/asf/beam/blob/c42a19b1/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java index 4fe3a4f..acb1f08 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateTags.java @@ -43,6 +43,63 @@ public class StateTags { STANDARD_REGISTRY.registerStandardCoders(); } + /** @deprecated for migration purposes only */ + @Deprecated + private static <K> StateBinder<K> adaptTagBinder(final StateTag.StateBinder<K> binder) { + return new StateBinder<K>() { + @Override + public <T> ValueState<T> bindValue( + String id, StateSpec<? super K, ValueState<T>> spec, Coder<T> coder) { + return binder.bindValue(tagForSpec(id, spec), coder); + } + + @Override + public <T> BagState<T> bindBag( + String id, StateSpec<? super K, BagState<T>> spec, Coder<T> elemCoder) { + return binder.bindBag(tagForSpec(id, spec), elemCoder); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn) { + return binder.bindCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) { + return binder.bindKeyedCombiningValue(tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <InputT, AccumT, OutputT> + AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext( + String id, + StateSpec<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + KeyedCombineFnWithContext<? super K, InputT, AccumT, OutputT> combineFn) { + return binder.bindKeyedCombiningValueWithContext( + tagForSpec(id, spec), accumCoder, combineFn); + } + + @Override + public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark( + String id, + StateSpec<? super K, WatermarkHoldState<W>> spec, + OutputTimeFn<? super W> outputTimeFn) { + return binder.bindWatermark(tagForSpec(id, spec), outputTimeFn); + } + }; + } + private enum StateKind { SYSTEM('s'), USER('u'); @@ -238,7 +295,7 @@ public class StateTags { @Deprecated public StateT bind(StateTag.StateBinder<? extends K> binder) { return spec.bind( - this.id.getRawId(), StateSpecs.adaptTagBinder(binder)); + this.id.getRawId(), adaptTagBinder(binder)); } @Override