http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml ---------------------------------------------------------------------- diff --git a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml index 28bbc3c..1db0e86 100644 --- a/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml +++ b/sdks/java/build-tools/src/main/resources/beam/findbugs-filter.xml @@ -373,7 +373,7 @@ <!--[BEAM-420] Non-transient non-serializable instance field in serializable class--> </Match> <Match> - <Class name="org.apache.beam.sdk.util.state.StateSpecs$CombiningStateSpec"/> + <Class name="StateSpecs$CombiningStateSpec"/> <Method name="equals"/> <Bug pattern="EQ_DOESNT_OVERRIDE_EQUALS"/> <!--[BEAM-421] Class doesn't override equals in superclass-->
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java new file mode 100644 index 0000000..189d151 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/BagState.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +/** + * State containing a bag of values. Items can be added to the bag and the contents read out. + * + * @param <T> The type of elements in the bag. 
+ */ +public interface BagState<T> extends GroupingState<T, Iterable<T>> { + @Override + BagState<T> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java new file mode 100644 index 0000000..6080127 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/CombiningState.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * State for a single value that is managed by a {@link CombineFn}. This is an internal extension + * to {@link GroupingState} that includes the {@code AccumT} type. 
+ * + * @param <InputT> the type of values added to the state + * @param <AccumT> the type of accumulator + * @param <OutputT> the type of value extracted from the state + */ +public interface CombiningState<InputT, AccumT, OutputT> + extends GroupingState<InputT, OutputT> { + + /** + * Read the merged accumulator for this combining value. It is implied that reading the + * state involves reading the accumulator, so {@link #readLater} is sufficient to prefetch for + * this. + */ + AccumT getAccum(); + + /** + * Add an accumulator to this combining value. Depending on implementation this may immediately + * merge it with the previous accumulator, or may buffer this accumulator for a future merge. + */ + void addAccum(AccumT accum); + + /** + * Merge the given accumulators according to the underlying combiner. + */ + AccumT mergeAccumulators(Iterable<AccumT> accumulators); + + @Override + CombiningState<InputT, AccumT, OutputT> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java new file mode 100644 index 0000000..3a12e79 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/GroupingState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.transforms.Combine.CombineFn; + +/** + * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single + * {@code OutputT} value. + * + * @param <InputT> the type of values added to the state + * @param <OutputT> the type of value extracted from the state + */ +public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State { + /** + * Add a value to the buffer. + */ + void add(InputT value); + + /** + * Return true if this state is empty. + */ + ReadableState<Boolean> isEmpty(); + + @Override + GroupingState<InputT, OutputT> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java new file mode 100644 index 0000000..9f0eee9 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/MapState.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import java.util.Map; + +/** + * An object that maps keys to values. + * A map cannot contain duplicate keys; + * each key can map to at most one value. + * + * @param <K> the type of keys maintained by this map + * @param <V> the type of mapped values + */ +public interface MapState<K, V> extends State { + + /** + * Associates the specified value with the specified key in this state. + */ + void put(K key, V value); + + /** + * A deferred read-followed-by-write. + * + * <p>When {@code read()} is called on the result or state is committed, it forces a read of the + * map and reconciliation with any pending modifications. + * + * <p>If the specified key is not already associated with a value (or is mapped to {@code null}) + * associates it with the given value and returns {@code null}, else returns the current value. + */ + ReadableState<V> putIfAbsent(K key, V value); + + /** + * Removes the mapping for a key from this map if it is present. + */ + void remove(K key); + + /** + * A deferred lookup. + * + * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()} + * on the results. + * + * <p>When {@code read()} is called, a particular state implementation is encouraged to perform + * all pending reads in a single batch. + */ + ReadableState<V> get(K key); + + /** + * Returns an iterable view of the keys contained in this map. + */ + ReadableState<Iterable<K>> keys(); + + /** + * Returns an iterable view of the values contained in this map. 
+ */ + ReadableState<Iterable<V>> values(); + + /** + * Returns an iterable view of all key-values. + */ + ReadableState<Iterable<Map.Entry<K, V>>> entries(); +} + http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java new file mode 100644 index 0000000..b29ab26 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableState.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * {@link State} that can be read via {@link #read()}. + * + * <p>Use {@link #readLater()} for marking several states for prefetching. Runners + * can potentially batch these into one read. + * + * @param <T> The type of value returned by {@link #read}. 
+ */ +@Experimental(Kind.STATE) +public interface ReadableState<T> { + /** + * Read the current value, blocking until it is available. + * + * <p>If there will be many calls to {@link #read} for different state in short succession, + * you should first call {@link #readLater} for all of them so the reads can potentially be + * batched (depending on the underlying implementation). + */ + T read(); + + /** + * Indicate that the value will be read later. + * + * <p>This allows an implementation to start an asynchronous prefetch or + * to include this state in the next batch of reads. + * + * @return this for convenient chaining + */ + ReadableState<T> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java new file mode 100644 index 0000000..d8df04e --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ReadableStates.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * Utilities for constructing and manipulating {@link ReadableState} instances. + */ +@Experimental(Kind.STATE) +public class ReadableStates { + + /** + * A {@link ReadableState} constructed from a constant value, hence immediately available. + */ + public static <T> ReadableState<T> immediate(final T value) { + return new ReadableState<T>() { + @Override + public T read() { + return value; + } + + @Override + public ReadableState<T> readLater() { + return this; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java new file mode 100644 index 0000000..14aa640 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/SetState.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +/** + * State containing no duplicate elements. + * Items can be added to the set and the contents read out. + * + * @param <T> The type of elements in the set. + */ +public interface SetState<T> extends GroupingState<T, Iterable<T>> { + /** + * Returns true if this set contains the specified element. + */ + ReadableState<Boolean> contains(T t); + + /** + * Ensures a value is a member of the set, returning {@code true} if it was added and {@code + * false} otherwise. + */ + ReadableState<Boolean> addIfAbsent(T t); + + /** + * Removes the specified element from this set if it is present. + */ + void remove(T t); + + @Override + SetState<T> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java new file mode 100644 index 0000000..6b10c91 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/State.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +/** + * Base interface for all state locations. + * + * <p>Specific types of state add appropriate accessors for reading and writing values, see + * {@link ValueState}, {@link BagState}, and {@link GroupingState}. + */ +public interface State { + + /** + * Clear out the state location. + */ + void clear(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java new file mode 100644 index 0000000..ee4aa78 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateBinder.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.CombineWithContext; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; + +/** + * Visitor for binding a {@link StateSpec} to the associated {@link State}. + */ +public interface StateBinder { + <T> ValueState<T> bindValue( + String id, StateSpec<ValueState<T>> spec, Coder<T> coder); + + <T> BagState<T> bindBag( + String id, StateSpec<BagState<T>> spec, Coder<T> elemCoder); + + <T> SetState<T> bindSet( + String id, StateSpec<SetState<T>> spec, Coder<T> elemCoder); + + <KeyT, ValueT> MapState<KeyT, ValueT> bindMap( + String id, + StateSpec<MapState<KeyT, ValueT>> spec, + Coder<KeyT> mapKeyCoder, + Coder<ValueT> mapValueCoder); + + <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombining( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + Combine.CombineFn<InputT, AccumT, OutputT> combineFn); + + <InputT, AccumT, OutputT> CombiningState<InputT, AccumT, OutputT> bindCombiningWithContext( + String id, + StateSpec<CombiningState<InputT, AccumT, OutputT>> spec, + Coder<AccumT> accumCoder, + CombineWithContext.CombineFnWithContext<InputT, AccumT, OutputT> combineFn); + + /** + * Bind to a watermark {@link StateSpec}. + * + * <p>This accepts the {@link TimestampCombiner} that dictates how watermark hold timestamps added + * to the returned {@link WatermarkHoldState} are to be combined. 
+ */ + WatermarkHoldState bindWatermark( + String id, + StateSpec<WatermarkHoldState> spec, + TimestampCombiner timestampCombiner); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java new file mode 100644 index 0000000..110a515 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContext.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Information accessible to the state API. + */ +public interface StateContext<W extends BoundedWindow> { + /** + * Returns the {@code PipelineOptions} specified with the + * {@link org.apache.beam.sdk.runners.PipelineRunner}. 
+ */ + PipelineOptions getPipelineOptions(); + + /** + * Returns the value of the side input for the corresponding state window. + */ + <T> T sideInput(PCollectionView<T> view); + + /** + * Returns the window corresponding to the state. + */ + W window(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java new file mode 100644 index 0000000..63afe4f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateContexts.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Factory that produces {@link StateContext} based on different inputs. 
+ */ +public class StateContexts { + private static final StateContext<BoundedWindow> NULL_CONTEXT = + new StateContext<BoundedWindow>() { + @Override + public PipelineOptions getPipelineOptions() { + throw new IllegalArgumentException("cannot call getPipelineOptions() in a null context"); + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + throw new IllegalArgumentException("cannot call sideInput() in a null context"); + } + + @Override + public BoundedWindow window() { + throw new IllegalArgumentException("cannot call window() in a null context"); + } + }; + + /** Returns a fake {@link StateContext}. */ + @SuppressWarnings("unchecked") + public static <W extends BoundedWindow> StateContext<W> nullContext() { + return (StateContext<W>) NULL_CONTEXT; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java new file mode 100644 index 0000000..3b0b840 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpec.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import java.io.Serializable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.Coder; + +/** + * A specification of a persistent state cell. This includes information necessary to encode the + * value and details about the intended access pattern. + * + * @param <StateT> The type of state being described. + */ +@Experimental(Kind.STATE) +public interface StateSpec<StateT extends State> extends Serializable { + + /** + * Use the {@code binder} to create an instance of {@code StateT} appropriate for this address. + */ + StateT bind(String id, StateBinder binder); + + /** + * Given {@code coders} are inferred from type arguments defined for this class. Coders which are + * already set should take precedence over offered coders. + * + * @param coders Array of coders indexed by the type arguments order. Entries might be null if the + * coder could not be inferred. + */ + void offerCoders(Coder[] coders); + + /** + * Validates that this {@link StateSpec} has been specified correctly and finalizes it. + * Automatically invoked when the pipeline is built. + */ + void finishSpecifying(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java new file mode 100644 index 0000000..09cc4e7 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/StateSpecs.java @@ -0,0 +1,629 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Objects; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; + +/** + * Static utility methods for creating {@link StateSpec} instances. + */ +@Experimental(Kind.STATE) +public class StateSpecs { + + private static final CoderRegistry STANDARD_REGISTRY = CoderRegistry.createDefault(); + + private StateSpecs() {} + + /** Create a simple state spec for values of type {@code T}. */ + public static <T> StateSpec<ValueState<T>> value() { + return new ValueStateSpec<>(null); + } + + /** Create a simple state spec for values of type {@code T}. */ + public static <T> StateSpec<ValueState<T>> value(Coder<T> valueCoder) { + checkArgument(valueCoder != null, "valueCoder should not be null. 
Consider value() instead"); + return new ValueStateSpec<>(valueCoder); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + */ + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( + CombineFn<InputT, AccumT, OutputT> combineFn) { + return new CombiningStateSpec<InputT, AccumT, OutputT>(null, combineFn); + } + + /** + * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. + */ + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(null, combineFn); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + */ + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( + Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + checkArgument(accumCoder != null, + "accumCoder should not be null. " + + "Consider using combining(CombineFn<> combineFn) instead."); + return combiningInternal(accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link CombineFnWithContext} to automatically merge + * multiple {@code InputT}s into a single {@code OutputT}. 
+ */ + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combining( + Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return combiningInternal(accumCoder, combineFn); + } + + /** + * Create a state spec for values that use a {@link CombineFn} to automatically merge multiple + * {@code InputT}s into a single {@code OutputT}. + * + * <p>This determines the {@code Coder<AccumT>} from the given {@code Coder<InputT>}, and should + * only be used to initialize static values. + */ + public static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> + combiningFromInputInternal( + Coder<InputT> inputCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + try { + Coder<AccumT> accumCoder = combineFn.getAccumulatorCoder(STANDARD_REGISTRY, inputCoder); + return combiningInternal(accumCoder, combineFn); + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to determine accumulator coder for " + + combineFn.getClass().getSimpleName() + + " from " + + inputCoder, + e); + } + } + + private static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( + Coder<AccumT> accumCoder, CombineFn<InputT, AccumT, OutputT> combineFn) { + return new CombiningStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); + } + + private static <InputT, AccumT, OutputT> + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningInternal( + Coder<AccumT> accumCoder, CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + return new CombiningWithContextStateSpec<InputT, AccumT, OutputT>(accumCoder, combineFn); + } + + /** + * Create a state spec that is optimized for adding values frequently, and occasionally retrieving + * all the values that have been added. 
+ */ + public static <T> StateSpec<BagState<T>> bag() { + return bag(null); + } + + /** + * Create a state spec that is optimized for adding values frequently, and occasionally retrieving + * all the values that have been added. + */ + public static <T> StateSpec<BagState<T>> bag(Coder<T> elemCoder) { + return new BagStateSpec<>(elemCoder); + } + + /** + * Create a state spec that supports {@link java.util.Set}-like access patterns. + */ + public static <T> StateSpec<SetState<T>> set() { + return set(null); + } + + /** + * Create a state spec that supports {@link java.util.Set}-like access patterns. + */ + public static <T> StateSpec<SetState<T>> set(Coder<T> elemCoder) { + return new SetStateSpec<>(elemCoder); + } + + /** + * Create a state spec that supports {@link java.util.Map}-like access patterns. + */ + public static <K, V> StateSpec<MapState<K, V>> map() { + return new MapStateSpec<>(null, null); + } + + /** Create a state spec that supports {@link java.util.Map}-like access patterns. */ + public static <K, V> StateSpec<MapState<K, V>> map(Coder<K> keyCoder, Coder<V> valueCoder) { + return new MapStateSpec<>(keyCoder, valueCoder); + } + + /** Create a state spec for holding the watermark. */ + public static + StateSpec<WatermarkHoldState> watermarkStateInternal( + TimestampCombiner timestampCombiner) { + return new WatermarkStateSpecInternal(timestampCombiner); + } + + public static <InputT, AccumT, OutputT> + StateSpec<BagState<AccumT>> convertToBagSpecInternal( + StateSpec<CombiningState<InputT, AccumT, OutputT>> combiningSpec) { + if (combiningSpec instanceof CombiningStateSpec) { + // Checked above; conversion to a bag spec depends on the provided spec being one of those + // created via the factory methods in this class. 
+ @SuppressWarnings("unchecked") + CombiningStateSpec<InputT, AccumT, OutputT> typedSpec = + (CombiningStateSpec<InputT, AccumT, OutputT>) combiningSpec; + return typedSpec.asBagSpec(); + } else if (combiningSpec instanceof CombiningWithContextStateSpec) { + @SuppressWarnings("unchecked") + CombiningWithContextStateSpec<InputT, AccumT, OutputT> typedSpec = + (CombiningWithContextStateSpec<InputT, AccumT, OutputT>) combiningSpec; + return typedSpec.asBagSpec(); + } else { + throw new IllegalArgumentException("Unexpected StateSpec " + combiningSpec); + } + } + + /** + * A specification for a state cell holding a settable value of type {@code T}. + * + * <p>Includes the coder for {@code T}. + */ + private static class ValueStateSpec<T> implements StateSpec<ValueState<T>> { + + @Nullable + private Coder<T> coder; + + private ValueStateSpec(@Nullable Coder<T> coder) { + this.coder = coder; + } + + @Override + public ValueState<T> bind(String id, StateBinder visitor) { + return visitor.bindValue(id, this, coder); + } + + @SuppressWarnings("unchecked") + @Override + public void offerCoders(Coder[] coders) { + if (this.coder == null) { + if (coders[0] != null) { + this.coder = (Coder<T>) coders[0]; + } + } + } + + @Override public void finishSpecifying() { + if (coder == null) { + throw new IllegalStateException("Unable to infer a coder for ValueState and no Coder" + + " was specified. 
Please set a coder by either invoking" + + " StateSpecs.value(Coder<T> valueCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof ValueStateSpec)) { + return false; + } + + ValueStateSpec<?> that = (ValueStateSpec<?>) obj; + return Objects.equals(this.coder, that.coder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), coder); + } + } + + /** + * A specification for a state cell that is combined according to a {@link CombineFn}. + * + * <p>Includes the {@link CombineFn} and the coder for the accumulator type. + */ + private static class CombiningStateSpec<InputT, AccumT, OutputT> + implements StateSpec<CombiningState<InputT, AccumT, OutputT>> { + + @Nullable + private Coder<AccumT> accumCoder; + private final CombineFn<InputT, AccumT, OutputT> combineFn; + + private CombiningStateSpec( + @Nullable Coder<AccumT> accumCoder, + CombineFn<InputT, AccumT, OutputT> combineFn) { + this.combineFn = combineFn; + this.accumCoder = accumCoder; + } + + @Override + public CombiningState<InputT, AccumT, OutputT> bind( + String id, StateBinder visitor) { + return visitor.bindCombining(id, this, accumCoder, combineFn); + } + + @SuppressWarnings("unchecked") + @Override + public void offerCoders(Coder[] coders) { + if (this.accumCoder == null) { + if (coders[1] != null) { + this.accumCoder = (Coder<AccumT>) coders[1]; + } + } + } + + @Override public void finishSpecifying() { + if (accumCoder == null) { + throw new IllegalStateException("Unable to infer a coder for" + + " CombiningState and no Coder was specified." 
+ + " Please set a coder by either invoking" + + " StateSpecs.combining(Coder<AccumT> accumCoder," + + " CombineFn<InputT, AccumT, OutputT> combineFn)" + + " or by registering the coder in the Pipeline's CoderRegistry."); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof CombiningStateSpec)) { + return false; + } + + CombiningStateSpec<?, ?, ?> that = + (CombiningStateSpec<?, ?, ?>) obj; + return Objects.equals(this.accumCoder, that.accumCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), accumCoder); + } + + private StateSpec<BagState<AccumT>> asBagSpec() { + return new BagStateSpec<AccumT>(accumCoder); + } + } + + /** + * A specification for a state cell that is combined according to a {@link + * CombineFnWithContext}. + * + * <p>Includes the {@link CombineFnWithContext} and the coder for the accumulator type. + */ + private static class CombiningWithContextStateSpec<InputT, AccumT, OutputT> + implements StateSpec<CombiningState<InputT, AccumT, OutputT>> { + + @Nullable private Coder<AccumT> accumCoder; + private final CombineFnWithContext<InputT, AccumT, OutputT> combineFn; + + private CombiningWithContextStateSpec( + @Nullable Coder<AccumT> accumCoder, + CombineFnWithContext<InputT, AccumT, OutputT> combineFn) { + this.combineFn = combineFn; + this.accumCoder = accumCoder; + } + + @Override + public CombiningState<InputT, AccumT, OutputT> bind( + String id, StateBinder visitor) { + return visitor.bindCombiningWithContext(id, this, accumCoder, combineFn); + } + + @SuppressWarnings("unchecked") + @Override + public void offerCoders(Coder[] coders) { + if (this.accumCoder == null) { + if (coders[2] != null) { + this.accumCoder = (Coder<AccumT>) coders[2]; + } + } + } + + @Override + public void finishSpecifying() { + if (accumCoder == null) { + throw new IllegalStateException( + "Unable to infer a coder for" + + " CombiningWithContextState and no Coder was 
specified." + + " Please set a coder by either invoking" + + " StateSpecs.combiningWithcontext(Coder<AccumT> accumCoder," + + " CombineFnWithContext<InputT, AccumT, OutputT> combineFn)" + + " or by registering the coder in the Pipeline's CoderRegistry."); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof CombiningWithContextStateSpec)) { + return false; + } + + CombiningWithContextStateSpec<?, ?, ?> that = (CombiningWithContextStateSpec<?, ?, ?>) obj; + return Objects.equals(this.accumCoder, that.accumCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), accumCoder); + } + + private StateSpec<BagState<AccumT>> asBagSpec() { + return new BagStateSpec<AccumT>(accumCoder); + } + } + + /** + * A specification for a state cell supporting bag-like access patterns + * (frequent additions, occasional reads of all the values). + * + * <p>Includes the coder for the element type {@code T}</p> + */ + private static class BagStateSpec<T> implements StateSpec<BagState<T>> { + + @Nullable + private Coder<T> elemCoder; + + private BagStateSpec(@Nullable Coder<T> elemCoder) { + this.elemCoder = elemCoder; + } + + @Override + public BagState<T> bind(String id, StateBinder visitor) { + return visitor.bindBag(id, this, elemCoder); + } + + @SuppressWarnings("unchecked") + @Override + public void offerCoders(Coder[] coders) { + if (this.elemCoder == null) { + if (coders[0] != null) { + this.elemCoder = (Coder<T>) coders[0]; + } + } + } + + @Override public void finishSpecifying() { + if (elemCoder == null) { + throw new IllegalStateException("Unable to infer a coder for BagState and no Coder" + + " was specified. 
Please set a coder by either invoking" + + " StateSpecs.bag(Coder<T> elemCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof BagStateSpec)) { + return false; + } + + BagStateSpec<?> that = (BagStateSpec<?>) obj; + return Objects.equals(this.elemCoder, that.elemCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), elemCoder); + } + } + + private static class MapStateSpec<K, V> implements StateSpec<MapState<K, V>> { + + @Nullable + private Coder<K> keyCoder; + @Nullable + private Coder<V> valueCoder; + + private MapStateSpec(@Nullable Coder<K> keyCoder, @Nullable Coder<V> valueCoder) { + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + } + + @Override + public MapState<K, V> bind(String id, StateBinder visitor) { + return visitor.bindMap(id, this, keyCoder, valueCoder); + } + + @SuppressWarnings("unchecked") + @Override + public void offerCoders(Coder[] coders) { + if (this.keyCoder == null) { + if (coders[0] != null) { + this.keyCoder = (Coder<K>) coders[0]; + } + } + if (this.valueCoder == null) { + if (coders[1] != null) { + this.valueCoder = (Coder<V>) coders[1]; + } + } + } + + @Override public void finishSpecifying() { + if (keyCoder == null || valueCoder == null) { + throw new IllegalStateException("Unable to infer a coder for MapState and no Coder" + + " was specified. 
Please set a coder by either invoking" + + " StateSpecs.map(Coder<K> keyCoder, Coder<V> valueCoder) or by registering the" + + " coder in the Pipeline's CoderRegistry."); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof MapStateSpec)) { + return false; + } + + MapStateSpec<?, ?> that = (MapStateSpec<?, ?>) obj; + return Objects.equals(this.keyCoder, that.keyCoder) + && Objects.equals(this.valueCoder, that.valueCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), keyCoder, valueCoder); + } + } + + /** + * A specification for a state cell supporting set-like access patterns. + * + * <p>Includes the coder for the element type {@code T}</p> + */ + private static class SetStateSpec<T> implements StateSpec<SetState<T>> { + + @Nullable + private Coder<T> elemCoder; + + private SetStateSpec(@Nullable Coder<T> elemCoder) { + this.elemCoder = elemCoder; + } + + @Override + public SetState<T> bind(String id, StateBinder visitor) { + return visitor.bindSet(id, this, elemCoder); + } + + @SuppressWarnings("unchecked") + @Override + public void offerCoders(Coder[] coders) { + if (this.elemCoder == null) { + if (coders[0] != null) { + this.elemCoder = (Coder<T>) coders[0]; + } + } + } + + @Override public void finishSpecifying() { + if (elemCoder == null) { + throw new IllegalStateException("Unable to infer a coder for SetState and no Coder" + + " was specified. 
Please set a coder by either invoking" + + " StateSpecs.set(Coder<T> elemCoder) or by registering the coder in the" + + " Pipeline's CoderRegistry."); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + if (!(obj instanceof SetStateSpec)) { + return false; + } + + SetStateSpec<?> that = (SetStateSpec<?>) obj; + return Objects.equals(this.elemCoder, that.elemCoder); + } + + @Override + public int hashCode() { + return Objects.hash(getClass(), elemCoder); + } + } + + /** + * A specification for a state cell tracking a combined watermark hold. + * + * <p>Includes the {@link TimestampCombiner} according to which the output times + * are combined. + */ + private static class WatermarkStateSpecInternal implements StateSpec<WatermarkHoldState> { + + /** + * When multiple output times are added to hold the watermark, this determines how they are + * combined, and also the behavior when merging windows. Does not contribute to equality/hash + * since we have at most one watermark hold spec per computation. + */ + private final TimestampCombiner timestampCombiner; + + private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) { + this.timestampCombiner = timestampCombiner; + } + + @Override + public WatermarkHoldState bind(String id, StateBinder visitor) { + return visitor.bindWatermark(id, this, timestampCombiner); + } + + @Override + public void offerCoders(Coder[] coders) { + } + + @Override public void finishSpecifying() { + // Currently an empty implementation as there are no coders to validate. 
+ } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + + // All instances of WatermarkStateSpecInternal are considered equal + return obj instanceof WatermarkStateSpecInternal; + } + + @Override + public int hashCode() { + return Objects.hash(getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java new file mode 100644 index 0000000..ca97db2 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/ValueState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; + +/** + * State holding a single value. + * + * @param <T> The type of values being stored. 
+ */ +@Experimental(Kind.STATE) +public interface ValueState<T> extends ReadableState<T>, State { + /** + * Set the value of the buffer. + */ + void write(T input); + + @Override + ValueState<T> readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java new file mode 100644 index 0000000..9f6c203 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.state; + +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; +import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; +import org.joda.time.Instant; + +/** + * A {@link State} accepting and aggregating output timestamps, which determines the time to which + * the output watermark must be held. 
+ * + * <p><b><i>For internal use only. This API may change at any time.</i></b> + */ +@Experimental(Kind.STATE) +public interface WatermarkHoldState extends GroupingState<Instant, Instant> { + /** + * Return the {@link TimestampCombiner} which will be used to determine a watermark hold time + * given an element timestamp, and to combine watermarks from windows which are about to be + * merged. + */ + TimestampCombiner getTimestampCombiner(); + + @Override + WatermarkHoldState readLater(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java new file mode 100644 index 0000000..de5eeeb --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Defines internal utilities for interacting with pipeline state. 
+ */ +package org.apache.beam.sdk.state; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java index 9bced41..585d8b7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesMapState.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.testing; -import org.apache.beam.sdk.util.state.MapState; +import org.apache.beam.sdk.state.MapState; /** * Category tag for validation tests which utilize {@link MapState}. http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java index 6fd74bd..7d82d22 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSetState.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.testing; -import org.apache.beam.sdk.util.state.SetState; +import org.apache.beam.sdk.state.SetState; /** * Category tag for validation tests which utilize {@link SetState}. 
http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index f3d178e..c858936 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -27,6 +27,8 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.splittabledofn.HasDefaultTracker; @@ -37,8 +39,6 @@ import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index ca7427c..ead2569 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -320,7 +320,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override - public org.apache.beam.sdk.util.state.State state(String stateId) { + public org.apache.beam.sdk.state.State state(String stateId) { throw new UnsupportedOperationException("DoFnTester doesn't support state yet"); } http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java index e132115..6828979 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupIntoBatches.java @@ -23,16 +23,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.CombiningState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.TimerSpecs; -import org.apache.beam.sdk.util.state.BagState; -import org.apache.beam.sdk.util.state.CombiningState; -import org.apache.beam.sdk.util.state.StateSpec; -import org.apache.beam.sdk.util.state.StateSpecs; -import org.apache.beam.sdk.util.state.ValueState; import org.apache.beam.sdk.values.KV; import 
org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java index 7dd2cdd..c45311a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.transforms.DoFn.WindowedContext; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayData.Builder; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.NameUtils; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java index 3c44afe..d5a1a94 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.reflect; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.state.State; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.FinishBundle; import org.apache.beam.sdk.transforms.DoFn.ProcessElement; @@ -28,7 +29,6 @@ import org.apache.beam.sdk.transforms.DoFn.TimerId; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.state.State; /** * Interface for invoking the {@code DoFn} processing methods. http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java index 3219f96..72ad4b0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java @@ -27,6 +27,8 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; @@ -38,8 +40,6 @@ import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index bac3bef..3dfca8c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -42,6 +42,8 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.state.State; +import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.StateId; import org.apache.beam.sdk.transforms.DoFn.TimerId; @@ -58,8 +60,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerSpec; import org.apache.beam.sdk.util.common.ReflectHelpers; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; import org.apache.beam.sdk.values.TypeParameter; http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java ---------------------------------------------------------------------- 
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java index 31d1f64..f93cb0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java @@ -18,9 +18,9 @@ package org.apache.beam.sdk.util; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.StateContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.state.StateContext; import org.apache.beam.sdk.values.PCollectionView; /** http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java index a394180..1b1c352 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineFnUtil.java @@ -23,12 +23,12 @@ import java.io.ObjectOutputStream; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.state.StateContext; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.util.state.StateContext; /** * Static utility methods that 
create combine function instances. http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java deleted file mode 100644 index e0eebe5..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/BagState.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -/** - * State containing a bag values. Items can be added to the bag and the contents read out. - * - * @param <T> The type of elements in the bag. 
- */ -public interface BagState<T> extends GroupingState<T, Iterable<T>> { - @Override - BagState<T> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java deleted file mode 100644 index 80e4dc9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/CombiningState.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * State for a single value that is managed by a {@link CombineFn}. This is an internal extension - * to {@link GroupingState} that includes the {@code AccumT} type. 
- * - * @param <InputT> the type of values added to the state - * @param <AccumT> the type of accumulator - * @param <OutputT> the type of value extracted from the state - */ -public interface CombiningState<InputT, AccumT, OutputT> - extends GroupingState<InputT, OutputT> { - - /** - * Read the merged accumulator for this combining value. It is implied that reading the - * state involes reading the accumulator, so {@link #readLater} is sufficient to prefetch for - * this. - */ - AccumT getAccum(); - - /** - * Add an accumulator to this combining value. Depending on implementation this may immediately - * merge it with the previous accumulator, or may buffer this accumulator for a future merge. - */ - void addAccum(AccumT accum); - - /** - * Merge the given accumulators according to the underlying combiner. - */ - AccumT mergeAccumulators(Iterable<AccumT> accumulators); - - @Override - CombiningState<InputT, AccumT, OutputT> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java deleted file mode 100644 index bd7a8d9..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/GroupingState.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.transforms.Combine.CombineFn; - -/** - * State that combines multiple {@code InputT} values using a {@link CombineFn} to produce a single - * {@code OutputT} value. - * - * @param <InputT> the type of values added to the state - * @param <OutputT> the type of value extracted from the state - */ -public interface GroupingState<InputT, OutputT> extends ReadableState<OutputT>, State { - /** - * Add a value to the buffer. - */ - void add(InputT value); - - /** - * Return true if this state is empty. - */ - ReadableState<Boolean> isEmpty(); - - @Override - GroupingState<InputT, OutputT> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java deleted file mode 100644 index fb7e807..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/MapState.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import java.util.Map; - -/** - * An object that maps keys to values. - * A map cannot contain duplicate keys; - * each key can map to at most one value. - * - * @param <K> the type of keys maintained by this map - * @param <V> the type of mapped values - */ -public interface MapState<K, V> extends State { - - /** - * Associates the specified value with the specified key in this state. - */ - void put(K key, V value); - - /** - * A deferred read-followed-by-write. - * - * <p>When {@code read()} is called on the result or state is committed, it forces a read of the - * map and reconciliation with any pending modifications. - * - * <p>If the specified key is not already associated with a value (or is mapped to {@code null}) - * associates it with the given value and returns {@code null}, else returns the current value. - */ - ReadableState<V> putIfAbsent(K key, V value); - - /** - * Removes the mapping for a key from this map if it is present. - */ - void remove(K key); - - /** - * A deferred lookup. - * - * <p>A user is encouraged to call {@code get} for all relevant keys and call {@code readLater()} - * on the results. - * - * <p>When {@code read()} is called, a particular state implementation is encouraged to perform - * all pending reads in a single batch. 
- */ - ReadableState<V> get(K key); - - /** - * Returns a iterable view of the keys contained in this map. - */ - ReadableState<Iterable<K>> keys(); - - /** - * Returns a iterable view of the values contained in this map. - */ - ReadableState<Iterable<V>> values(); - - /** - * Returns a iterable view of all key-values. - */ - ReadableState<Iterable<Map.Entry<K, V>>> entries(); -} - http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java deleted file mode 100644 index c3e9936..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableState.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * {@link State} that can be read via {@link #read()}. 
- * - * <p>Use {@link #readLater()} for marking several states for prefetching. Runners - * can potentially batch these into one read. - * - * @param <T> The type of value returned by {@link #read}. - */ -@Experimental(Kind.STATE) -public interface ReadableState<T> { - /** - * Read the current value, blocking until it is available. - * - * <p>If there will be many calls to {@link #read} for different state in short succession, - * you should first call {@link #readLater} for all of them so the reads can potentially be - * batched (depending on the underlying implementation}. - */ - T read(); - - /** - * Indicate that the value will be read later. - * - * <p>This allows an implementation to start an asynchronous prefetch or - * to include this state in the next batch of reads. - * - * @return this for convenient chaining - */ - ReadableState<T> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java deleted file mode 100644 index 819eda6..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/ReadableStates.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; - -/** - * Utilities for constructing and manipulating {@link ReadableState} instances. - */ -@Experimental(Kind.STATE) -public class ReadableStates { - - /** - * A {@link ReadableState} constructed from a constant value, hence immediately available. - */ - public static <T> ReadableState<T> immediate(final T value) { - return new ReadableState<T>() { - @Override - public T read() { - return value; - } - - @Override - public ReadableState<T> readLater() { - return this; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java deleted file mode 100644 index 56ea510..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/SetState.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -/** - * State containing no duplicate elements. - * Items can be added to the set and the contents read out. - * - * @param <T> The type of elements in the set. - */ -public interface SetState<T> extends GroupingState<T, Iterable<T>> { - /** - * Returns true if this set contains the specified element. - */ - ReadableState<Boolean> contains(T t); - - /** - * Ensures a value is a member of the set, returning {@code true} if it was added and {@code - * false} otherwise. - */ - ReadableState<Boolean> addIfAbsent(T t); - - /** - * Removes the specified element from this set if it is present. - */ - void remove(T t); - - @Override - SetState<T> readLater(); -} http://git-wip-us.apache.org/repos/asf/beam/blob/78e0acca/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java deleted file mode 100644 index 3a49f01..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/State.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.state; - -/** - * Base interface for all state locations. - * - * <p>Specific types of state add appropriate accessors for reading and writing values, see - * {@link ValueState}, {@link BagState}, and {@link GroupingState}. - */ -public interface State { - - /** - * Clear out the state location. - */ - void clear(); -}