This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6c7c99bdec5be9c233515bdfad0473969004ed3
Author: Jeyhun Karimov <je.kari...@gmail.com>
AuthorDate: Thu May 30 14:06:55 2024 +0200

    [FLINK-34977][API] Introduce Type Descriptors
---
 .../flink/api/common/typeinfo/TypeDescriptors.java | 152 +++++++++++++++++++++
 .../descriptor/BasicTypeDescriptorImpl.java        |  54 ++++++++
 .../descriptor/ListTypeDescriptorImpl.java         |  64 +++++++++
 .../typeinfo/descriptor/MapTypeDescriptorImpl.java |  69 ++++++++++
 .../descriptor/ValueTypeDescriptorImpl.java        |  55 ++++++++
 .../impl/common/BasicTypeDescriptorTest.java       |  51 +++++++
 .../impl/common/ListTypeDescriptorTest.java        |  52 +++++++
 .../impl/common/MapTypeDescriptorTest.java         |  72 ++++++++++
 .../impl/common/ValueTypeDescriptorTest.java       |  91 ++++++++++++
 9 files changed, 660 insertions(+)

diff --git 
a/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java
 
b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java
new file mode 100644
index 00000000000..dc1addb85dc
--- /dev/null
+++ 
b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/** Descriptor interface to create TypeInformation instances. */
+@Experimental
+public class TypeDescriptors implements Serializable {
+    @SuppressWarnings("unchecked")
+    public static <T> TypeDescriptor<T> value(TypeDescriptor<T> typeDescriptor)
+            throws ReflectiveOperationException {
+
+        return (TypeDescriptor<T>)
+                TypeUtils.getInstance(
+                        
"org.apache.flink.api.common.typeinfo.descriptor.ValueTypeDescriptorImpl",
+                        typeDescriptor);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <K, V> TypeDescriptor<Map<K, V>> map(
+            TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> 
valueTypeDescriptor)
+            throws ReflectiveOperationException {
+
+        return (TypeDescriptor<Map<K, V>>)
+                TypeUtils.getInstance(
+                        
"org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl",
+                        keyTypeDescriptor,
+                        valueTypeDescriptor);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <T> TypeDescriptor<List<T>> list(TypeDescriptor<T> 
elementTypeDescriptor)
+            throws ReflectiveOperationException {
+
+        return (TypeDescriptor<List<T>>)
+                TypeUtils.getInstance(
+                        
"org.apache.flink.api.common.typeinfo.descriptor.ListTypeDescriptorImpl",
+                        elementTypeDescriptor);
+    }
+
+    // BasicTypeInfo type descriptors
+    public static final TypeDescriptor<String> STRING;
+    public static final TypeDescriptor<Integer> INT;
+    public static final TypeDescriptor<Boolean> BOOLEAN;
+    public static final TypeDescriptor<Long> LONG;
+    public static final TypeDescriptor<Byte> BYTE;
+    public static final TypeDescriptor<Short> SHORT;
+    public static final TypeDescriptor<Double> DOUBLE;
+    public static final TypeDescriptor<Float> FLOAT;
+    public static final TypeDescriptor<Character> CHAR;
+
+    static {
+        try {
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<String> stringTypeTemp =
+                    (TypeDescriptor<String>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<String>) () -> 
String.class);
+            STRING = stringTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Integer> intTypeTemp =
+                    (TypeDescriptor<Integer>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Integer>) () -> 
Integer.class);
+            INT = intTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Boolean> booleanTypeTemp =
+                    (TypeDescriptor<Boolean>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Boolean>) () -> 
Boolean.class);
+            BOOLEAN = booleanTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Long> longTypeTemp =
+                    (TypeDescriptor<Long>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Long>) () -> Long.class);
+            LONG = longTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Byte> byteTypeTemp =
+                    (TypeDescriptor<Byte>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Byte>) () -> Byte.class);
+            BYTE = byteTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Short> shortTypeTemp =
+                    (TypeDescriptor<Short>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Short>) () -> Short.class);
+            SHORT = shortTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Double> doubleTypeTemp =
+                    (TypeDescriptor<Double>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Double>) () -> 
Double.class);
+            DOUBLE = doubleTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Float> floatTypeTemp =
+                    (TypeDescriptor<Float>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Float>) () -> Float.class);
+            FLOAT = floatTypeTemp;
+
+            @SuppressWarnings("unchecked")
+            TypeDescriptor<Character> charTypeTemp =
+                    (TypeDescriptor<Character>)
+                            TypeUtils.getInstance(
+                                    
"org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl",
+                                    (TypeDescriptor<Character>) () -> 
Character.class);
+            CHAR = charTypeTemp;
+
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/BasicTypeDescriptorImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/BasicTypeDescriptorImpl.java
new file mode 100644
index 00000000000..69265c64ce2
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/BasicTypeDescriptorImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo.descriptor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+/**
+ * Implementation of {@link TypeDescriptor} to create {@link BasicTypeInfo}. 
Note that this class is
+ * initiated via reflection. So, changing its path or constructor will brake 
tests.
+ *
+ * @param <T> type for which {@link TypeInformation} is created.
+ */
+@Internal
+public class BasicTypeDescriptorImpl<T> implements TypeDescriptor<T> {
+
+    private final BasicTypeInfo<T> basicTypeInfo;
+
+    public BasicTypeDescriptorImpl(TypeDescriptor<T> typeDescriptor) {
+        basicTypeInfo = 
BasicTypeInfo.getInfoFor(typeDescriptor.getTypeClass());
+    }
+
+    public BasicTypeInfo<T> getBasicTypeInfo() {
+        return basicTypeInfo;
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return basicTypeInfo.getTypeClass();
+    }
+
+    @Override
+    public String toString() {
+        return "BasicTypeDescriptorImpl [basicTypeInfo=" + basicTypeInfo + "]";
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ListTypeDescriptorImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ListTypeDescriptorImpl.java
new file mode 100644
index 00000000000..ed4c8aa1e17
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ListTypeDescriptorImpl.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo.descriptor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+
+import java.util.List;
+
+/**
+ * Implementation of {@link TypeDescriptor} to create {@link ListTypeInfo}. 
Note that this class is
+ * initiated via reflection. So, changing its path or constructor will brake 
tests.
+ *
+ * @param <T> type for which {@link TypeInformation} is created.
+ */
+@Internal
+public class ListTypeDescriptorImpl<T> implements TypeDescriptor<List<T>> {
+
+    private final ListTypeInfo<T> listTypeInfo;
+
+    public ListTypeDescriptorImpl(Class<T> elementClass) {
+        listTypeInfo = new ListTypeInfo<>(elementClass);
+    }
+
+    public ListTypeDescriptorImpl(TypeDescriptor<T> typeDescriptor) {
+        listTypeInfo = new ListTypeInfo<>(typeDescriptor.getTypeClass());
+    }
+
+    public ListTypeInfo<?> getListTypeInfo() {
+        return listTypeInfo;
+    }
+
+    @Override
+    public Class<List<T>> getTypeClass() {
+        return listTypeInfo.getTypeClass();
+    }
+
+    public Class<T> getComponentType() {
+        return listTypeInfo.getElementTypeInfo().getTypeClass();
+    }
+
+    @Override
+    public String toString() {
+        return "ListTypeDescriptorImpl [listTypeInfo=" + listTypeInfo + "]";
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/MapTypeDescriptorImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/MapTypeDescriptorImpl.java
new file mode 100644
index 00000000000..e9ccf09ffd6
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/MapTypeDescriptorImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo.descriptor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+
+import java.util.Map;
+
+/**
+ * Implementation of {@link TypeDescriptor} to create {@link
+ * org.apache.flink.api.java.typeutils.MapTypeInfo}. Note that this class is 
initiated via
+ * reflection. So, changing its path or constructor will brake tests.
+ *
+ * @param <K> type for which key {@link TypeInformation} is created.
+ * @param <V> type for which value {@link TypeInformation} is created.
+ */
+@Internal
+public class MapTypeDescriptorImpl<K, V> implements TypeDescriptor<Map<K, V>> {
+
+    private final MapTypeInfo<K, V> mapTypeInfo;
+
+    public MapTypeDescriptorImpl(
+            TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> 
valueTypeDescriptor) {
+        mapTypeInfo =
+                new MapTypeInfo<>(
+                        keyTypeDescriptor.getTypeClass(), 
valueTypeDescriptor.getTypeClass());
+    }
+
+    public MapTypeInfo<K, V> getMapTypeInfo() {
+        return mapTypeInfo;
+    }
+
+    @Override
+    public Class<Map<K, V>> getTypeClass() {
+        return mapTypeInfo.getTypeClass();
+    }
+
+    public Class<K> getKeyTypeClass() {
+        return mapTypeInfo.getKeyTypeInfo().getTypeClass();
+    }
+
+    public Class<V> getKeyValueClass() {
+        return mapTypeInfo.getValueTypeInfo().getTypeClass();
+    }
+
+    @Override
+    public String toString() {
+        return "MapTypeDescriptorImpl" + mapTypeInfo;
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ValueTypeDescriptorImpl.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ValueTypeDescriptorImpl.java
new file mode 100644
index 00000000000..01bbe7f2d72
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ValueTypeDescriptorImpl.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo.descriptor;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ValueTypeInfo;
+import org.apache.flink.types.Value;
+
+/**
+ * Implementation of {@link TypeDescriptor} to create {@link ValueTypeInfo}. 
Note that this class is
+ * initiated via reflection. So, changing its path or constructor will brake 
tests.
+ *
+ * @param <T> type for which {@link TypeInformation} is created.
+ */
+@Internal
+public class ValueTypeDescriptorImpl<T extends Value> implements 
TypeDescriptor<T> {
+
+    private final ValueTypeInfo<T> valueTypeInfo;
+
+    public ValueTypeDescriptorImpl(TypeDescriptor<T> typeDescriptor) {
+        this.valueTypeInfo = new 
ValueTypeInfo<>(typeDescriptor.getTypeClass());
+    }
+
+    public ValueTypeInfo<T> getValueTypeInfo() {
+        return valueTypeInfo;
+    }
+
+    @Override
+    public Class<T> getTypeClass() {
+        return valueTypeInfo.getTypeClass();
+    }
+
+    @Override
+    public String toString() {
+        return "ValueTypeDescriptorImpl [valueTypeInfo=" + valueTypeInfo + "]";
+    }
+}
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java
new file mode 100644
index 00000000000..7bd78f0af8c
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link 
org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl}. */
+class BasicTypeDescriptorTest {
+
+    @ParameterizedTest
+    @MethodSource("basicTypeDescriptors")
+    void testBasicTypeDescriptor(TypeDescriptor<?> typeDescriptor, String 
expectedString) {
+        assertThat(typeDescriptor.toString()).isEqualTo(expectedString);
+    }
+
+    static Object[][] basicTypeDescriptors() {
+        return new Object[][] {
+            {TypeDescriptors.STRING, "BasicTypeDescriptorImpl 
[basicTypeInfo=String]"},
+            {TypeDescriptors.INT, "BasicTypeDescriptorImpl 
[basicTypeInfo=Integer]"},
+            {TypeDescriptors.CHAR, "BasicTypeDescriptorImpl 
[basicTypeInfo=Character]"},
+            {TypeDescriptors.FLOAT, "BasicTypeDescriptorImpl 
[basicTypeInfo=Float]"},
+            {TypeDescriptors.DOUBLE, "BasicTypeDescriptorImpl 
[basicTypeInfo=Double]"},
+            {TypeDescriptors.SHORT, "BasicTypeDescriptorImpl 
[basicTypeInfo=Short]"},
+            {TypeDescriptors.BYTE, "BasicTypeDescriptorImpl 
[basicTypeInfo=Byte]"},
+            {TypeDescriptors.LONG, "BasicTypeDescriptorImpl 
[basicTypeInfo=Long]"},
+            {TypeDescriptors.BOOLEAN, "BasicTypeDescriptorImpl 
[basicTypeInfo=Boolean]"},
+        };
+    }
+}
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ListTypeDescriptorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ListTypeDescriptorTest.java
new file mode 100644
index 00000000000..8514be67517
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ListTypeDescriptorTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link 
org.apache.flink.api.common.typeinfo.descriptor.ListTypeDescriptorImpl}. */
+class ListTypeDescriptorTest {
+
+    @ParameterizedTest
+    @MethodSource("listTypeDescriptors")
+    void testListTypeDescriptor(TypeDescriptor<?> typeDescriptor, String 
expectedString)
+            throws ReflectiveOperationException {
+        
assertThat(TypeDescriptors.list(typeDescriptor).toString()).isEqualTo(expectedString);
+    }
+
+    static Object[][] listTypeDescriptors() {
+        return new Object[][] {
+            {TypeDescriptors.INT, "ListTypeDescriptorImpl 
[listTypeInfo=List<Integer>]"},
+            {TypeDescriptors.STRING, "ListTypeDescriptorImpl 
[listTypeInfo=List<String>]"},
+            {TypeDescriptors.BOOLEAN, "ListTypeDescriptorImpl 
[listTypeInfo=List<Boolean>]"},
+            {TypeDescriptors.LONG, "ListTypeDescriptorImpl 
[listTypeInfo=List<Long>]"},
+            {TypeDescriptors.BYTE, "ListTypeDescriptorImpl 
[listTypeInfo=List<Byte>]"},
+            {TypeDescriptors.SHORT, "ListTypeDescriptorImpl 
[listTypeInfo=List<Short>]"},
+            {TypeDescriptors.DOUBLE, "ListTypeDescriptorImpl 
[listTypeInfo=List<Double>]"},
+            {TypeDescriptors.FLOAT, "ListTypeDescriptorImpl 
[listTypeInfo=List<Float>]"},
+            {TypeDescriptors.CHAR, "ListTypeDescriptorImpl 
[listTypeInfo=List<Character>]"},
+        };
+    }
+}
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/MapTypeDescriptorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/MapTypeDescriptorTest.java
new file mode 100644
index 00000000000..fd2da4b3f74
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/MapTypeDescriptorTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link 
org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl}. */
+class MapTypeDescriptorTest {
+
+    @ParameterizedTest
+    @MethodSource("mapTypeDescriptors")
+    void testMapTypeDescriptor(
+            TypeDescriptor<?> keyDescriptor,
+            TypeDescriptor<?> valueDescriptor,
+            String expectedString)
+            throws ReflectiveOperationException {
+        assertThat(TypeDescriptors.map(keyDescriptor, 
valueDescriptor).toString())
+                .isEqualTo(expectedString);
+    }
+
+    static Object[][] mapTypeDescriptors() throws ReflectiveOperationException 
{
+        return new Object[][] {
+            {
+                TypeDescriptors.INT,
+                TypeDescriptors.STRING,
+                "MapTypeDescriptorImplMap<Integer, String>"
+            },
+            {
+                TypeDescriptors.BOOLEAN,
+                TypeDescriptors.LONG,
+                "MapTypeDescriptorImplMap<Boolean, Long>"
+            },
+            {
+                TypeDescriptors.CHAR,
+                TypeDescriptors.DOUBLE,
+                "MapTypeDescriptorImplMap<Character, Double>"
+            },
+            {
+                TypeDescriptors.SHORT,
+                TypeDescriptors.FLOAT,
+                "MapTypeDescriptorImplMap<Short, Float>"
+            },
+            {
+                TypeDescriptors.CHAR,
+                TypeDescriptors.list(TypeDescriptors.STRING),
+                "MapTypeDescriptorImplMap<Character, 
GenericType<java.util.List>>"
+            }
+        };
+    }
+}
diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ValueTypeDescriptorTest.java
 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ValueTypeDescriptorTest.java
new file mode 100644
index 00000000000..9c92110903d
--- /dev/null
+++ 
b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ValueTypeDescriptorTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.TypeDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeDescriptors;
+import org.apache.flink.types.BooleanValue;
+import org.apache.flink.types.CharValue;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.FloatValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.ShortValue;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.lang.reflect.InvocationTargetException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link 
org.apache.flink.api.common.typeinfo.descriptor.ValueTypeDescriptorImpl}. */
+class ValueTypeDescriptorTest {
+
+    @ParameterizedTest
+    @MethodSource("valueTypeDescriptors")
+    <T> void testValueTypeDescriptor(TypeDescriptor<T> typeDescriptor, String 
expectedString)
+            throws ReflectiveOperationException {
+        
assertThat(TypeDescriptors.value(typeDescriptor).toString()).isEqualTo(expectedString);
+    }
+
+    static Object[][] valueTypeDescriptors() {
+        return new Object[][] {
+            {
+                (TypeDescriptor<BooleanValue>) () -> BooleanValue.class,
+                "ValueTypeDescriptorImpl 
[valueTypeInfo=ValueType<BooleanValue>]"
+            },
+            {
+                (TypeDescriptor<IntValue>) () -> IntValue.class,
+                "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<IntValue>]"
+            },
+            {
+                (TypeDescriptor<LongValue>) () -> LongValue.class,
+                "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<LongValue>]"
+            },
+            {
+                (TypeDescriptor<ShortValue>) () -> ShortValue.class,
+                "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<ShortValue>]"
+            },
+            {
+                (TypeDescriptor<FloatValue>) () -> FloatValue.class,
+                "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<FloatValue>]"
+            },
+            {
+                (TypeDescriptor<CharValue>) () -> CharValue.class,
+                "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<CharValue>]"
+            },
+            {
+                (TypeDescriptor<DoubleValue>) () -> DoubleValue.class,
+                "ValueTypeDescriptorImpl 
[valueTypeInfo=ValueType<DoubleValue>]"
+            },
+        };
+    }
+
+    @Test
+    void testValueTypeDescriptorNonValue() {
+        assertThatThrownBy(
+                        () ->
+                                TypeDescriptors.value((TypeDescriptor<Double>) 
() -> Double.class)
+                                        .toString())
+                .isInstanceOf(InvocationTargetException.class);
+    }
+}

Reply via email to