reswqa commented on code in PR #24725:
URL: https://github.com/apache/flink/pull/24725#discussion_r1630562709


##########
flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl}. */
+class BasicTypeDescriptorTest {
+
+    @Test
+    void testBasicTypeDescriptorString() {
+        assertThat(TypeDescriptors.STRING_TYPE.toString())
+                .isEqualTo("BasicTypeDescriptorImpl [basicTypeInfo=String]");
+    }
+
+    @Test
+    void testBasicTypeDescriptorInt() {
+        assertThat(TypeDescriptors.INT_TYPE.toString())
+                .isEqualTo("BasicTypeDescriptorImpl [basicTypeInfo=Integer]");
+    }

Review Comment:
   Could we simplify these tests? They differ only in the descriptor under test 
and the expected string, so I think we can reduce them to a single test case by 
looping over the cases or parameterizing the test.
   
   Same goes for the other `XXXTypeDescriptorTest`.
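
A rough sketch of the looping variant, written without JUnit so that it stands alone. `FakeBasicTypeDescriptor` is a hypothetical stand-in for `BasicTypeDescriptorImpl`; only its `toString()` format is taken from the assertions in the diff above. In the real test the loop body would presumably be the existing `assertThat(...).isEqualTo(...)` call, or the same table could feed a parameterized test.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DescriptorToStringSketch {

    /** Hypothetical stand-in for BasicTypeDescriptorImpl; only toString() is modeled. */
    static final class FakeBasicTypeDescriptor {
        private final Class<?> type;

        FakeBasicTypeDescriptor(Class<?> type) {
            this.type = type;
        }

        @Override
        public String toString() {
            return "BasicTypeDescriptorImpl [basicTypeInfo=" + type.getSimpleName() + "]";
        }
    }

    /** Table of (descriptor, expected string) pairs; a new case is one more line. */
    static Map<Object, String> cases() {
        Map<Object, String> cases = new LinkedHashMap<>();
        cases.put(
                new FakeBasicTypeDescriptor(String.class),
                "BasicTypeDescriptorImpl [basicTypeInfo=String]");
        cases.put(
                new FakeBasicTypeDescriptor(Integer.class),
                "BasicTypeDescriptorImpl [basicTypeInfo=Integer]");
        return cases;
    }

    /** Checks every pair and returns how many were verified. */
    static int checkAll() {
        int checked = 0;
        for (Map.Entry<Object, String> entry : cases().entrySet()) {
            if (!entry.getValue().equals(entry.getKey().toString())) {
                throw new AssertionError("Unexpected toString(): " + entry.getKey());
            }
            checked++;
        }
        return checked;
    }

    public static void main(String[] args) {
        System.out.println("checked " + checkAll() + " descriptors");
    }
}
```

With this shape, covering another basic type means adding one entry to the table rather than another test method.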



##########
flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Descriptor interface to create TypeInformation instances. */
+@Experimental
+public class TypeDescriptors implements Serializable {
+
+    @SuppressWarnings("unchecked")
+    public static <K, V> TypeDescriptor<Map<K, V>> mapTypeDescriptor(
+            TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> valueTypeDescriptor)
+            throws ReflectiveOperationException {
+
+        return (TypeDescriptor<Map<K, V>>)
+                TypeUtils.getInstance(
+                        "org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl",
+                        keyTypeDescriptor,
+                        valueTypeDescriptor);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> TypeDescriptor<List<T>> listTypeDescriptor(TypeDescriptor<T> typeDescriptor)

Review Comment:
   ```suggestion
       public static <T> TypeDescriptor<List<T>> listTypeDescriptor(TypeDescriptor<T> elementTypeDescriptor)
   ```



##########
flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Descriptor interface to create TypeInformation instances. */
+@Experimental
+public class TypeDescriptors implements Serializable {
+
+    @SuppressWarnings("unchecked")
+    public static <K, V> TypeDescriptor<Map<K, V>> mapTypeDescriptor(

Review Comment:
   Can we simplify the names of these methods and static fields? For example, 
`mapTypeDescriptor` -> `map` and `STRING_TYPE` -> `STRING`. Since the class is 
already named `TypeDescriptors`, I think the current names are a bit redundant. WDYT?
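
To illustrate how the shorter names would read at a call site, here is a minimal sketch. The nested `TypeDescriptor` and `TypeDescriptors` types are hypothetical stand-ins, not the classes from this PR, and `describe()` exists only to make the example observable.

```java
import java.util.Map;

public class ShortNamesSketch {

    /** Hypothetical minimal descriptor; the real TypeDescriptor interface differs. */
    interface TypeDescriptor<T> {
        String describe();
    }

    /** Hypothetical stand-in for TypeDescriptors using the shorter names proposed above. */
    static final class TypeDescriptors {
        static final TypeDescriptor<String> STRING = () -> "String";
        static final TypeDescriptor<Integer> INT = () -> "Integer";

        static <K, V> TypeDescriptor<Map<K, V>> map(
                TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> valueTypeDescriptor) {
            return () ->
                    "Map<"
                            + keyTypeDescriptor.describe()
                            + ", "
                            + valueTypeDescriptor.describe()
                            + ">";
        }

        private TypeDescriptors() {}
    }

    static String example() {
        // The class name already says "type descriptor", so the short names stay unambiguous.
        TypeDescriptor<Map<String, Integer>> descriptor =
                TypeDescriptors.map(TypeDescriptors.STRING, TypeDescriptors.INT);
        return descriptor.describe();
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```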



##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This is a helper class for declaring various states. */
+@Experimental
+public class StateDeclarations {
+
+    /** Get the builder of {@link AggregatingStateDeclaration}. */
+    public static <IN, OUT, ACC>
+            AggregatingStateDeclarationBuilder<IN, OUT, ACC> 
aggregatingStateBuilder(
+                    String name,
+                    TypeDescriptor<ACC> aggTypeDescriptor,
+                    AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        return new AggregatingStateDeclarationBuilder<>(name, 
aggTypeDescriptor, aggregateFunction);
+    }
+
+    /** Get the builder of {@link ReducingStateDeclaration}. */
+    public static <T> ReducingStateDeclarationBuilder<T> reducingStateBuilder(
+            String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> 
reduceFunction) {
+        return new ReducingStateDeclarationBuilder<>(name, typeInformation, 
reduceFunction);
+    }
+
+    /** Get the builder of {@link MapStateDeclaration}. */
+    public static <K, V> MapStateDeclarationBuilder<K, V> mapStateBuilder(
+            String name,
+            TypeDescriptor<K> keyTypeInformation,
+            TypeDescriptor<V> valueTypeInformation) {
+        return new MapStateDeclarationBuilder<>(name, keyTypeInformation, 
valueTypeInformation);
+    }
+
+    /** Get the builder of {@link ListStateDeclaration}. */
+    public static <T> ListStateDeclarationBuilder<T> listStateBuilder(
+            String name, TypeDescriptor<T> elementTypeInformation) {
+        return new ListStateDeclarationBuilder<>(name, elementTypeInformation);
+    }
+
+    /** Get the builder of {@link ValueStateDeclaration}. */
+    public <T> ValueStateDeclarationBuilder<T> valueStateBuilder(
+            String name, TypeDescriptor<T> valueType) {
+        return new ValueStateDeclarationBuilder<>(name, valueType);
+    }
+
+    /**
+     * Get the {@link AggregatingStateDeclaration} of aggregating state. If 
you want to configure it
+     * more elaborately, use {@link #aggregatingStateBuilder(String, 
TypeDescriptor,
+     * AggregateFunction)}.
+     */
+    public static <IN, ACC, OUT> AggregatingStateDeclaration<IN, ACC, OUT> 
aggregatingState(
+            String name,
+            TypeDescriptor<ACC> aggTypeDescriptor,
+            AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        return new AggregatingStateDeclarationBuilder<>(name, 
aggTypeDescriptor, aggregateFunction)
+                .build();
+    }
+
+    /**
+     * Get the {@link ReducingStateDeclaration} of list state. If you want to 
configure it more
+     * elaborately, use {@link StateDeclarations#reducingStateBuilder(String, 
TypeDescriptor,
+     * ReduceFunction)}.
+     */
+    public static <T> ReducingStateDeclaration<T> reducingState(
+            String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> 
reduceFunction) {
+        return new ReducingStateDeclarationBuilder<>(name, typeInformation, 
reduceFunction).build();
+    }
+
+    /**
+     * Get the {@link MapStateDeclaration} of map state with {@link
+     * StateDeclaration.RedistributionMode#NONE}. If you want to configure it 
more elaborately, use
+     * {@link StateDeclarations#mapStateBuilder(String, TypeDescriptor, 
TypeDescriptor)}.
+     */
+    public static <K, V> MapStateDeclaration<K, V> mapState(
+            String name,
+            TypeDescriptor<K> keyTypeInformation,
+            TypeDescriptor<V> valueTypeInformation) {
+        return new MapStateDeclarationBuilder<>(name, keyTypeInformation, 
valueTypeInformation)
+                .build();
+    }
+
+    /**
+     * Get the {@link ListStateDeclaration} of list state with {@link
+     * StateDeclaration.RedistributionMode#NONE}. If you want to configure it 
more elaborately, use
+     * {@link StateDeclarations#listStateBuilder(String, TypeDescriptor)}.
+     */
+    public static <T> ListStateDeclaration<T> listState(
+            String name, TypeDescriptor<T> elementTypeInformation) {
+        return new ListStateDeclarationBuilder<>(name, 
elementTypeInformation).build();
+    }
+
+    /**
+     * Get the {@link ValueStateDeclaration} of value state. If you want to 
configure it more
+     * elaborately, use {@link StateDeclarations#valueStateBuilder(String, 
TypeDescriptor)}.
+     */
+    public static <T> ValueStateDeclaration<T> valueState(
+            String name, TypeDescriptor<T> valueType) {
+        return new ValueStateDeclarationBuilder<>(name, valueType).build();
+    }
+
+    /** Builder for {@link ReducingStateDeclaration}. */
+    @Experimental
+    public static class ReducingStateDeclarationBuilder<T> {
+
+        private final String name;
+        private final TypeDescriptor<T> typeInformation;
+
+        private final ReduceFunction<T> reduceFunction;
+
+        public ReducingStateDeclarationBuilder(
+                String name, TypeDescriptor<T> typeInformation, 
ReduceFunction<T> reduceFunction) {
+            this.name = name;
+            this.typeInformation = typeInformation;
+            this.reduceFunction = reduceFunction;
+        }
+
+        ReducingStateDeclaration<T> build() {
+            return new ReducingStateDeclaration<T>() {
+                @Override
+                public TypeDescriptor<T> getTypeInformation() {
+                    return typeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public ReduceFunction<T> reduceFunction() {
+                    return reduceFunction;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link AggregatingStateDeclaration}. */
+    @Experimental
+    public static class AggregatingStateDeclarationBuilder<IN, OUT, ACC> {
+
+        private final String name;
+
+        private final TypeDescriptor<ACC> stateTypeDescriptor;
+        private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
+
+        public AggregatingStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<ACC> stateTypeDescriptor,
+                AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+            this.name = name;
+            this.stateTypeDescriptor = stateTypeDescriptor;
+            this.aggregateFunction = aggregateFunction;
+        }
+
+        AggregatingStateDeclaration<IN, ACC, OUT> build() {
+            return new AggregatingStateDeclaration<IN, ACC, OUT>() {
+                @Override
+                public TypeDescriptor<ACC> getTypeInformation() {
+                    return stateTypeDescriptor;
+                }
+
+                @Override
+                public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
+                    return aggregateFunction;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link MapStateDeclaration}. */
+    @Experimental
+    public static class MapStateDeclarationBuilder<K, V> {
+
+        private final String name;
+        private final TypeDescriptor<K> keyTypeInformation;
+        private final TypeDescriptor<V> valueTypeInformation;
+
+        private final StateDeclaration.RedistributionMode redistributionMode;
+
+        public MapStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<K> keyTypeInformation,
+                TypeDescriptor<V> valueTypeInformation) {
+            this(
+                    name,
+                    keyTypeInformation,
+                    valueTypeInformation,
+                    StateDeclaration.RedistributionMode.NONE);
+        }
+
+        public MapStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<K> keyTypeInformation,
+                TypeDescriptor<V> valueTypeInformation,
+                StateDeclaration.RedistributionMode redistributionMode) {
+            this.name = name;
+            this.keyTypeInformation = keyTypeInformation;
+            this.valueTypeInformation = valueTypeInformation;
+            this.redistributionMode = redistributionMode;
+        }
+
+        public BroadcastStateDeclaration<K, V> broadcast() {

Review Comment:
   How about renaming this to `buildBroadcast`, since it returns a declaration 
instead of a builder?



##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/AggregatingStateDeclaration.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the aggregating state. */
+@Experimental
+public interface AggregatingStateDeclaration<IN, ACC, OUT> extends StateDeclaration {
+    TypeDescriptor<ACC> getTypeInformation();

Review Comment:
   Please add Javadoc for this method.



##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/StateDeclarations.java:
##########
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This is a helper class for declaring various states. */
+@Experimental
+public class StateDeclarations {
+
+    /** Get the builder of {@link AggregatingStateDeclaration}. */
+    public static <IN, OUT, ACC>
+            AggregatingStateDeclarationBuilder<IN, OUT, ACC> 
aggregatingStateBuilder(
+                    String name,
+                    TypeDescriptor<ACC> aggTypeDescriptor,
+                    AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        return new AggregatingStateDeclarationBuilder<>(name, 
aggTypeDescriptor, aggregateFunction);
+    }
+
+    /** Get the builder of {@link ReducingStateDeclaration}. */
+    public static <T> ReducingStateDeclarationBuilder<T> reducingStateBuilder(
+            String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> 
reduceFunction) {
+        return new ReducingStateDeclarationBuilder<>(name, typeInformation, 
reduceFunction);
+    }
+
+    /** Get the builder of {@link MapStateDeclaration}. */
+    public static <K, V> MapStateDeclarationBuilder<K, V> mapStateBuilder(
+            String name,
+            TypeDescriptor<K> keyTypeInformation,
+            TypeDescriptor<V> valueTypeInformation) {
+        return new MapStateDeclarationBuilder<>(name, keyTypeInformation, 
valueTypeInformation);
+    }
+
+    /** Get the builder of {@link ListStateDeclaration}. */
+    public static <T> ListStateDeclarationBuilder<T> listStateBuilder(
+            String name, TypeDescriptor<T> elementTypeInformation) {
+        return new ListStateDeclarationBuilder<>(name, elementTypeInformation);
+    }
+
+    /** Get the builder of {@link ValueStateDeclaration}. */
+    public <T> ValueStateDeclarationBuilder<T> valueStateBuilder(
+            String name, TypeDescriptor<T> valueType) {
+        return new ValueStateDeclarationBuilder<>(name, valueType);
+    }
+
+    /**
+     * Get the {@link AggregatingStateDeclaration} of aggregating state. If 
you want to configure it
+     * more elaborately, use {@link #aggregatingStateBuilder(String, 
TypeDescriptor,
+     * AggregateFunction)}.
+     */
+    public static <IN, ACC, OUT> AggregatingStateDeclaration<IN, ACC, OUT> 
aggregatingState(
+            String name,
+            TypeDescriptor<ACC> aggTypeDescriptor,
+            AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+        return new AggregatingStateDeclarationBuilder<>(name, 
aggTypeDescriptor, aggregateFunction)
+                .build();
+    }
+
+    /**
+     * Get the {@link ReducingStateDeclaration} of list state. If you want to 
configure it more
+     * elaborately, use {@link StateDeclarations#reducingStateBuilder(String, 
TypeDescriptor,
+     * ReduceFunction)}.
+     */
+    public static <T> ReducingStateDeclaration<T> reducingState(
+            String name, TypeDescriptor<T> typeInformation, ReduceFunction<T> 
reduceFunction) {
+        return new ReducingStateDeclarationBuilder<>(name, typeInformation, 
reduceFunction).build();
+    }
+
+    /**
+     * Get the {@link MapStateDeclaration} of map state with {@link
+     * StateDeclaration.RedistributionMode#NONE}. If you want to configure it 
more elaborately, use
+     * {@link StateDeclarations#mapStateBuilder(String, TypeDescriptor, 
TypeDescriptor)}.
+     */
+    public static <K, V> MapStateDeclaration<K, V> mapState(
+            String name,
+            TypeDescriptor<K> keyTypeInformation,
+            TypeDescriptor<V> valueTypeInformation) {
+        return new MapStateDeclarationBuilder<>(name, keyTypeInformation, 
valueTypeInformation)
+                .build();
+    }
+
+    /**
+     * Get the {@link ListStateDeclaration} of list state with {@link
+     * StateDeclaration.RedistributionMode#NONE}. If you want to configure it 
more elaborately, use
+     * {@link StateDeclarations#listStateBuilder(String, TypeDescriptor)}.
+     */
+    public static <T> ListStateDeclaration<T> listState(
+            String name, TypeDescriptor<T> elementTypeInformation) {
+        return new ListStateDeclarationBuilder<>(name, 
elementTypeInformation).build();
+    }
+
+    /**
+     * Get the {@link ValueStateDeclaration} of value state. If you want to 
configure it more
+     * elaborately, use {@link StateDeclarations#valueStateBuilder(String, 
TypeDescriptor)}.
+     */
+    public static <T> ValueStateDeclaration<T> valueState(
+            String name, TypeDescriptor<T> valueType) {
+        return new ValueStateDeclarationBuilder<>(name, valueType).build();
+    }
+
+    /** Builder for {@link ReducingStateDeclaration}. */
+    @Experimental
+    public static class ReducingStateDeclarationBuilder<T> {
+
+        private final String name;
+        private final TypeDescriptor<T> typeInformation;
+
+        private final ReduceFunction<T> reduceFunction;
+
+        public ReducingStateDeclarationBuilder(
+                String name, TypeDescriptor<T> typeInformation, 
ReduceFunction<T> reduceFunction) {
+            this.name = name;
+            this.typeInformation = typeInformation;
+            this.reduceFunction = reduceFunction;
+        }
+
+        ReducingStateDeclaration<T> build() {
+            return new ReducingStateDeclaration<T>() {
+                @Override
+                public TypeDescriptor<T> getTypeInformation() {
+                    return typeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public ReduceFunction<T> reduceFunction() {
+                    return reduceFunction;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link AggregatingStateDeclaration}. */
+    @Experimental
+    public static class AggregatingStateDeclarationBuilder<IN, OUT, ACC> {
+
+        private final String name;
+
+        private final TypeDescriptor<ACC> stateTypeDescriptor;
+        private final AggregateFunction<IN, ACC, OUT> aggregateFunction;
+
+        public AggregatingStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<ACC> stateTypeDescriptor,
+                AggregateFunction<IN, ACC, OUT> aggregateFunction) {
+            this.name = name;
+            this.stateTypeDescriptor = stateTypeDescriptor;
+            this.aggregateFunction = aggregateFunction;
+        }
+
+        AggregatingStateDeclaration<IN, ACC, OUT> build() {
+            return new AggregatingStateDeclaration<IN, ACC, OUT>() {
+                @Override
+                public TypeDescriptor<ACC> getTypeInformation() {
+                    return stateTypeDescriptor;
+                }
+
+                @Override
+                public AggregateFunction<IN, ACC, OUT> getAggregateFunction() {
+                    return aggregateFunction;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return RedistributionMode.NONE;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link MapStateDeclaration}. */
+    @Experimental
+    public static class MapStateDeclarationBuilder<K, V> {
+
+        private final String name;
+        private final TypeDescriptor<K> keyTypeInformation;
+        private final TypeDescriptor<V> valueTypeInformation;
+
+        private final StateDeclaration.RedistributionMode redistributionMode;
+
+        public MapStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<K> keyTypeInformation,
+                TypeDescriptor<V> valueTypeInformation) {
+            this(
+                    name,
+                    keyTypeInformation,
+                    valueTypeInformation,
+                    StateDeclaration.RedistributionMode.NONE);
+        }
+
+        public MapStateDeclarationBuilder(
+                String name,
+                TypeDescriptor<K> keyTypeInformation,
+                TypeDescriptor<V> valueTypeInformation,
+                StateDeclaration.RedistributionMode redistributionMode) {
+            this.name = name;
+            this.keyTypeInformation = keyTypeInformation;
+            this.valueTypeInformation = valueTypeInformation;
+            this.redistributionMode = redistributionMode;
+        }
+
+        public BroadcastStateDeclaration<K, V> broadcast() {
+
+            return new BroadcastStateDeclaration<K, V>() {
+                @Override
+                public TypeDescriptor<K> getKeyTypeInformation() {
+                    return keyTypeInformation;
+                }
+
+                @Override
+                public TypeDescriptor<V> getValueTypeInformation() {
+                    return valueTypeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return StateDeclaration.RedistributionMode.IDENTICAL;
+                }
+            };
+        }
+
+        MapStateDeclaration<K, V> build() {
+            return new MapStateDeclaration<K, V>() {
+                @Override
+                public TypeDescriptor<K> getKeyTypeInformation() {
+                    return keyTypeInformation;
+                }
+
+                @Override
+                public TypeDescriptor<V> getValueTypeInformation() {
+                    return valueTypeInformation;
+                }
+
+                @Override
+                public String getName() {
+                    return name;
+                }
+
+                @Override
+                public RedistributionMode getRedistributionMode() {
+                    return redistributionMode;
+                }
+            };
+        }
+    }
+
+    /** Builder for {@link ListStateDeclaration}. */
+    @Experimental
+    public static class ListStateDeclarationBuilder<T> {
+
+        private final String name;
+        private final TypeDescriptor<T> elementTypeInformation;
+        private ListStateDeclaration.RedistributionStrategy 
redistributionStrategy =
+                ListStateDeclaration.RedistributionStrategy.SPLIT;
+        private StateDeclaration.RedistributionMode redistributionMode =
+                StateDeclaration.RedistributionMode.NONE;
+
+        public ListStateDeclarationBuilder(String name, TypeDescriptor<T> 
elementTypeInformation) {
+            this.name = name;
+            this.elementTypeInformation = elementTypeInformation;
+        }
+
+        public ListStateDeclarationBuilder<T> redistributeBy(
+                ListStateDeclaration.RedistributionStrategy strategy) {
+            this.redistributionStrategy = strategy;

Review Comment:
   Should we also check that `RedistributionMode` is `REDISTRIBUTABLE` if people 
want to set a `RedistributionStrategy`?
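
One possible shape for such a check, as a sketch under stated assumptions. The builder below is a cut-down, hypothetical version of `ListStateDeclarationBuilder`: the `redistributeWithMode` setter, the `UNION` constant, and the build-time validation are assumptions for illustration, and `build()` returns a plain string instead of a declaration.

```java
public class RedistributionCheckSketch {

    enum RedistributionMode { NONE, REDISTRIBUTABLE, IDENTICAL }

    // SPLIT appears in the diff; UNION is assumed here for illustration.
    enum RedistributionStrategy { SPLIT, UNION }

    /** Hypothetical cut-down builder; only the mode/strategy interplay is modeled. */
    static final class ListStateDeclarationBuilder {
        private RedistributionMode mode = RedistributionMode.NONE;
        private RedistributionStrategy strategy = RedistributionStrategy.SPLIT;
        private boolean strategySetExplicitly = false;

        // Hypothetical setter; the diff excerpt does not show how the mode is configured.
        ListStateDeclarationBuilder redistributeWithMode(RedistributionMode mode) {
            this.mode = mode;
            return this;
        }

        ListStateDeclarationBuilder redistributeBy(RedistributionStrategy strategy) {
            this.strategy = strategy;
            this.strategySetExplicitly = true;
            return this;
        }

        /** Validates at build time so the two setters can be called in either order. */
        String build() {
            if (strategySetExplicitly && mode != RedistributionMode.REDISTRIBUTABLE) {
                throw new IllegalStateException(
                        "RedistributionStrategy requires RedistributionMode.REDISTRIBUTABLE, but was "
                                + mode);
            }
            return mode + "/" + strategy;
        }
    }
}
```

Checking in `build()` rather than in `redistributeBy` avoids rejecting a builder chain merely because the strategy was set before the mode.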



##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/ValueStateDeclaration.java:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the value state. */
+@Experimental
+public interface ValueStateDeclaration<T> extends StateDeclaration {
+    /** Get type information of this value state. */
+    TypeDescriptor<T> getTypeInformation();
+}

Review Comment:
   ```suggestion
   public interface ValueStateDeclaration<T> extends StateDeclaration {
       /** Get type descriptor of this value state. */
       TypeDescriptor<T> getTypeDescriptor();
   }
   ```
   
   The same applies to the other `XXXStateDeclaration`.
   
   I think we should also update the related part in FLIP-433.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java:
##########
@@ -49,6 +80,82 @@ public <K> K getCurrentKey() {
         return (K) currentKeySupplier.get();
     }
 
+    @Override
+    public <T> Optional<ValueState<T>> getState(ValueStateDeclaration<T> stateDeclaration)
+            throws Exception {
+        ValueStateDescriptor<T> valueStateDescriptor =
+                new ValueStateDescriptor<>(
+                        stateDeclaration.getName(),
+                        stateDeclaration.getTypeInformation().getTypeClass());
+        return Optional.of(operatorContext.getState(valueStateDescriptor));
+    }
+
+    @Override
+    public <T> Optional<ListState<T>> getState(ListStateDeclaration<T> stateDeclaration)
+            throws Exception {
+
+        ListStateDescriptor<T> listStateDescriptor =
+                new ListStateDescriptor<>(
+                        stateDeclaration.getName(),
+                        stateDeclaration.getTypeInformation().getTypeClass());
+
+        if (stateDeclaration.getRedistributionMode()
+                == StateDeclaration.RedistributionMode.IDENTICAL) {

Review Comment:
   Shouldn't this be `REDISTRIBUTABLE`? IIRC, list state doesn't support `IDENTICAL` mode (i.e., we don't have broadcast list state for now).



##########
flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/ProcessFunction.java:
##########
@@ -36,6 +40,16 @@ public interface ProcessFunction extends Function {
      */
     default void open() throws Exception {}
 
+    /**
+     * Explicitly declares states upfront.Each specific state must be declared in this method before

Review Comment:
   ```suggestion
        * Explicitly declares states upfront. Each specific state must be declared in this method before
   ```



##########
flink-core-api/src/main/java/org/apache/flink/api/common/state/ReducingStateDeclaration.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+
+/** This represents a declaration of the reducing state. */
+@Experimental
+public interface ReducingStateDeclaration<T> extends StateDeclaration {
+    /** Get type information of this reducing state. */
+    TypeDescriptor<T> getTypeInformation();
+
+    ReduceFunction<T> reduceFunction();

Review Comment:
   ```suggestion
       ReduceFunction<T> getReduceFunction();
   ```
   This method also needs Javadoc.



##########
flink-datastream/src/main/java/org/apache/flink/datastream/impl/context/DefaultStateManager.java:
##########
@@ -49,6 +80,82 @@ public <K> K getCurrentKey() {
         return (K) currentKeySupplier.get();
     }
 
+    @Override
+    public <T> Optional<ValueState<T>> getState(ValueStateDeclaration<T> stateDeclaration)
+            throws Exception {
+        ValueStateDescriptor<T> valueStateDescriptor =
+                new ValueStateDescriptor<>(
+                        stateDeclaration.getName(),
+                        stateDeclaration.getTypeInformation().getTypeClass());
+        return Optional.of(operatorContext.getState(valueStateDescriptor));

Review Comment:
   I think every `Optional.of` here should be `Optional.ofNullable`; otherwise, why is the return type an `Optional` at all?
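
   To illustrate the difference, here is a minimal JDK-only sketch. `lookupState` is a hypothetical stand-in for a state lookup that may return `null`; it is not part of the Flink API, and whether the real lookup can return `null` is an assumption here.

   ```java
   import java.util.Optional;

   class OptionalWrapSketch {

       /** Hypothetical stand-in for a state lookup that may yield null. */
       static String lookupState(boolean available) {
           return available ? "state" : null;
       }

       /** Optional.ofNullable maps a null result to Optional.empty(). */
       static Optional<String> wrapNullable(boolean available) {
           return Optional.ofNullable(lookupState(available));
       }

       /** Optional.of throws NullPointerException when the result is null. */
       static Optional<String> wrapStrict(boolean available) {
           return Optional.of(lookupState(available));
       }

       public static void main(String[] args) {
           System.out.println(wrapNullable(true).isPresent());
           System.out.println(wrapNullable(false).isPresent());
           try {
               wrapStrict(false);
           } catch (NullPointerException e) {
               System.out.println("Optional.of rejected null");
           }
       }
   }
   ```

   With `Optional.of`, a caller never sees an empty `Optional`: a missing state surfaces as a `NullPointerException` instead.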



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

