Repository: beam Updated Branches: refs/heads/master 2f4dd8dfa -> 3042d761a
Add @Internal and @Experimental to state package Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ac01ec7a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ac01ec7a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ac01ec7a Branch: refs/heads/master Commit: ac01ec7afb43ae0bc4198234d1de16a830f95b10 Parents: 78e0acc Author: Kenneth Knowles <k...@google.com> Authored: Tue May 2 10:41:01 2017 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Thu May 4 16:06:34 2017 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/state/BagState.java | 4 + .../apache/beam/sdk/state/CombiningState.java | 3 + .../apache/beam/sdk/state/GroupingState.java | 3 + .../org/apache/beam/sdk/state/MapState.java | 3 + .../apache/beam/sdk/state/ReadableStates.java | 7 +- .../org/apache/beam/sdk/state/SetState.java | 4 + .../java/org/apache/beam/sdk/state/State.java | 4 + .../org/apache/beam/sdk/state/StateBinder.java | 7 +- .../org/apache/beam/sdk/state/StateContext.java | 5 +- .../apache/beam/sdk/state/StateContexts.java | 5 +- .../org/apache/beam/sdk/state/StateSpec.java | 18 +++- .../org/apache/beam/sdk/state/StateSpecs.java | 88 ++++++++++++-------- .../beam/sdk/state/WatermarkHoldState.java | 11 ++- .../org/apache/beam/sdk/state/package-info.java | 2 +- 14 files changed, 109 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java index 189d151..10fba05 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java @@ -17,11 +17,15 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + /** * State containing a bag values. Items can be added to the bag and the contents read out. * * @param <T> The type of elements in the bag. */ +@Experimental(Kind.STATE) public interface BagState<T> extends GroupingState<T, Iterable<T>> { @Override BagState<T> readLater(); http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java index 6080127..ddda255 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.Combine.CombineFn; /** @@ -27,6 +29,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; * @param <AccumT> the type of accumulator * @param <OutputT> the type of value extracted from the state */ +@Experimental(Kind.STATE) public interface CombiningState<InputT, AccumT, OutputT> extends GroupingState<InputT, OutputT> { http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java index 3a12e79..d99ff25 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java @@ -17,6 +17,8 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.transforms.Combine.CombineFn; /** @@ -26,6 +28,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; * @param <InputT> the type of values added to the state * @param <OutputT> the type of value extracted from the state */ +@Experimental(Kind.STATE) public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State { /** * Add a value to the buffer. http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java index 9f0eee9..649c3c7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java @@ -18,6 +18,8 @@ package org.apache.beam.sdk.state; import java.util.Map; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; /** * An object that maps keys to values. @@ -27,6 +29,7 @@ import java.util.Map; * @param <K> the type of keys maintained by this map * @param <V> the type of mapped values */ +@Experimental(Kind.STATE) public interface MapState<K, V> extends State { /** http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java index d8df04e..6977a97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java @@ -17,13 +17,12 @@ */ package org.apache.beam.sdk.state; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; /** - * Utilities for constructing and manipulating {@link ReadableState} instances. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> */ -@Experimental(Kind.STATE) +@Internal public class ReadableStates { /** http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java index 14aa640..cb9a0e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java @@ -17,12 +17,16 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + /** * State containing no duplicate elements. * Items can be added to the set and the contents read out. * * @param <T> The type of elements in the set. */ +@Experimental(Kind.STATE) public interface SetState<T> extends GroupingState<T, Iterable<T>> { /** * Returns true if this set contains the specified element. http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java index 6b10c91..0c0ca32 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java @@ -17,12 +17,16 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + /** * Base interface for all state locations. * * <p>Specific types of state add appropriate accessors for reading and writing values, see * {@link ValueState}, {@link BagState}, and {@link GroupingState}. */ +@Experimental(Kind.STATE) public interface State { /** http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java index ee4aa78..af834c3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java @@ -17,14 +17,19 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; /** - * Visitor for binding a {@link StateSpec} and to the associated {@link State}. + * For internal use only; no backwards-compatibility guarantees. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Visitor for binding a {@link StateSpec} and to the associated {@link State}. */ +@Internal public interface StateBinder { <T> ValueState<T> bindValue( String id, StateSpec<ValueState<T>> spec, Coder<T> coder); http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java index 110a515..52177cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java @@ -17,13 +17,16 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; /** - * Information accessible the state API. + * For internal use only; no backwards-compatibility guarantees. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> */ +@Internal public interface StateContext<W extends BoundedWindow> { /** * Returns the {@code PipelineOptions} specified with the http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java index 63afe4f..2e21a27 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java @@ -17,13 +17,16 @@ */ package org.apache.beam.sdk.state; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.values.PCollectionView; /** - * Factory that produces {@link StateContext} based on different inputs. + * For internal use only; no backwards-compatibility guarantees. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> */ +@Internal public class StateContexts { private static final StateContext<BoundedWindow> NULL_CONTEXT = new StateContext<BoundedWindow>() { http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java index 3b0b840..b0412bf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.state; import java.io.Serializable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; /** @@ -32,22 +33,31 @@ import org.apache.beam.sdk.coders.Coder; public interface StateSpec<StateT extends State> extends Serializable { /** - * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. */ + @Internal StateT bind(String id, StateBinder binder); /** - * Given {code coders} are inferred from type arguments defined for this class. Coders which are - * already set should take precedence over offered coders. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Given {code coders} are inferred from type arguments defined for this class. Coders which + * are already set should take precedence over offered coders. * * @param coders Array of coders indexed by the type arguments order. Entries might be null if the * coder could not be inferred. */ + @Internal void offerCoders(Coder[] coders); /** - * Validates that this {@link StateSpec} has been specified correctly and finalizes it. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Validates that this {@link StateSpec} has been specified correctly and finalizes it. * Automatically invoked when the pipeline is built. */ + @Internal void finishSpecifying(); } http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java index 09cc4e7..8a3c87e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java @@ -23,6 +23,7 @@ import java.util.Objects; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -95,42 +96,6 @@ public class StateSpecs { } /** - * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple - * {@code InputT}s into a single {@code OutputT}. - * - * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should - * only be used to initialize static values. - */ - public static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> - combiningFromInputInternal( - Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - try { - Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); - return combiningInternal(accumCoder, combineFn); - } catch (CannotProvideCoderException e) { - throw new IllegalArgumentException( - "Unable to determine accumulator coder for " - + combineFn.getClass().getSimpleName() - + " from " - + inputCoder, - e); - } - } - - private static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( - Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { - return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); - } - - private static <InputT, AccumT, OutputT> - StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( - Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { - return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); - } - - /** * Create a state spec that is optimized for adding values frequently, and occasionally retrieving * all the values that have been added. */ @@ -172,13 +137,62 @@ public class StateSpecs { return new MapStateSpec<>(keyCoder, valueCoder); } - /** Create a state spec for holding the watermark. */ + /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + * + * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should + * only be used to initialize static values. + */ + @Internal + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningFromInputInternal( + Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + try { + Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); + return combiningInternal(accumCoder, combineFn); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to determine accumulator coder for " + + combineFn.getClass().getSimpleName() + + " from " + + inputCoder, + e); + } + } + + private static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( + Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); + } + + private static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( + Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); + } + + /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Create a state spec for a watermark hold. + */ + @Internal public static StateSpec<WatermarkHoldState> watermarkStateInternal( TimestampCombiner timestampCombiner) { return new WatermarkStateSpecInternal(timestampCombiner); } + /** + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> + * + * <p>Convert a combining state spec to a bag of accumulators. + */ + @Internal public static <InputT, AccumT, OutputT> StateSpec<BagState<AccumT>> convertToBagSpecInternal( StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) { http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java index 9f6c203..38e2cbc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java @@ -17,18 +17,17 @@ */ package org.apache.beam.sdk.state; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.joda.time.Instant; /** - * A {@link State} accepting and aggregating output timestamps, which determines the time to which - * the output watermark must be held. + * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b> * - * <p><b><i>For internal use only. This API may change at any time.</i></b> + * <p>A {@link State} accepting and aggregating output timestamps, which determines the time to + * which the output watermark must be held. */ -@Experimental(Kind.STATE) +@Internal public interface WatermarkHoldState extends GroupingState<Instant, Instant> { /** * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time http://git-wip-us.apache.org/repos/asf/beam/blob/ac01ec7a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java index de5eeeb..d8b8e92 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java @@ -17,6 +17,6 @@ */ /** - * Defines internal utilities for interacting with pipeline state. + * Classes and interfaces for interacting with state. */ package org.apache.beam.sdk.state;