Move StateInternals to runners/core-java
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/144b1df8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/144b1df8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/144b1df8 Branch: refs/heads/master Commit: 144b1df88176f32aec4991423eb225b844ff16c2 Parents: 4e4391c Author: Kenneth Knowles <k...@google.com> Authored: Thu Jan 26 21:22:38 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Mon Feb 6 09:26:56 2017 -0800 ---------------------------------------------------------------------- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../translation/utils/ApexStateInternals.java | 2 +- .../apex/translation/utils/NoOpStepContext.java | 2 +- .../beam/runners/core/BaseExecutionContext.java | 1 - .../beam/runners/core/ExecutionContext.java | 1 - .../GroupAlsoByWindowViaOutputBufferDoFn.java | 1 - .../core/GroupAlsoByWindowViaWindowSetDoFn.java | 1 - .../GroupAlsoByWindowViaWindowSetNewDoFn.java | 1 - .../runners/core/InMemoryStateInternals.java | 1 - .../runners/core/MergingActiveWindowSet.java | 1 - .../runners/core/ReduceFnContextFactory.java | 1 - .../beam/runners/core/ReduceFnRunner.java | 1 - .../beam/runners/core/SideInputHandler.java | 1 - .../beam/runners/core/SimpleOldDoFnRunner.java | 1 - .../beam/runners/core/SplittableParDo.java | 1 - .../beam/runners/core/StateInternals.java | 61 ++++++++++++++++++++ .../runners/core/StateInternalsFactory.java | 1 - .../beam/runners/core/WindowingInternals.java | 1 - .../TriggerStateMachineContextFactory.java | 2 +- .../core/GroupAlsoByWindowsProperties.java | 1 - .../core/MergingActiveWindowSetTest.java | 1 - .../beam/runners/core/SplittableParDoTest.java | 1 - .../triggers/TriggerStateMachineTester.java | 2 +- .../CopyOnAccessInMemoryStateInternals.java | 2 +- ...littableProcessElementsEvaluatorFactory.java | 2 +- .../StatefulParDoEvaluatorFactoryTest.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../wrappers/streaming/FlinkStateInternals.java | 2 +- .../wrappers/streaming/WindowDoFnOperator.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 4 +- .../spark/translation/SparkProcessContext.java | 2 +- .../spark/translation/TranslationUtils.java | 2 +- .../beam/sdk/util/state/StateInternals.java | 57 ------------------ .../beam/fn/harness/fake/FakeStepContext.java | 2 +- 36 files changed, 78 insertions(+), 93 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 7891b34..274e807 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -45,6 +45,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; @@ -62,7 +63,6 @@ import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 13db40a..7f2512a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -46,6 +46,7 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.ListCoder; @@ -57,7 +58,6 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java index 334353d..c9f2e39 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java @@ -28,6 +28,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -45,7 +46,6 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTag.StateBinder; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java index feae46e..ad4de97 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/NoOpStepContext.java @@ -20,11 +20,11 @@ package org.apache.beam.runners.apex.translation.utils; import java.io.IOException; import java.io.Serializable; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; /** http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java index eec913c..0f23fea 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/BaseExecutionContext.java @@ -25,7 +25,6 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; /** http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java index f6bcc3d..40c0798 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ExecutionContext.java @@ -22,7 +22,6 @@ import java.util.Collection; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; /** http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java index 4cde7da..5b2e051 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java @@ -24,7 +24,6 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.joda.time.Instant; /** http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index 49b010e..ac0b1ab 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; /** http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java index b4310ec..7b65a0b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java index a867b43..cf03750 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryStateInternals.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTag.StateBinder; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java index 720377a..e3480fa 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/MergingActiveWindowSet.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 6f8715e..cc0f560 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -38,7 +38,6 @@ import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateAccessor; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 50b1192..091adad 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -57,7 +57,6 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index ae7f759..6a7d03d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -32,7 +32,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTags; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 6b2fbb2..c21ed77 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -42,7 +42,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 30ee7cb..39d7c56 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -51,7 +51,6 @@ import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java new file mode 100644 index 0000000..0b3bd62 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternals.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateContext; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateTag; + +/** + * {@code StateInternals} describes the functionality a runner needs to provide for the + * State API to be supported. + * + * <p>The SDK will only use this after elements have been partitioned by key. For instance, after a + * {@link GroupByKey} operation. The runner implementation must ensure that any writes using + * {@link StateInternals} are implicitly scoped to the key being processed and the specific step + * accessing state. + * + * <p>The runner implementation must also ensure that any writes to the associated state objects + * are persisted together with the completion status of the processing that produced these + * writes. + * + * <p>This is a low-level API intended for use by the Beam SDK. It should not be + * used directly, and is highly likely to change. + */ +@Experimental(Kind.STATE) +public interface StateInternals<K> { + + /** The key for this {@link StateInternals}. */ + K getKey(); + + /** + * Return the state associated with {@code address} in the specified {@code namespace}. + */ + <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address); + + /** + * Return the state associated with {@code address} in the specified {@code namespace} + * with the {@link StateContext}. + */ + <T extends State> T state( + StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c); +} http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java index 7ba3ab7..ea7d742 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StateInternalsFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core; import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.util.state.StateInternals; /** * A factory for providing {@link StateInternals} for a particular key. http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java index c033765..8dc0bfc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WindowingInternals.java @@ -21,7 +21,6 @@ import java.util.Collection; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java index e3df4ee..2ac9065 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineContextFactory.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.ActiveWindowSet; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.triggers.TriggerStateMachine.MergingTriggerInfo; import org.apache.beam.runners.core.triggers.TriggerStateMachine.TriggerInfo; import org.apache.beam.sdk.coders.Coder; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.state.MergingStateAccessor; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java index 423a674..6c7c4e0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -49,7 +49,6 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java index 6cccdae..95d6977 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/MergingActiveWindowSetTest.java @@ -31,7 +31,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.util.state.StateInternals; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.After; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 4de07d1..689d2e2 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index 5148ae6..cd6d394 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TestInMemoryStateInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -48,7 +49,6 @@ import org.apache.beam.sdk.util.Timers; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java index afb38c2..e809823 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CopyOnAccessInMemoryStateInternals.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryState; import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryStateBinder; import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryValue; import org.apache.beam.runners.core.InMemoryStateInternals.InMemoryWatermarkHold; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateTable; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine.CombineFn; @@ -44,7 +45,6 @@ import org.apache.beam.sdk.util.state.BagState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.StateTag.StateBinder; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 85a8d92..7ffcab4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternalsFactory; @@ -34,7 +35,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java index 94ffbd3..62fc102 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactoryTest.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.List; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.runners.direct.ParDoMultiOverrideFactory.StatefulParDo; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateSpec; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index a656d4a..9b83eb4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -25,6 +25,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.WindowingInternals; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; @@ -35,7 +36,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.functions.RuntimeContext; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 9fd83c6..c66f026 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -36,6 +36,7 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.ExecutionConfig; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java index 27e3dc6..c992d40 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -38,7 +39,6 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.State; import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.ValueState; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index aa3429e..f1eee84 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -42,6 +42,7 @@ import org.apache.beam.runners.core.ExecutionContext; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; @@ -54,7 +55,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 7452a11..b651fab 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.core.TimerInternals; @@ -40,7 +41,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -48,8 +48,6 @@ import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; - - /** * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn} * for the Spark runner. http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java index 486bc16..4f8a1a5 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java @@ -25,13 +25,13 @@ import java.util.Iterator; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java index 638410d..4dcc705 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.spark.SparkRunner; import org.apache.beam.runners.spark.util.SideInputBroadcast; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java deleted file mode 100644 index 0385c0b..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateInternals.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; -import org.apache.beam.sdk.transforms.GroupByKey; - -/** - * {@code StateInternals} describes the functionality a runner needs to provide for the - * State API to be supported. - * - * <p>The SDK will only use this after elements have been partitioned by key. For instance, after a - * {@link GroupByKey} operation. The runner implementation must ensure that any writes using - * {@link StateInternals} are implicitly scoped to the key being processed and the specific step - * accessing state. - * - * <p>The runner implementation must also ensure that any writes to the associated state objects - * are persisted together with the completion status of the processing that produced these - * writes. - * - * <p>This is a low-level API intended for use by the Beam SDK. It should not be - * used directly, and is highly likely to change. - */ -@Experimental(Kind.STATE) -public interface StateInternals<K> { - - /** The key for this {@link StateInternals}. */ - K getKey(); - - /** - * Return the state associated with {@code address} in the specified {@code namespace}. - */ - <T extends State> T state(StateNamespace namespace, StateTag<? super K, T> address); - - /** - * Return the state associated with {@code address} in the specified {@code namespace} - * with the {@link StateContext}. - */ - <T extends State> T state( - StateNamespace namespace, StateTag<? super K, T> address, StateContext<?> c); -} http://git-wip-us.apache.org/repos/asf/beam/blob/144b1df8/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java index c2b6223..6403e96 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/fake/FakeStepContext.java @@ -20,11 +20,11 @@ package org.apache.beam.fn.harness.fake; import java.io.IOException; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.TupleTag; /**