Move StateInternalsFactory to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e6c1310 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e6c1310 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e6c1310 Branch: refs/heads/master Commit: 2e6c131027caea3e4df5545ff1b65acc2a7f53a0 Parents: 1d7c6b0 Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 26 21:13:14 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 09:26:06 2017 -0800 ---------------------------------------------------------------------- .../apex/translation/TranslationContext.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../translation/utils/ApexStateInternals.java | 4 +-- .../GroupAlsoByWindowViaOutputBufferDoFn.java | 1 - .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 - .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 1 - .../beam/runners/core/SplittableParDo.java | 1 - .../runners/core/StateInternalsFactory.java | 36 ++++++++++++++++++++ ...roupAlsoByWindowViaOutputBufferDoFnTest.java | 1 - .../core/GroupAlsoByWindowsProperties.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 1 - ...littableProcessElementsEvaluatorFactory.java | 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 3 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../spark/translation/TranslationUtils.java | 2 +- .../sdk/util/state/StateInternalsFactory.java | 35 ------------------- 17 files changed, 44 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 8c30bc6..acd8ab1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -33,11 +33,11 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translation.utils.ApexStateInternals; import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index c5da368..6322796 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -45,6 +45,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.WindowingInternals; import org.apache.beam.sdk.coders.Coder; @@ -62,7 +63,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 808001e..13db40a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.transforms.Aggregator; @@ -57,7 +58,6 @@ import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index 17a4f81..334353d 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -22,14 +22,13 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.HashBasedTable; import com.google.common.collect.Table; - import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; - +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; import org.apache.beam.sdk.coders.InstantCoder; @@ -47,7 +46,6 @@ import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTag.StateBinder; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 85df98c..57981ad 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index d0387cf..9ccefda 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -26,7 +26,6 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; /** http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index cff6e00..3786b48 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 78acb19..e414430 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java new file mode 100644 index 0000000..7ba3ab7 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.util.state.StateInternals; + +/** + * A factory for providing {@link StateInternals} for a particular key. + * + * <p>Because it will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn}, + * albeit at execution time, it is marked {@link Serializable}. + */ +@Experimental(Kind.STATE) +public interface StateInternalsFactory<K> { + + /** Returns {@link StateInternals} for the provided key. */ + StateInternals<K> stateInternalsForKey(K key); +} http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java index 681c316..cb8d494 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFnTest.java @@ -22,7 +22,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 98063df..8f5c1ef 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 4bddd51..b408d37 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -55,7 +55,6 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 64593cd..2122df8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -33,7 +34,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 64eb472..04b5f47 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -38,11 +38,11 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ScheduledFuture; import javax.annotation.Nullable; - import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; @@ -54,7 +54,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.TimerInternalsFactory; import org.apache.beam.sdk.values.KV; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 34eea65..6d7529b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; @@ -40,7 +41,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 8a9317e..638410d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.transforms.DoFn; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/2e6c1310/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java deleted file mode 100644 index 019b69d..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternalsFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import java.io.Serializable; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * A factory for providing {@link StateInternals} for a particular key. - * - * <p>Because it will generally be embedded in a {@link org.apache.beam.sdk.transforms.DoFn DoFn}, - * albeit at execution time, it is marked {@link Serializable}. - */ -@Experimental(Kind.STATE) -public interface StateInternalsFactory<K> { - - /** Returns {@link StateInternals} for the provided key. */ - StateInternals<K> stateInternalsForKey(K key); -}