This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2998ef1aeb2c0b01694dacbe72bae0e6c5ca3631
Author: Jeyhun Karimov <je.kari...@gmail.com>
AuthorDate: Thu May 30 16:47:37 2024 +0200

    [FLINK-34977][API] Support State Declarations
---
 .../common/state/AggregatingStateDeclaration.java  |  33 ++
 .../common/state/BroadcastStateDeclaration.java    |  32 ++
 .../api/common/state/ListStateDeclaration.java     |  54 +++
 .../api/common/state/MapStateDeclaration.java      |  32 ++
 .../api/common/state/ReducingStateDeclaration.java |  33 ++
 .../flink/api/common/state/StateDeclaration.java   |  67 ++++
 .../flink/api/common/state/StateDeclarations.java  | 379 +++++++++++++++++++++
 .../api/common/state/ValueStateDeclaration.java    |  29 ++
 .../state/AggregatingStateDeclarationTest.java     |  85 +++++
 .../api/common/state/ListStateDeclarationTest.java |  63 ++++
 .../api/common/state/MapStateDeclarationTest.java  |  71 ++++
 .../common/state/ReducingStateDeclarationTest.java |  64 ++++
 .../common/state/ValueStateDeclarationTest.java    |  58 ++++
 13 files changed, 1000 insertions(+)

diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java
new file mode 100644
index 00000000000..a9660c8840d
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the aggregating state. */
+@Experimental
+public interface AggregatingStateDeclaration<IN, ACC, OUT> extends 
StateDeclaration {
+    /** Get type descriptor of this state. */
+    TypeDescriptor<ACC> getTypeDescriptor();
+
+    /** Get the aggregate function of this state. */
+    AggregateFunction<IN, ACC, OUT> getAggregateFunction();
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/BroadcastStateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/BroadcastStateDeclaration.java
new file mode 100644
index 00000000000..1f432048a15
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/BroadcastStateDeclaration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the broadcast state. */
+@Experimental
+public interface BroadcastStateDeclaration<K, V> extends StateDeclaration {
+    /** Get type descriptor of this broadcast state's key. */
+    TypeDescriptor<K> getKeyTypeDescriptor();
+
+    /** Get type descriptor of this broadcast state's value. */
+    TypeDescriptor<V> getValueTypeDescriptor();
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java
new file mode 100644
index 00000000000..5593a7f57e9
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ListStateDeclaration.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the list state. */
+@Experimental
+public interface ListStateDeclaration<T> extends StateDeclaration {
+    /**
+     * Get the {@link RedistributionStrategy} of this list state.
+     *
+     * @return the redistribution strategy of this list state.
+     */
+    RedistributionStrategy getRedistributionStrategy();
+
+    /**
+     * {@link RedistributionStrategy} is used to guide the assignment of 
states during rescaling.
+     */
+    @Experimental
+    enum RedistributionStrategy {
+        /**
+         * The whole state is logically a concatenation of all lists. On 
restore/redistribution, the
+         * list is evenly divided into as many sub-lists as there are parallel 
operators. Each
+         * operator gets a sub-list, which can be empty, or contain one or 
more elements.
+         */
+        SPLIT,
+        /**
+         * The whole state is logically a concatenation of all lists. On 
restore/redistribution,
+         * each operator gets the complete list of state elements.
+         */
+        UNION
+    }
+
+    /** Get type descriptor of this list state's element. */
+    TypeDescriptor<T> getTypeDescriptor();
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapStateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapStateDeclaration.java
new file mode 100644
index 00000000000..1e844153a0a
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/MapStateDeclaration.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the map state. */
+@Experimental
+public interface MapStateDeclaration<K, V> extends StateDeclaration {
+    /** Get type descriptor of this map state's key. */
+    TypeDescriptor<K> getKeyTypeDescriptor();
+
+    /** Get type descriptor of this map state's value. */
+    TypeDescriptor<V> getValueTypeDescriptor();
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java
new file mode 100644
index 00000000000..7bbcb3ddac5
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the reducing state. */
+@Experimental
+public interface ReducingStateDeclaration<T> extends StateDeclaration {
+    /** Get type descriptor of this reducing state. */
+    TypeDescriptor<T> getTypeDescriptor();
+
+    /** Get {@link ReduceFunction} associated with this reducing state. */
+    ReduceFunction<T> getReduceFunction();
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java
new file mode 100644
index 00000000000..b251032cd46
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclaration.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+/** {@link StateDeclaration} represents a declaration of the specific state 
used. */
+@Experimental
+public interface StateDeclaration extends Serializable {
+
+    /** Get the name of this state. */
+    String getName();
+
+    /**
+     * Get the {@link RedistributionMode} of this state. More details see the 
doc of {@link
+     * RedistributionMode}.
+     */
+    RedistributionMode getRedistributionMode();
+
+    /**
+     * {@link RedistributionMode} is used to indicate whether this state 
supports redistribution
+     * between partitions and how to redistribute this state during rescaling.
+     */
+    @Experimental
+    enum RedistributionMode {
+        /**
+         * Not supports redistribution.
+         *
+         * <p>For example : KeyedState is bind to a specific keyGroup, so it 
is can't support
+         * redistribution between partitions.
+         */
+        NONE,
+
+        /**
+         * This state can be safely redistributed between different 
partitions, and the specific
+         * redistribution strategy is determined by the state itself.
+         *
+         * <p>For example: ListState's redistribution algorithm is determined 
by {@link
+         * ListStateDeclaration.RedistributionStrategy}.
+         */
+        REDISTRIBUTABLE,
+
+        /**
+         * States are guranteed to be identical in different partitions, thus 
redistribution is not
+         * a problem.
+         */
+        IDENTICAL
+    }
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java
new file mode 100644
index 00000000000..bb1ab6e87fc
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+import java.io.Serializable;
+
+/** This is a helper class for declaring various states. */
+@Experimental
+public class StateDeclarations {
+
+    /** Get the builder of {@link AggregatingStateDeclaration}. */
+    public static <IN, OUT, ACC>
+            AggregatingStateDeclarationBuilder<IN, OUT, ACC> 
aggregatingStateBuilder(
+                    String name,
+                    TypeDescriptor<ACC> aggTypeDescriptor,
+                    AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        return new AggregatingStateDeclarationBuilder<>(name, 
aggTypeDescriptor, aggregateFunction);
+    }
+
+    /** Get the builder of {@link ReducingStateDeclaration}. */
+    public static <T> ReducingStateDeclarationBuilder<T> reducingStateBuilder(
+            String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> 
reduceFunction) {
+        return new ReducingStateDeclarationBuilder<>(name, typeInformation, 
reduceFunction);
+    }
+
+    /** Get the builder of {@link MapStateDeclaration}. */
+    public static <K, V> MapStateDeclarationBuilder<K, V> mapStateBuilder(
+            String name,
+            TypeDescriptor<K> keyTypeInformation,
+            TypeDescriptor<V> valueTypeInformation) {
+        return new MapStateDeclarationBuilder<>(name, keyTypeInformation, 
valueTypeInformation);
+    }
+
+    /** Get the builder of {@link ListStateDeclaration}. */
+    public static <T> ListStateDeclarationBuilder<T> listStateBuilder(
+            String name, TypeDescriptor<T> elementTypeInformation) {
+        return new ListStateDeclarationBuilder<>(name, elementTypeInformation);
+    }
+
+    /** Get the builder of {@link ValueStateDeclaration}. */
+    public <T> ValueStateDeclarationBuilder<T> valueStateBuilder(
+            String name, TypeDescriptor<T> valueType) {
+        return new ValueStateDeclarationBuilder<>(name, valueType);
+    }
+
+    /**
+     * Get the {@link AggregatingStateDeclaration} of aggregating state. If 
you want to configure it
+     * more elaborately, use {@link #aggregatingStateBuilder(String, 
TypeDescriptor,
+     * AggregateFunction)}.
+     */
+    public static <IN, ACC, OUT> AggregatingStateDeclaration<IN, ACC, OUT> 
aggregatingState(
+            String name,
+            TypeDescriptor<ACC> aggTypeDescriptor,
+            AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        return new AggregatingStateDeclarationBuilder<>(name, 
aggTypeDescriptor, aggregateFunction)
+                .build();
+    }
+
+    /**
+     * Get the {@link ReducingStateDeclaration} of list state. If you want to 
configure it more
+     * elaborately, use {@link StateDeclarations#reducingStateBuilder(String, 
TypeDescriptor,
+     * ReduceFunction)}.
+     */
+    public static <T> ReducingStateDeclaration<T> reducingState(
+            String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> 
reduceFunction) {
+        return new ReducingStateDeclarationBuilder<>(name, typeInformation, 
reduceFunction).build();
+    }
+
+    /**
+     * Get the {@link MapStateDeclaration} of map state with {@link
+     * StateDeclaration.RedistributionMode#NONE}. If you want to configure it 
more elaborately, use
+     * {@link StateDeclarations#mapStateBuilder(String, TypeDescriptor, 
TypeDescriptor)}.
+     */
+    public static <K, V> MapStateDeclaration<K, V> mapState(
+            String name,
+            TypeDescriptor<K> keyTypeInformation,
+            TypeDescriptor<V> valueTypeInformation) {
+        return new MapStateDeclarationBuilder<>(name, keyTypeInformation, 
valueTypeInformation)
+                .build();
+    }
+
+    /**
+     * Get the {@link ListStateDeclaration} of list state with {@link
+     * StateDeclaration.RedistributionMode#NONE}. If you want to configure it 
more elaborately, use
+     * {@link StateDeclarations#listStateBuilder(String, TypeDescriptor)}.
+     */
+    public static <T> ListStateDeclaration<T> listState(
+            String name, TypeDescriptor<T> elementTypeInformation) {
+        return new ListStateDeclarationBuilder<>(name, 
elementTypeInformation).build();
+    }
+
+    /**
+     * Get the {@link ValueStateDeclaration} of value state. If you want to 
configure it more
+     * elaborately, use {@link StateDeclarations#valueStateBuilder(String, 
TypeDescriptor)}.
+     */
+    public static <T> ValueStateDeclaration<T> valueState(
+            String name, TypeDescriptor<T> valueType) {
+        return new ValueStateDeclarationBuilder<>(name, valueType).build();
+    }
+
+    /** Builder for {@link ReducingStateDeclaration}. */
+    @Experimental
+    public static class ReducingStateDeclarationBuilder<T> implements 
Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+        private final TypeDescriptor<T> typeInformation;
+
+        private final ReduceFunction<T> reduceFunction;
+
+        public ReducingStateDeclarationBuilder(
+                String name, TypeDescriptor<T> typeInformation, 
ReduceFunction<T> reduceFunction) {
+            this.name = name;
+            this.typeInformation = typeInformation;
+            this.reduceFunction = reduceFunction;
+        }
+
+        ReducingStateDeclaration<T> build() {
+            return new ReducingStateDeclaration<T>() {
+                @Override
+                public TypeDescriptor<T> getTypeDescriptor() {
+                    return typeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public ReduceFunction<T> getReduceFunction() {
+                    return reduceFunction;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link AggregatingStateDeclaration}. */
+    @Experimental
+    public static class AggregatingStateDeclarationBuilder<IN, OUT, ACC> 
implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+
+        private final TypeDescriptor<ACC> stateTypeDescriptor;
+        private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
+
+        public AggregatingStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<ACC> stateTypeDescriptor,
+                AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+            this.name = name;
+            this.stateTypeDescriptor = stateTypeDescriptor;
+            this.aggregateFunction = aggregateFunction;
+        }
+
+        AggregatingStateDeclaration<IN, ACC, OUT> build() {
+            return new AggregatingStateDeclaration<IN, ACC, OUT>() {
+                @Override
+                public TypeDescriptor<ACC> getTypeDescriptor() {
+                    return stateTypeDescriptor;
+                }
+
+                @Override
+                public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
+                    return aggregateFunction;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link MapStateDeclaration}. */
+    @Experimental
+    public static class MapStateDeclarationBuilder<K, V> implements 
Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+        private final TypeDescriptor<K> keyTypeInformation;
+        private final TypeDescriptor<V> valueTypeInformation;
+
+        private final StateDeclaration.RedistributionMode redistributionMode;
+
+        public MapStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<K> keyTypeInformation,
+                TypeDescriptor<V> valueTypeInformation) {
+            this(
+                    name,
+                    keyTypeInformation,
+                    valueTypeInformation,
+                    StateDeclaration.RedistributionMode.NONE);
+        }
+
+        public MapStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<K> keyTypeInformation,
+                TypeDescriptor<V> valueTypeInformation,
+                StateDeclaration.RedistributionMode redistributionMode) {
+            this.name = name;
+            this.keyTypeInformation = keyTypeInformation;
+            this.valueTypeInformation = valueTypeInformation;
+            this.redistributionMode = redistributionMode;
+        }
+
+        public BroadcastStateDeclaration<K, V> buildBroadcast() {
+
+            return new BroadcastStateDeclaration<K, V>() {
+                @Override
+                public TypeDescriptor<K> getKeyTypeDescriptor() {
+                    return keyTypeInformation;
+                }
+
+                @Override
+                public TypeDescriptor<V> getValueTypeDescriptor() {
+                    return valueTypeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return StateDeclaration.RedistributionMode.IDENTICAL;
+                }
+            };
+        }
+
+        MapStateDeclaration<K, V> build() {
+            return new MapStateDeclaration<K, V>() {
+                @Override
+                public TypeDescriptor<K> getKeyTypeDescriptor() {
+                    return keyTypeInformation;
+                }
+
+                @Override
+                public TypeDescriptor<V> getValueTypeDescriptor() {
+                    return valueTypeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return redistributionMode;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link ListStateDeclaration}. */
+    @Experimental
+    public static class ListStateDeclarationBuilder<T> implements Serializable 
{
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+        private final TypeDescriptor<T> elementTypeInformation;
+        private ListStateDeclaration.RedistributionStrategy 
redistributionStrategy =
+                ListStateDeclaration.RedistributionStrategy.SPLIT;
+        private StateDeclaration.RedistributionMode redistributionMode =
+                StateDeclaration.RedistributionMode.NONE;
+
+        public ListStateDeclarationBuilder(String name, TypeDescriptor<T> 
elementTypeInformation) {
+            this.name = name;
+            this.elementTypeInformation = elementTypeInformation;
+        }
+
+        public ListStateDeclarationBuilder<T> redistributeBy(
+                ListStateDeclaration.RedistributionStrategy strategy) {
+            this.redistributionStrategy = strategy;
+            this.redistributionMode = 
StateDeclaration.RedistributionMode.REDISTRIBUTABLE;
+            return this;
+        }
+
+        public ListStateDeclarationBuilder<T> redistributeWithMode(
+                StateDeclaration.RedistributionMode mode) {
+            this.redistributionMode = mode;
+            return this;
+        }
+
+        public ListStateDeclaration<T> build() {
+
+            return new ListStateDeclaration<T>() {
+                @Override
+                public RedistributionStrategy getRedistributionStrategy() {
+                    return redistributionStrategy;
+                }
+
+                @Override
+                public TypeDescriptor<T> getTypeDescriptor() {
+                    return elementTypeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return redistributionMode;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link ValueStateDeclaration}. */
+    @Experimental
+    public static class ValueStateDeclarationBuilder<T> implements 
Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final String name;
+        private final TypeDescriptor<T> valueType;
+
+        public ValueStateDeclarationBuilder(String name, TypeDescriptor<T> 
valueType) {
+            this.name = name;
+            this.valueType = valueType;
+        }
+
+        ValueStateDeclaration<T> build() {
+            return new ValueStateDeclaration<T>() {
+                @Override
+                public TypeDescriptor<T> getTypeDescriptor() {
+                    return valueType;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+}
diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java
new file mode 100644
index 00000000000..f0e78510f4d
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the value state. */
+@Experimental
+public interface ValueStateDeclaration<T> extends StateDeclaration {
+    /** Get type descriptor of this value state. */
+    TypeDescriptor<T> getTypeDescriptor();
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDeclarationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDeclarationTest.java
new file mode 100644
index 00000000000..69420e5faff
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/AggregatingStateDeclarationTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link AggregatingStateDeclaration}. */
+class AggregatingStateDeclarationTest {
+
+    private AggregatingStateDeclaration<Integer, Integer, Integer> 
aggregatingStateDeclaration;
+    private AggregateFunction<Integer, Integer, Integer> aggregateFunction;
+
+    @BeforeEach
+    void setUp() {
+        aggregateFunction =
+                new AggregateFunction<Integer, Integer, Integer>() {
+                    @Override
+                    public Integer createAccumulator() {
+                        return 0;
+                    }
+
+                    @Override
+                    public Integer add(Integer value, Integer accumulator) {
+                        return 0;
+                    }
+
+                    @Override
+                    public Integer getResult(Integer accumulator) {
+                        return 0;
+                    }
+
+                    @Override
+                    public Integer merge(Integer a, Integer b) {
+                        return 0;
+                    }
+                };
+        aggregatingStateDeclaration =
+                StateDeclarations.aggregatingStateBuilder(
+                                "aggregatingState", TypeDescriptors.INT, 
aggregateFunction)
+                        .build();
+    }
+
+    @Test
+    void testAggregatingStateDeclarationName() {
+        
assertThat(aggregatingStateDeclaration.getName()).isEqualTo("aggregatingState");
+    }
+
+    @Test
+    void testAggregatingStateDeclarationFunc() {
+        
assertThat(aggregatingStateDeclaration.getAggregateFunction()).isEqualTo(aggregateFunction);
+    }
+
+    @Test
+    void testAggregatingStateDeclarationType() {
+        
assertThat(aggregatingStateDeclaration.getTypeDescriptor()).isEqualTo(TypeDescriptors.INT);
+    }
+
+    @Test
+    void testAggregatingStateDeclarationDist() {
+        assertThat(aggregatingStateDeclaration.getRedistributionMode())
+                .isEqualTo(StateDeclaration.RedistributionMode.NONE);
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDeclarationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDeclarationTest.java
new file mode 100644
index 00000000000..bb2b385afcd
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ListStateDeclarationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link ListStateDeclaration}. */
+class ListStateDeclarationTest {
+
+    @Test
+    void testListStateDeclarationName() {
+        ListStateDeclaration<Integer> listStateDeclaration =
+                StateDeclarations.listStateBuilder("listState", 
TypeDescriptors.INT).build();
+        assertThat(listStateDeclaration.getName()).isEqualTo("listState");
+    }
+
+    @Test
+    void testListStateDeclarationDistribution() {
+        ListStateDeclaration<Integer> listStateDefault =
+                StateDeclarations.listStateBuilder("listState", 
TypeDescriptors.INT).build();
+        assertThat(listStateDefault.getRedistributionStrategy())
+                .isEqualTo(ListStateDeclaration.RedistributionStrategy.SPLIT);
+        assertThat(listStateDefault.getRedistributionMode())
+                .isEqualTo(StateDeclaration.RedistributionMode.NONE);
+
+        ListStateDeclaration<Integer> listStateCustomized =
+                StateDeclarations.listStateBuilder("listState", 
TypeDescriptors.INT)
+                        
.redistributeBy(ListStateDeclaration.RedistributionStrategy.UNION)
+                        
.redistributeWithMode(StateDeclaration.RedistributionMode.REDISTRIBUTABLE)
+                        .build();
+        assertThat(listStateCustomized.getRedistributionStrategy())
+                .isEqualTo(ListStateDeclaration.RedistributionStrategy.UNION);
+        assertThat(listStateCustomized.getRedistributionMode())
+                
.isEqualTo(StateDeclaration.RedistributionMode.REDISTRIBUTABLE);
+    }
+
+    @Test
+    void testListStateDeclarationType() {
+        ListStateDeclaration<Integer> listStateDeclaration =
+                StateDeclarations.listStateBuilder("listState", 
TypeDescriptors.INT).build();
+        
assertThat(listStateDeclaration.getTypeDescriptor()).isEqualTo(TypeDescriptors.INT);
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDeclarationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDeclarationTest.java
new file mode 100644
index 00000000000..40dd68052fe
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/MapStateDeclarationTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link MapStateDeclaration} and {@link 
BroadcastStateDeclaration}. */
+class MapStateDeclarationTest {
+
+    @Test
+    void testMapStateDeclarationName() {
+        MapStateDeclaration<Integer, String> mapStateDeclaration =
+                StateDeclarations.mapStateBuilder(
+                                "mapState", TypeDescriptors.INT, 
TypeDescriptors.STRING)
+                        .build();
+        assertThat(mapStateDeclaration.getName()).isEqualTo("mapState");
+    }
+
+    @Test
+    void testMapStateDeclarationType() {
+        MapStateDeclaration<Integer, String> mapStateDeclaration =
+                StateDeclarations.mapStateBuilder(
+                                "mapState", TypeDescriptors.INT, 
TypeDescriptors.STRING)
+                        .build();
+        
assertThat(mapStateDeclaration.getKeyTypeDescriptor()).isEqualTo(TypeDescriptors.INT);
+        
assertThat(mapStateDeclaration.getValueTypeDescriptor()).isEqualTo(TypeDescriptors.STRING);
+    }
+
+    @Test
+    void testMapStateDeclarationRedistribution() {
+        MapStateDeclaration<Integer, String> mapStateDeclaration =
+                StateDeclarations.mapStateBuilder(
+                                "mapState", TypeDescriptors.INT, 
TypeDescriptors.STRING)
+                        .build();
+        assertThat(mapStateDeclaration.getRedistributionMode())
+                .isEqualTo(StateDeclaration.RedistributionMode.NONE);
+    }
+
+    @Test
+    void testBroadcastStateDeclaration() {
+        BroadcastStateDeclaration<Integer, String> broadcastState =
+                StateDeclarations.mapStateBuilder(
+                                "broadcastState", TypeDescriptors.INT, 
TypeDescriptors.STRING)
+                        .buildBroadcast();
+        assertThat(broadcastState.getRedistributionMode())
+                .isEqualTo(StateDeclaration.RedistributionMode.IDENTICAL);
+        assertThat(broadcastState.getName()).isEqualTo("broadcastState");
+        
assertThat(broadcastState.getKeyTypeDescriptor()).isEqualTo(TypeDescriptors.INT);
+        
assertThat(broadcastState.getValueTypeDescriptor()).isEqualTo(TypeDescriptors.STRING);
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDeclarationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDeclarationTest.java
new file mode 100644
index 00000000000..abc510389ed
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ReducingStateDeclarationTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link ReducingStateDeclaration}. */
+class ReducingStateDeclarationTest {
+
+    @Test
+    void testReducingStateDeclarationName() {
+        ReducingStateDeclaration<Integer> reducingStateDeclaration =
+                StateDeclarations.reducingStateBuilder(
+                                "reducingState",
+                                TypeDescriptors.INT,
+                                (ReduceFunction<Integer>) Integer::sum)
+                        .build();
+        
assertThat(reducingStateDeclaration.getName()).isEqualTo("reducingState");
+    }
+
+    @Test
+    void testReducingStateDeclarationType() {
+        ReducingStateDeclaration<Integer> reducingStateDeclaration =
+                StateDeclarations.reducingStateBuilder(
+                                "reducingState",
+                                TypeDescriptors.INT,
+                                (ReduceFunction<Integer>) Integer::sum)
+                        .build();
+        
assertThat(reducingStateDeclaration.getTypeDescriptor()).isEqualTo(TypeDescriptors.INT);
+    }
+
+    @Test
+    void testReducingStateDeclarationRedistribution() {
+        ReducingStateDeclaration<Integer> reducingStateDeclaration =
+                StateDeclarations.reducingStateBuilder(
+                                "reducingState",
+                                TypeDescriptors.INT,
+                                (ReduceFunction<Integer>) Integer::sum)
+                        .build();
+        assertThat(reducingStateDeclaration.getRedistributionMode())
+                .isEqualTo(StateDeclaration.RedistributionMode.NONE);
+    }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDeclarationTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDeclarationTest.java
new file mode 100644
index 00000000000..2235596e41f
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/state/ValueStateDeclarationTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.types.BooleanValue;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the {@link ValueStateDeclaration}. */
+class ValueStateDeclarationTest {
+
+    private static TypeDescriptor<BooleanValue> booleanValueDescriptor;
+    private static ValueStateDeclaration<BooleanValue> valueStateDeclaration;
+
+    @BeforeAll
+    static void setUp() throws ReflectiveOperationException {
+        booleanValueDescriptor =
+                TypeDescriptors.value((TypeDescriptor<BooleanValue>) () -> 
BooleanValue.class);
+        valueStateDeclaration = StateDeclarations.valueState("valueState", 
booleanValueDescriptor);
+    }
+
+    @Test
+    void testValueStateDeclarationName() {
+        assertThat(valueStateDeclaration.getName()).isEqualTo("valueState");
+    }
+
+    @Test
+    void testValueStateDeclarationDistribution() {
+        assertThat(valueStateDeclaration.getRedistributionMode())
+                .isEqualTo(StateDeclaration.RedistributionMode.NONE);
+    }
+
+    @Test
+    void testValueStateDeclarationType() {
+        
assertThat(valueStateDeclaration.getTypeDescriptor()).isEqualTo(booleanValueDescriptor);
+    }
+}


Reply via email to