This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f91730dccb65438cba4da88a6607bd0347b1561c
Author: Jeyhun Karimov <je.kari...@gmail.com>
AuthorDate: Thu May 30 13:58:13 2024 +0200

    [FLINK-34977][API] Introduce TypeDescriptor and TypeUtils
---
 .../flink/api/common/typeinfo/TypeDescriptor.java  |  39 +++++++
 .../flink/api/common/typeinfo/utils/TypeUtils.java | 125 +++++++++++++++++++++
 .../datastream/impl/common/TypeUtilsTest.java      |  70 ++++++++++++
 3 files changed, 234 insertions(+)

diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptor.java b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptor.java
new file mode 100644
index 00000000000..9a616ea61b3
--- /dev/null
+++ b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+
+/**
+ * Descriptor interface to create TypeInformation instances.
+ *
+ * <p>A descriptor is {@link Serializable} and exposes the {@link Class} of the type it
+ * describes through {@link #getTypeClass()}. How a descriptor is turned into a
+ * TypeInformation is not defined by this interface.
+ *
+ * @param <T> The type represented by this type descriptor.
+ */
+@Experimental
+public interface TypeDescriptor<T> extends Serializable {
+
+    /**
+     * Gets the class of the type represented by this type descriptor.
+     *
+     * <p>The returned class is typed with the descriptor's own type parameter {@code T}.
+     *
+     * @return The class of the type represented by this type descriptor.
+     */
+    Class<T> getTypeClass();
+}
diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/utils/TypeUtils.java b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/utils/TypeUtils.java
new file mode 100644
index 00000000000..04eea60a475
--- /dev/null
+++ b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/utils/TypeUtils.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.typeinfo.utils;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.io.Serializable;
+import java.lang.reflect.Constructor;
+
+/**
+ * Utility class to create objects via reflection.
+ *
+ * <p>The single entry point, {@link #getInstance(String, Object...)}, loads a class by name and
+ * invokes the first public constructor whose parameter list is compatible with the supplied
+ * arguments.
+ */
+@Experimental
+public class TypeUtils implements Serializable {
+
+    /**
+     * Creates a new instance of the class named {@code classPath} by calling a public constructor
+     * that accepts {@code args}.
+     *
+     * <p>A constructor is considered suitable when it declares exactly as many parameters as
+     * there are arguments and every argument is compatible with the parameter at the same
+     * position:
+     *
+     * <ul>
+     *   <li>a {@code null} argument is compatible with any non-primitive parameter;
+     *   <li>a non-null argument is compatible with a reference parameter it is an instance of;
+     *   <li>a non-null argument is compatible with a primitive parameter only if it is an
+     *       instance of that primitive's own wrapper class (no widening, e.g. an {@code
+     *       Integer} does not match a {@code long} parameter).
+     * </ul>
+     *
+     * <p>The constructors are examined in the order returned by {@link
+     * Class#getConstructors()} and the first suitable one is used.
+     *
+     * @param classPath the fully qualified name of the class to instantiate, as understood by
+     *     {@link Class#forName(String)}
+     * @param args the constructor arguments; a {@code null} array is treated as "no arguments",
+     *     matching the convention of {@link Constructor#newInstance(Object...)}
+     * @return the newly created instance
+     * @throws ReflectiveOperationException if the class cannot be loaded, if no public
+     *     constructor is compatible with the arguments ({@link NoSuchMethodException}), or if
+     *     the selected constructor cannot be invoked or throws
+     */
+    public static Object getInstance(String classPath, Object... args)
+            throws ReflectiveOperationException {
+        // A null varargs array used to fail with a bare NullPointerException further down;
+        // treat it as an empty argument list, as Constructor#newInstance does.
+        final Object[] actualArgs = args == null ? new Object[0] : args;
+
+        Class<?> descriptorClass = Class.forName(classPath);
+
+        for (Constructor<?> constructor : descriptorClass.getConstructors()) {
+            if (areParameterTypesCompatible(constructor.getParameterTypes(), actualArgs)) {
+                // Values for primitive parameters are already boxed in the matching wrapper
+                // type; newInstance performs the unwrapping, so no conversion is needed here.
+                return constructor.newInstance(actualArgs);
+            }
+        }
+
+        throw new NoSuchMethodException("No suitable constructor found for the given arguments.");
+    }
+
+    /**
+     * Checks whether {@code args} can be passed to a constructor declaring {@code
+     * parameterTypes}.
+     *
+     * @param parameterTypes the declared parameter types of a candidate constructor
+     * @param args the (non-null) argument array supplied by the caller
+     * @return {@code true} if the lengths match and every argument fits its parameter
+     */
+    private static boolean areParameterTypesCompatible(Class<?>[] parameterTypes, Object[] args) {
+        if (parameterTypes.length != args.length) {
+            return false;
+        }
+
+        for (int i = 0; i < parameterTypes.length; i++) {
+            if (args[i] == null) {
+                if (parameterTypes[i].isPrimitive()) {
+                    return false; // primitive types cannot be null
+                }
+            } else if (!isCompatible(parameterTypes[i], args[i])) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Checks whether a single non-null argument fits a single parameter type.
+     *
+     * @param parameterType the declared parameter type
+     * @param arg the non-null argument
+     * @return {@code true} if {@code arg} is an instance of {@code parameterType}, or of its
+     *     wrapper class when {@code parameterType} is primitive
+     */
+    private static boolean isCompatible(Class<?> parameterType, Object arg) {
+        // Primitive Class objects never report isInstance == true, so compare against the
+        // corresponding wrapper class instead.
+        Class<?> effectiveType =
+                parameterType.isPrimitive() ? getWrapperClass(parameterType) : parameterType;
+        return effectiveType.isInstance(arg);
+    }
+
+    /**
+     * Maps a primitive type to its wrapper class.
+     *
+     * @param primitiveType one of the eight primitive value types
+     * @return the corresponding wrapper class, e.g. {@code Integer.class} for {@code int.class}
+     * @throws IllegalArgumentException if {@code primitiveType} is not one of the eight
+     *     primitive value types
+     */
+    private static Class<?> getWrapperClass(Class<?> primitiveType) {
+        if (primitiveType == boolean.class) {
+            return Boolean.class;
+        } else if (primitiveType == byte.class) {
+            return Byte.class;
+        } else if (primitiveType == char.class) {
+            return Character.class;
+        } else if (primitiveType == short.class) {
+            return Short.class;
+        } else if (primitiveType == int.class) {
+            return Integer.class;
+        } else if (primitiveType == long.class) {
+            return Long.class;
+        } else if (primitiveType == float.class) {
+            return Float.class;
+        } else if (primitiveType == double.class) {
+            return Double.class;
+        }
+        throw new IllegalArgumentException("Unknown primitive type: " + primitiveType.getName());
+    }
+}
diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/TypeUtilsTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/TypeUtilsTest.java
new file mode 100644
index 00000000000..9a5b2150eb1
--- /dev/null
+++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/TypeUtilsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.datastream.impl.common;
+
+import org.apache.flink.api.common.typeinfo.utils.TypeUtils;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link org.apache.flink.api.common.typeinfo.utils.TypeUtils}.
+ *
+ * <p>The target classes are referenced by name only, which is also how {@code
+ * TypeUtils.getInstance} receives them.
+ */
+class TypeUtilsTest {
+
+    /** Class instantiated with a single {@code int} argument. */
+    private static final String WATERMARK_STATUS =
+            "org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus";
+
+    /** Class instantiated with a single serializer argument. */
+    private static final String STREAM_ELEMENT_SERIALIZER =
+            "org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer";
+
+    /** A boxed {@code Integer} argument must be accepted by a constructor taking a primitive. */
+    @Test
+    void testNewInstanceWithPrimitiveArg() throws ReflectiveOperationException {
+        Object created = TypeUtils.getInstance(WATERMARK_STATUS, 0);
+
+        assertThat(created.toString()).isEqualTo("WatermarkStatus(ACTIVE)");
+    }
+
+    /** An object argument must be accepted by a constructor taking a compatible reference type. */
+    @Test
+    void testNewInstanceWithObjectArg() throws ReflectiveOperationException {
+        Object created = TypeUtils.getInstance(STREAM_ELEMENT_SERIALIZER, new BooleanSerializer());
+
+        assertThat(created.toString()).contains("StreamElementSerializer");
+    }
+
+    /** An argument of the wrong type must not match any constructor. */
+    @Test
+    void testNewInstanceWithIncorrectConstructorArgType() {
+        assertThatThrownBy(() -> TypeUtils.getInstance(STREAM_ELEMENT_SERIALIZER, 123))
+                .isInstanceOf(NoSuchMethodException.class);
+    }
+
+    /** Omitting the required argument must not match any constructor. */
+    @Test
+    void testNewInstanceWithIncorrectConstructorArgCount() {
+        assertThatThrownBy(() -> TypeUtils.getInstance(STREAM_ELEMENT_SERIALIZER))
+                .isInstanceOf(NoSuchMethodException.class);
+    }
+}

Reply via email to