This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2998ef1aeb2c0b01694dacbe72bae0e6c5ca3631 Author: Jeyhun Karimov <je.kari...@gmail.com> AuthorDate: Thu May 30 16:47:37 2024 +0200 [FLINK-34977][API] Support State Declarations --- .../common/state/AggregatingStateDeclaration.java | 33 ++ .../common/state/BroadcastStateDeclaration.java | 32 ++ .../api/common/state/ListStateDeclaration.java | 54 +++ .../api/common/state/MapStateDeclaration.java | 32 ++ .../api/common/state/ReducingStateDeclaration.java | 33 ++ .../flink/api/common/state/StateDeclaration.java | 67 ++++ .../flink/api/common/state/StateDeclarations.java | 379 +++++++++++++++++++++ .../api/common/state/ValueStateDeclaration.java | 29 ++ .../state/AggregatingStateDeclarationTest.java | 85 +++++ .../api/common/state/ListStateDeclarationTest.java | 63 ++++ .../api/common/state/MapStateDeclarationTest.java | 71 ++++ .../common/state/ReducingStateDeclarationTest.java | 64 ++++ .../common/state/ValueStateDeclarationTest.java | 58 ++++ 13 files changed, 1000 insertions(+) diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java new file mode 100644 index 00000000000..a9660c8840d --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +/** This represents a declaration of the aggregating state. */ +@Experimental +public interface AggregatingStateDeclaration<IN, ACC, OUT> extends StateDeclaration { + /** Get type descriptor of this state. */ + TypeDescriptor<ACC> getTypeDescriptor(); + + /** Get the aggregate function of this state. */ + AggregateFunction<IN, ACC, OUT> getAggregateFunction(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/BroadcastStateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/BroadcastStateDeclaration.java new file mode 100644 index 00000000000..1f432048a15 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/BroadcastStateDeclaration.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +/** This represents a declaration of the broadcast state. */ +@Experimental +public interface BroadcastStateDeclaration<K, V> extends StateDeclaration { + /** Get type descriptor of this broadcast state's key. */ + TypeDescriptor<K> getKeyTypeDescriptor(); + + /** Get type descriptor of this broadcast state's value. */ + TypeDescriptor<V> getValueTypeDescriptor(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java new file mode 100644 index 00000000000..5593a7f57e9 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +/** This represents a declaration of the list state. */ +@Experimental +public interface ListStateDeclaration<T> extends StateDeclaration { + /** + * Get the {@link RedistributionStrategy} of this list state. + * + * @return the redistribution strategy of this list state. + */ + RedistributionStrategy getRedistributionStrategy(); + + /** + * {@link RedistributionStrategy} is used to guide the assignment of states during rescaling. + */ + @Experimental + enum RedistributionStrategy { + /** + * The whole state is logically a concatenation of all lists. On restore/redistribution, the + * list is evenly divided into as many sub-lists as there are parallel operators. Each + * operator gets a sub-list, which can be empty, or contain one or more elements. + */ + SPLIT, + /** + * The whole state is logically a concatenation of all lists. On restore/redistribution, + * each operator gets the complete list of state elements. + */ + UNION + } + + /** Get type descriptor of this list state's element. */ + TypeDescriptor<T> getTypeDescriptor(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapStateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapStateDeclaration.java new file mode 100644 index 00000000000..1e844153a0a --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapStateDeclaration.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +/** This represents a declaration of the map state. */ +@Experimental +public interface MapStateDeclaration<K, V> extends StateDeclaration { + /** Get type descriptor of this map state's key. */ + TypeDescriptor<K> getKeyTypeDescriptor(); + + /** Get type descriptor of this map state's value. */ + TypeDescriptor<V> getValueTypeDescriptor(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java new file mode 100644 index 00000000000..7bbcb3ddac5 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +/** This represents a declaration of the reducing state. */ +@Experimental +public interface ReducingStateDeclaration<T> extends StateDeclaration { + /** Get type descriptor of this reducing state. */ + TypeDescriptor<T> getTypeDescriptor(); + + /** Get {@link ReduceFunction} associated with this reducing state. */ + ReduceFunction<T> getReduceFunction(); +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java new file mode 100644 index 00000000000..b251032cd46 --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; + +import java.io.Serializable; + +/** {@link StateDeclaration} represents a declaration of the specific state used. */ +@Experimental +public interface StateDeclaration extends Serializable { + + /** Get the name of this state. */ + String getName(); + + /** + * Get the {@link RedistributionMode} of this state. More details see the doc of {@link + * RedistributionMode}. + */ + RedistributionMode getRedistributionMode(); + + /** + * {@link RedistributionMode} is used to indicate whether this state supports redistribution + * between partitions and how to redistribute this state during rescaling. + */ + @Experimental + enum RedistributionMode { + /** + * Not supports redistribution. + * + * <p>For example : KeyedState is bind to a specific keyGroup, so it is can't support + * redistribution between partitions. + */ + NONE, + + /** + * This state can be safely redistributed between different partitions, and the specific + * redistribution strategy is determined by the state itself. + * + * <p>For example: ListState's redistribution algorithm is determined by {@link + * ListStateDeclaration.RedistributionStrategy}. + */ + REDISTRIBUTABLE, + + /** + * States are guranteed to be identical in different partitions, thus redistribution is not + * a problem. + */ + IDENTICAL + } +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java new file mode 100644 index 00000000000..bb1ab6e87fc --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +import java.io.Serializable; + +/** This is a helper class for declaring various states. */ +@Experimental +public class StateDeclarations { + + /** Get the builder of {@link AggregatingStateDeclaration}. */ + public static <IN, OUT, ACC> + AggregatingStateDeclarationBuilder<IN, OUT, ACC> aggregatingStateBuilder( + String name, + TypeDescriptor<ACC> aggTypeDescriptor, + AggregateFunction<IN, ACC, OUT> aggregateFunction) { + return new AggregatingStateDeclarationBuilder<>(name, aggTypeDescriptor, aggregateFunction); + } + + /** Get the builder of {@link ReducingStateDeclaration}. */ + public static <T> ReducingStateDeclarationBuilder<T> reducingStateBuilder( + String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> reduceFunction) { + return new ReducingStateDeclarationBuilder<>(name, typeInformation, reduceFunction); + } + + /** Get the builder of {@link MapStateDeclaration}. */ + public static <K, V> MapStateDeclarationBuilder<K, V> mapStateBuilder( + String name, + TypeDescriptor<K> keyTypeInformation, + TypeDescriptor<V> valueTypeInformation) { + return new MapStateDeclarationBuilder<>(name, keyTypeInformation, valueTypeInformation); + } + + /** Get the builder of {@link ListStateDeclaration}. */ + public static <T> ListStateDeclarationBuilder<T> listStateBuilder( + String name, TypeDescriptor<T> elementTypeInformation) { + return new ListStateDeclarationBuilder<>(name, elementTypeInformation); + } + + /** Get the builder of {@link ValueStateDeclaration}. */ + public <T> ValueStateDeclarationBuilder<T> valueStateBuilder( + String name, TypeDescriptor<T> valueType) { + return new ValueStateDeclarationBuilder<>(name, valueType); + } + + /** + * Get the {@link AggregatingStateDeclaration} of aggregating state. If you want to configure it + * more elaborately, use {@link #aggregatingStateBuilder(String, TypeDescriptor, + * AggregateFunction)}. + */ + public static <IN, ACC, OUT> AggregatingStateDeclaration<IN, ACC, OUT> aggregatingState( + String name, + TypeDescriptor<ACC> aggTypeDescriptor, + AggregateFunction<IN, ACC, OUT> aggregateFunction) { + return new AggregatingStateDeclarationBuilder<>(name, aggTypeDescriptor, aggregateFunction) + .build(); + } + + /** + * Get the {@link ReducingStateDeclaration} of list state. If you want to configure it more + * elaborately, use {@link StateDeclarations#reducingStateBuilder(String, TypeDescriptor, + * ReduceFunction)}. + */ + public static <T> ReducingStateDeclaration<T> reducingState( + String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> reduceFunction) { + return new ReducingStateDeclarationBuilder<>(name, typeInformation, reduceFunction).build(); + } + + /** + * Get the {@link MapStateDeclaration} of map state with {@link + * StateDeclaration.RedistributionMode#NONE}. If you want to configure it more elaborately, use + * {@link StateDeclarations#mapStateBuilder(String, TypeDescriptor, TypeDescriptor)}. + */ + public static <K, V> MapStateDeclaration<K, V> mapState( + String name, + TypeDescriptor<K> keyTypeInformation, + TypeDescriptor<V> valueTypeInformation) { + return new MapStateDeclarationBuilder<>(name, keyTypeInformation, valueTypeInformation) + .build(); + } + + /** + * Get the {@link ListStateDeclaration} of list state with {@link + * StateDeclaration.RedistributionMode#NONE}. If you want to configure it more elaborately, use + * {@link StateDeclarations#listStateBuilder(String, TypeDescriptor)}. + */ + public static <T> ListStateDeclaration<T> listState( + String name, TypeDescriptor<T> elementTypeInformation) { + return new ListStateDeclarationBuilder<>(name, elementTypeInformation).build(); + } + + /** + * Get the {@link ValueStateDeclaration} of value state. If you want to configure it more + * elaborately, use {@link StateDeclarations#valueStateBuilder(String, TypeDescriptor)}. + */ + public static <T> ValueStateDeclaration<T> valueState( + String name, TypeDescriptor<T> valueType) { + return new ValueStateDeclarationBuilder<>(name, valueType).build(); + } + + /** Builder for {@link ReducingStateDeclaration}. */ + @Experimental + public static class ReducingStateDeclarationBuilder<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + private final TypeDescriptor<T> typeInformation; + + private final ReduceFunction<T> reduceFunction; + + public ReducingStateDeclarationBuilder( + String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> reduceFunction) { + this.name = name; + this.typeInformation = typeInformation; + this.reduceFunction = reduceFunction; + } + + ReducingStateDeclaration<T> build() { + return new ReducingStateDeclaration<T>() { + @Override + public TypeDescriptor<T> getTypeDescriptor() { + return typeInformation; + } + + @Override + public String getName() { + return name; + } + + @Override + public ReduceFunction<T> getReduceFunction() { + return reduceFunction; + } + + @Override + public RedistributionMode getRedistributionMode() { + return RedistributionMode.NONE; + } + }; + } + } + + /** Builder for {@link AggregatingStateDeclaration}. */ + @Experimental + public static class AggregatingStateDeclarationBuilder<IN, OUT, ACC> implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + + private final TypeDescriptor<ACC> stateTypeDescriptor; + private final AggregateFunction<IN, ACC, OUT> aggregateFunction; + + public AggregatingStateDeclarationBuilder( + String name, + TypeDescriptor<ACC> stateTypeDescriptor, + AggregateFunction<IN, ACC, OUT> aggregateFunction) { + this.name = name; + this.stateTypeDescriptor = stateTypeDescriptor; + this.aggregateFunction = aggregateFunction; + } + + AggregatingStateDeclaration<IN, ACC, OUT> build() { + return new AggregatingStateDeclaration<IN, ACC, OUT>() { + @Override + public TypeDescriptor<ACC> getTypeDescriptor() { + return stateTypeDescriptor; + } + + @Override + public AggregateFunction<IN, ACC, OUT> getAggregateFunction() { + return aggregateFunction; + } + + @Override + public String getName() { + return name; + } + + @Override + public RedistributionMode getRedistributionMode() { + return RedistributionMode.NONE; + } + }; + } + } + + /** Builder for {@link MapStateDeclaration}. */ + @Experimental + public static class MapStateDeclarationBuilder<K, V> implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + private final TypeDescriptor<K> keyTypeInformation; + private final TypeDescriptor<V> valueTypeInformation; + + private final StateDeclaration.RedistributionMode redistributionMode; + + public MapStateDeclarationBuilder( + String name, + TypeDescriptor<K> keyTypeInformation, + TypeDescriptor<V> valueTypeInformation) { + this( + name, + keyTypeInformation, + valueTypeInformation, + StateDeclaration.RedistributionMode.NONE); + } + + public MapStateDeclarationBuilder( + String name, + TypeDescriptor<K> keyTypeInformation, + TypeDescriptor<V> valueTypeInformation, + StateDeclaration.RedistributionMode redistributionMode) { + this.name = name; + this.keyTypeInformation = keyTypeInformation; + this.valueTypeInformation = valueTypeInformation; + this.redistributionMode = redistributionMode; + } + + public BroadcastStateDeclaration<K, V> buildBroadcast() { + + return new BroadcastStateDeclaration<K, V>() { + @Override + public TypeDescriptor<K> getKeyTypeDescriptor() { + return keyTypeInformation; + } + + @Override + public TypeDescriptor<V> getValueTypeDescriptor() { + return valueTypeInformation; + } + + @Override + public String getName() { + return name; + } + + @Override + public RedistributionMode getRedistributionMode() { + return StateDeclaration.RedistributionMode.IDENTICAL; + } + }; + } + + MapStateDeclaration<K, V> build() { + return new MapStateDeclaration<K, V>() { + @Override + public TypeDescriptor<K> getKeyTypeDescriptor() { + return keyTypeInformation; + } + + @Override + public TypeDescriptor<V> getValueTypeDescriptor() { + return valueTypeInformation; + } + + @Override + public String getName() { + return name; + } + + @Override + public RedistributionMode getRedistributionMode() { + return redistributionMode; + } + }; + } + } + + /** Builder for {@link ListStateDeclaration}. */ + @Experimental + public static class ListStateDeclarationBuilder<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + private final TypeDescriptor<T> elementTypeInformation; + private ListStateDeclaration.RedistributionStrategy redistributionStrategy = + ListStateDeclaration.RedistributionStrategy.SPLIT; + private StateDeclaration.RedistributionMode redistributionMode = + StateDeclaration.RedistributionMode.NONE; + + public ListStateDeclarationBuilder(String name, TypeDescriptor<T> elementTypeInformation) { + this.name = name; + this.elementTypeInformation = elementTypeInformation; + } + + public ListStateDeclarationBuilder<T> redistributeBy( + ListStateDeclaration.RedistributionStrategy strategy) { + this.redistributionStrategy = strategy; + this.redistributionMode = StateDeclaration.RedistributionMode.REDISTRIBUTABLE; + return this; + } + + public ListStateDeclarationBuilder<T> redistributeWithMode( + StateDeclaration.RedistributionMode mode) { + this.redistributionMode = mode; + return this; + } + + public ListStateDeclaration<T> build() { + + return new ListStateDeclaration<T>() { + @Override + public RedistributionStrategy getRedistributionStrategy() { + return redistributionStrategy; + } + + @Override + public TypeDescriptor<T> getTypeDescriptor() { + return elementTypeInformation; + } + + @Override + public String getName() { + return name; + } + + @Override + public RedistributionMode getRedistributionMode() { + return redistributionMode; + } + }; + } + } + + /** Builder for {@link ValueStateDeclaration}. */ + @Experimental + public static class ValueStateDeclarationBuilder<T> implements Serializable { + private static final long serialVersionUID = 1L; + + private final String name; + private final TypeDescriptor<T> valueType; + + public ValueStateDeclarationBuilder(String name, TypeDescriptor<T> valueType) { + this.name = name; + this.valueType = valueType; + } + + ValueStateDeclaration<T> build() { + return new ValueStateDeclaration<T>() { + @Override + public TypeDescriptor<T> getTypeDescriptor() { + return valueType; + } + + @Override + public String getName() { + return name; + } + + @Override + public RedistributionMode getRedistributionMode() { + return RedistributionMode.NONE; + } + }; + } + } +} diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java new file mode 100644 index 00000000000..f0e78510f4d --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; + +/** This represents a declaration of the value state. */ +@Experimental +public interface ValueStateDeclaration<T> extends StateDeclaration { + /** Get type descriptor of this value state. */ + TypeDescriptor<T> getTypeDescriptor(); +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDeclarationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDeclarationTest.java new file mode 100644 index 00000000000..69420e5faff --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDeclarationTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.functions.AggregateFunction; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link AggregatingStateDeclaration}. */ +class AggregatingStateDeclarationTest { + + private AggregatingStateDeclaration<Integer, Integer, Integer> aggregatingStateDeclaration; + private AggregateFunction<Integer, Integer, Integer> aggregateFunction; + + @BeforeEach + void setUp() { + aggregateFunction = + new AggregateFunction<Integer, Integer, Integer>() { + @Override + public Integer createAccumulator() { + return 0; + } + + @Override + public Integer add(Integer value, Integer accumulator) { + return 0; + } + + @Override + public Integer getResult(Integer accumulator) { + return 0; + } + + @Override + public Integer merge(Integer a, Integer b) { + return 0; + } + }; + aggregatingStateDeclaration = + StateDeclarations.aggregatingStateBuilder( + "aggregatingState", TypeDescriptors.INT, aggregateFunction) + .build(); + } + + @Test + void testAggregatingStateDeclarationName() { + assertThat(aggregatingStateDeclaration.getName()).isEqualTo("aggregatingState"); + } + + @Test + void testAggregatingStateDeclarationFunc() { + assertThat(aggregatingStateDeclaration.getAggregateFunction()).isEqualTo(aggregateFunction); + } + + @Test + void testAggregatingStateDeclarationType() { + assertThat(aggregatingStateDeclaration.getTypeDescriptor()).isEqualTo(TypeDescriptors.INT); + } + + @Test + void testAggregatingStateDeclarationDist() { + assertThat(aggregatingStateDeclaration.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.NONE); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDeclarationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDeclarationTest.java new file mode 100644 index 00000000000..bb2b385afcd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDeclarationTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link ListStateDeclaration}. */ +class ListStateDeclarationTest { + + @Test + void testListStateDeclarationName() { + ListStateDeclaration<Integer> listStateDeclaration = + StateDeclarations.listStateBuilder("listState", TypeDescriptors.INT).build(); + assertThat(listStateDeclaration.getName()).isEqualTo("listState"); + } + + @Test + void testListStateDeclarationDistribution() { + ListStateDeclaration<Integer> listStateDefault = + StateDeclarations.listStateBuilder("listState", TypeDescriptors.INT).build(); + assertThat(listStateDefault.getRedistributionStrategy()) + .isEqualTo(ListStateDeclaration.RedistributionStrategy.SPLIT); + assertThat(listStateDefault.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.NONE); + + ListStateDeclaration<Integer> listStateCustomized = + StateDeclarations.listStateBuilder("listState", TypeDescriptors.INT) + .redistributeBy(ListStateDeclaration.RedistributionStrategy.UNION) + .redistributeWithMode(StateDeclaration.RedistributionMode.REDISTRIBUTABLE) + .build(); + assertThat(listStateCustomized.getRedistributionStrategy()) + .isEqualTo(ListStateDeclaration.RedistributionStrategy.UNION); + assertThat(listStateCustomized.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.REDISTRIBUTABLE); + } + + @Test + void testListStateDeclarationType() { + ListStateDeclaration<Integer> listStateDeclaration = + StateDeclarations.listStateBuilder("listState", TypeDescriptors.INT).build(); + assertThat(listStateDeclaration.getTypeDescriptor()).isEqualTo(TypeDescriptors.INT); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDeclarationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDeclarationTest.java new file mode 100644 index 00000000000..40dd68052fe --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDeclarationTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link MapStateDeclaration} and {@link BroadcastStateDeclaration}. */ +class MapStateDeclarationTest { + + @Test + void testMapStateDeclarationName() { + MapStateDeclaration<Integer, String> mapStateDeclaration = + StateDeclarations.mapStateBuilder( + "mapState", TypeDescriptors.INT, TypeDescriptors.STRING) + .build(); + assertThat(mapStateDeclaration.getName()).isEqualTo("mapState"); + } + + @Test + void testMapStateDeclarationType() { + MapStateDeclaration<Integer, String> mapStateDeclaration = + StateDeclarations.mapStateBuilder( + "mapState", TypeDescriptors.INT, TypeDescriptors.STRING) + .build(); + assertThat(mapStateDeclaration.getKeyTypeDescriptor()).isEqualTo(TypeDescriptors.INT); + assertThat(mapStateDeclaration.getValueTypeDescriptor()).isEqualTo(TypeDescriptors.STRING); + } + + @Test + void testMapStateDeclarationRedistribution() { + MapStateDeclaration<Integer, String> mapStateDeclaration = + StateDeclarations.mapStateBuilder( + "mapState", TypeDescriptors.INT, TypeDescriptors.STRING) + .build(); + assertThat(mapStateDeclaration.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.NONE); + } + + @Test + void testBroadcastStateDeclaration() { + BroadcastStateDeclaration<Integer, String> broadcastState = + StateDeclarations.mapStateBuilder( + "broadcastState", TypeDescriptors.INT, TypeDescriptors.STRING) + .buildBroadcast(); + assertThat(broadcastState.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.IDENTICAL); + assertThat(broadcastState.getName()).isEqualTo("broadcastState"); + assertThat(broadcastState.getKeyTypeDescriptor()).isEqualTo(TypeDescriptors.INT); + assertThat(broadcastState.getValueTypeDescriptor()).isEqualTo(TypeDescriptors.STRING); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDeclarationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDeclarationTest.java new file mode 100644 index 00000000000..abc510389ed --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDeclarationTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link ReducingStateDeclaration}. */ +class ReducingStateDeclarationTest { + + @Test + void testReducingStateDeclarationName() { + ReducingStateDeclaration<Integer> reducingStateDeclaration = + StateDeclarations.reducingStateBuilder( + "reducingState", + TypeDescriptors.INT, + (ReduceFunction<Integer>) Integer::sum) + .build(); + assertThat(reducingStateDeclaration.getName()).isEqualTo("reducingState"); + } + + @Test + void testReducingStateDeclarationType() { + ReducingStateDeclaration<Integer> reducingStateDeclaration = + StateDeclarations.reducingStateBuilder( + "reducingState", + TypeDescriptors.INT, + (ReduceFunction<Integer>) Integer::sum) + .build(); + assertThat(reducingStateDeclaration.getTypeDescriptor()).isEqualTo(TypeDescriptors.INT); + } + + @Test + void testReducingStateDeclarationRedistribution() { + ReducingStateDeclaration<Integer> reducingStateDeclaration = + StateDeclarations.reducingStateBuilder( + "reducingState", + TypeDescriptors.INT, + (ReduceFunction<Integer>) Integer::sum) + .build(); + assertThat(reducingStateDeclaration.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.NONE); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDeclarationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDeclarationTest.java new file mode 100644 index 00000000000..2235596e41f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDeclarationTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.state; + +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; +import org.apache.flink.types.BooleanValue; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for the {@link ValueStateDeclaration}. */ +class ValueStateDeclarationTest { + + private static TypeDescriptor<BooleanValue> booleanValueDescriptor; + private static ValueStateDeclaration<BooleanValue> valueStateDeclaration; + + @BeforeAll + static void setUp() throws ReflectiveOperationException { + booleanValueDescriptor = + TypeDescriptors.value((TypeDescriptor<BooleanValue>) () -> BooleanValue.class); + valueStateDeclaration = StateDeclarations.valueState("valueState", booleanValueDescriptor); + } + + @Test + void testValueStateDeclarationName() { + assertThat(valueStateDeclaration.getName()).isEqualTo("valueState"); + } + + @Test + void testValueStateDeclarationDistribution() { + assertThat(valueStateDeclaration.getRedistributionMode()) + .isEqualTo(StateDeclaration.RedistributionMode.NONE); + } + + @Test + void testValueStateDeclarationType() { + assertThat(valueStateDeclaration.getTypeDescriptor()).isEqualTo(booleanValueDescriptor); + } +}