This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6c7c99bdec5be9c233515bdfad0473969004ed3 Author: Jeyhun Karimov <je.kari...@gmail.com> AuthorDate: Thu May 30 14:06:55 2024 +0200 [FLINK-34977][API] Introduce Type Descriptors --- .../flink/api/common/typeinfo/TypeDescriptors.java | 152 +++++++++++++++++++++ .../descriptor/BasicTypeDescriptorImpl.java | 54 ++++++++ .../descriptor/ListTypeDescriptorImpl.java | 64 +++++++++ .../typeinfo/descriptor/MapTypeDescriptorImpl.java | 69 ++++++++++ .../descriptor/ValueTypeDescriptorImpl.java | 55 ++++++++ .../impl/common/BasicTypeDescriptorTest.java | 51 +++++++ .../impl/common/ListTypeDescriptorTest.java | 52 +++++++ .../impl/common/MapTypeDescriptorTest.java | 72 ++++++++++ .../impl/common/ValueTypeDescriptorTest.java | 91 ++++++++++++ 9 files changed, 660 insertions(+) diff --git a/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java new file mode 100644 index 00000000000..dc1addb85dc --- /dev/null +++ b/flink-core-api/src/main/java/org/apache/flink/api/common/typeinfo/TypeDescriptors.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeinfo; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.typeinfo.utils.TypeUtils; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; + +/** Descriptor interface to create TypeInformation instances. */ +@Experimental +public class TypeDescriptors implements Serializable { + @SuppressWarnings("unchecked") + public static <T> TypeDescriptor<T> value(TypeDescriptor<T> typeDescriptor) + throws ReflectiveOperationException { + + return (TypeDescriptor<T>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.ValueTypeDescriptorImpl", + typeDescriptor); + } + + @SuppressWarnings("unchecked") + public static <K, V> TypeDescriptor<Map<K, V>> map( + TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> valueTypeDescriptor) + throws ReflectiveOperationException { + + return (TypeDescriptor<Map<K, V>>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl", + keyTypeDescriptor, + valueTypeDescriptor); + } + + @SuppressWarnings("unchecked") + public static <T> TypeDescriptor<List<T>> list(TypeDescriptor<T> elementTypeDescriptor) + throws ReflectiveOperationException { + + return (TypeDescriptor<List<T>>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.ListTypeDescriptorImpl", + elementTypeDescriptor); + } + + // BasicTypeInfo type descriptors + public static final TypeDescriptor<String> STRING; + public static final TypeDescriptor<Integer> INT; + public static final TypeDescriptor<Boolean> BOOLEAN; + public static final TypeDescriptor<Long> LONG; + public static final TypeDescriptor<Byte> BYTE; + public static final TypeDescriptor<Short> SHORT; + public static final TypeDescriptor<Double> DOUBLE; + public static final TypeDescriptor<Float> FLOAT; + public static final TypeDescriptor<Character> CHAR; + + static { + try { + @SuppressWarnings("unchecked") + 
TypeDescriptor<String> stringTypeTemp = + (TypeDescriptor<String>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<String>) () -> String.class); + STRING = stringTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Integer> intTypeTemp = + (TypeDescriptor<Integer>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Integer>) () -> Integer.class); + INT = intTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Boolean> booleanTypeTemp = + (TypeDescriptor<Boolean>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Boolean>) () -> Boolean.class); + BOOLEAN = booleanTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Long> longTypeTemp = + (TypeDescriptor<Long>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Long>) () -> Long.class); + LONG = longTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Byte> byteTypeTemp = + (TypeDescriptor<Byte>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Byte>) () -> Byte.class); + BYTE = byteTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Short> shortTypeTemp = + (TypeDescriptor<Short>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Short>) () -> Short.class); + SHORT = shortTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Double> doubleTypeTemp = + (TypeDescriptor<Double>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Double>) () -> Double.class); + DOUBLE = doubleTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Float> floatTypeTemp = + (TypeDescriptor<Float>) + 
TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Float>) () -> Float.class); + FLOAT = floatTypeTemp; + + @SuppressWarnings("unchecked") + TypeDescriptor<Character> charTypeTemp = + (TypeDescriptor<Character>) + TypeUtils.getInstance( + "org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl", + (TypeDescriptor<Character>) () -> Character.class); + CHAR = charTypeTemp; + + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/BasicTypeDescriptorImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/BasicTypeDescriptorImpl.java new file mode 100644 index 00000000000..69265c64ce2 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/BasicTypeDescriptorImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeinfo.descriptor; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +/** + * Implementation of {@link TypeDescriptor} to create {@link BasicTypeInfo}. Note that this class is + * instantiated via reflection. So, changing its path or constructor will break tests. + * + * @param <T> type for which {@link TypeInformation} is created. + */ +@Internal +public class BasicTypeDescriptorImpl<T> implements TypeDescriptor<T> { + + private final BasicTypeInfo<T> basicTypeInfo; + + public BasicTypeDescriptorImpl(TypeDescriptor<T> typeDescriptor) { + basicTypeInfo = BasicTypeInfo.getInfoFor(typeDescriptor.getTypeClass()); + } + + public BasicTypeInfo<T> getBasicTypeInfo() { + return basicTypeInfo; + } + + @Override + public Class<T> getTypeClass() { + return basicTypeInfo.getTypeClass(); + } + + @Override + public String toString() { + return "BasicTypeDescriptorImpl [basicTypeInfo=" + basicTypeInfo + "]"; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ListTypeDescriptorImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ListTypeDescriptorImpl.java new file mode 100644 index 00000000000..ed4c8aa1e17 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ListTypeDescriptorImpl.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo.descriptor; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ListTypeInfo; + +import java.util.List; + +/** + * Implementation of {@link TypeDescriptor} to create {@link ListTypeInfo}. Note that this class is + * instantiated via reflection. So, changing its path or constructor will break tests. + * + * @param <T> type for which {@link TypeInformation} is created.
+ */ +@Internal +public class ListTypeDescriptorImpl<T> implements TypeDescriptor<List<T>> { + + private final ListTypeInfo<T> listTypeInfo; + + public ListTypeDescriptorImpl(Class<T> elementClass) { + listTypeInfo = new ListTypeInfo<>(elementClass); + } + + public ListTypeDescriptorImpl(TypeDescriptor<T> typeDescriptor) { + listTypeInfo = new ListTypeInfo<>(typeDescriptor.getTypeClass()); + } + + public ListTypeInfo<?> getListTypeInfo() { + return listTypeInfo; + } + + @Override + public Class<List<T>> getTypeClass() { + return listTypeInfo.getTypeClass(); + } + + public Class<T> getComponentType() { + return listTypeInfo.getElementTypeInfo().getTypeClass(); + } + + @Override + public String toString() { + return "ListTypeDescriptorImpl [listTypeInfo=" + listTypeInfo + "]"; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/MapTypeDescriptorImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/MapTypeDescriptorImpl.java new file mode 100644 index 00000000000..e9ccf09ffd6 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/MapTypeDescriptorImpl.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo.descriptor; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; + +import java.util.Map; + +/** + * Implementation of {@link TypeDescriptor} to create {@link + * org.apache.flink.api.java.typeutils.MapTypeInfo}. Note that this class is instantiated via + * reflection. So, changing its path or constructor will break tests. + * + * @param <K> type for which key {@link TypeInformation} is created. + * @param <V> type for which value {@link TypeInformation} is created. + */ +@Internal +public class MapTypeDescriptorImpl<K, V> implements TypeDescriptor<Map<K, V>> { + + private final MapTypeInfo<K, V> mapTypeInfo; + + public MapTypeDescriptorImpl( + TypeDescriptor<K> keyTypeDescriptor, TypeDescriptor<V> valueTypeDescriptor) { + mapTypeInfo = + new MapTypeInfo<>( + keyTypeDescriptor.getTypeClass(), valueTypeDescriptor.getTypeClass()); + } + + public MapTypeInfo<K, V> getMapTypeInfo() { + return mapTypeInfo; + } + + @Override + public Class<Map<K, V>> getTypeClass() { + return mapTypeInfo.getTypeClass(); + } + + public Class<K> getKeyTypeClass() { + return mapTypeInfo.getKeyTypeInfo().getTypeClass(); + } + + public Class<V> getKeyValueClass() { + return mapTypeInfo.getValueTypeInfo().getTypeClass(); + } + + @Override + public String toString() { + return "MapTypeDescriptorImpl" + mapTypeInfo; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ValueTypeDescriptorImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ValueTypeDescriptorImpl.java new file mode 100644 index 00000000000..01bbe7f2d72 --- /dev/null +++
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/descriptor/ValueTypeDescriptorImpl.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeinfo.descriptor; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ValueTypeInfo; +import org.apache.flink.types.Value; + +/** + * Implementation of {@link TypeDescriptor} to create {@link ValueTypeInfo}. Note that this class is + * instantiated via reflection. So, changing its path or constructor will break tests. + * + * @param <T> type for which {@link TypeInformation} is created.
+ */ +@Internal +public class ValueTypeDescriptorImpl<T extends Value> implements TypeDescriptor<T> { + + private final ValueTypeInfo<T> valueTypeInfo; + + public ValueTypeDescriptorImpl(TypeDescriptor<T> typeDescriptor) { + this.valueTypeInfo = new ValueTypeInfo<>(typeDescriptor.getTypeClass()); + } + + public ValueTypeInfo<T> getValueTypeInfo() { + return valueTypeInfo; + } + + @Override + public Class<T> getTypeClass() { + return valueTypeInfo.getTypeClass(); + } + + @Override + public String toString() { + return "ValueTypeDescriptorImpl [valueTypeInfo=" + valueTypeInfo + "]"; + } +} diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java new file mode 100644 index 00000000000..7bd78f0af8c --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/BasicTypeDescriptorTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.common; + +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.BasicTypeDescriptorImpl}. */ +class BasicTypeDescriptorTest { + + @ParameterizedTest + @MethodSource("basicTypeDescriptors") + void testBasicTypeDescriptor(TypeDescriptor<?> typeDescriptor, String expectedString) { + assertThat(typeDescriptor.toString()).isEqualTo(expectedString); + } + + static Object[][] basicTypeDescriptors() { + return new Object[][] { + {TypeDescriptors.STRING, "BasicTypeDescriptorImpl [basicTypeInfo=String]"}, + {TypeDescriptors.INT, "BasicTypeDescriptorImpl [basicTypeInfo=Integer]"}, + {TypeDescriptors.CHAR, "BasicTypeDescriptorImpl [basicTypeInfo=Character]"}, + {TypeDescriptors.FLOAT, "BasicTypeDescriptorImpl [basicTypeInfo=Float]"}, + {TypeDescriptors.DOUBLE, "BasicTypeDescriptorImpl [basicTypeInfo=Double]"}, + {TypeDescriptors.SHORT, "BasicTypeDescriptorImpl [basicTypeInfo=Short]"}, + {TypeDescriptors.BYTE, "BasicTypeDescriptorImpl [basicTypeInfo=Byte]"}, + {TypeDescriptors.LONG, "BasicTypeDescriptorImpl [basicTypeInfo=Long]"}, + {TypeDescriptors.BOOLEAN, "BasicTypeDescriptorImpl [basicTypeInfo=Boolean]"}, + }; + } +} diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ListTypeDescriptorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ListTypeDescriptorTest.java new file mode 100644 index 00000000000..8514be67517 --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ListTypeDescriptorTest.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.common; + +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.ListTypeDescriptorImpl}. 
*/ +class ListTypeDescriptorTest { + + @ParameterizedTest + @MethodSource("listTypeDescriptors") + void testListTypeDescriptor(TypeDescriptor<?> typeDescriptor, String expectedString) + throws ReflectiveOperationException { + assertThat(TypeDescriptors.list(typeDescriptor).toString()).isEqualTo(expectedString); + } + + static Object[][] listTypeDescriptors() { + return new Object[][] { + {TypeDescriptors.INT, "ListTypeDescriptorImpl [listTypeInfo=List<Integer>]"}, + {TypeDescriptors.STRING, "ListTypeDescriptorImpl [listTypeInfo=List<String>]"}, + {TypeDescriptors.BOOLEAN, "ListTypeDescriptorImpl [listTypeInfo=List<Boolean>]"}, + {TypeDescriptors.LONG, "ListTypeDescriptorImpl [listTypeInfo=List<Long>]"}, + {TypeDescriptors.BYTE, "ListTypeDescriptorImpl [listTypeInfo=List<Byte>]"}, + {TypeDescriptors.SHORT, "ListTypeDescriptorImpl [listTypeInfo=List<Short>]"}, + {TypeDescriptors.DOUBLE, "ListTypeDescriptorImpl [listTypeInfo=List<Double>]"}, + {TypeDescriptors.FLOAT, "ListTypeDescriptorImpl [listTypeInfo=List<Float>]"}, + {TypeDescriptors.CHAR, "ListTypeDescriptorImpl [listTypeInfo=List<Character>]"}, + }; + } +} diff --git a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/MapTypeDescriptorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/MapTypeDescriptorTest.java new file mode 100644 index 00000000000..fd2da4b3f74 --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/MapTypeDescriptorTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.impl.common; + +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.MapTypeDescriptorImpl}. */ +class MapTypeDescriptorTest { + + @ParameterizedTest + @MethodSource("mapTypeDescriptors") + void testMapTypeDescriptor( + TypeDescriptor<?> keyDescriptor, + TypeDescriptor<?> valueDescriptor, + String expectedString) + throws ReflectiveOperationException { + assertThat(TypeDescriptors.map(keyDescriptor, valueDescriptor).toString()) + .isEqualTo(expectedString); + } + + static Object[][] mapTypeDescriptors() throws ReflectiveOperationException { + return new Object[][] { + { + TypeDescriptors.INT, + TypeDescriptors.STRING, + "MapTypeDescriptorImplMap<Integer, String>" + }, + { + TypeDescriptors.BOOLEAN, + TypeDescriptors.LONG, + "MapTypeDescriptorImplMap<Boolean, Long>" + }, + { + TypeDescriptors.CHAR, + TypeDescriptors.DOUBLE, + "MapTypeDescriptorImplMap<Character, Double>" + }, + { + TypeDescriptors.SHORT, + TypeDescriptors.FLOAT, + "MapTypeDescriptorImplMap<Short, Float>" + }, + { + TypeDescriptors.CHAR, + TypeDescriptors.list(TypeDescriptors.STRING), + "MapTypeDescriptorImplMap<Character, GenericType<java.util.List>>" + } + }; + } +} diff --git 
a/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ValueTypeDescriptorTest.java b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ValueTypeDescriptorTest.java new file mode 100644 index 00000000000..9c92110903d --- /dev/null +++ b/flink-datastream/src/test/java/org/apache/flink/datastream/impl/common/ValueTypeDescriptorTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.datastream.impl.common; + +import org.apache.flink.api.common.typeinfo.TypeDescriptor; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; +import org.apache.flink.types.BooleanValue; +import org.apache.flink.types.CharValue; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.ShortValue; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.lang.reflect.InvocationTargetException; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link org.apache.flink.api.common.typeinfo.descriptor.ValueTypeDescriptorImpl}. */ +class ValueTypeDescriptorTest { + + @ParameterizedTest + @MethodSource("valueTypeDescriptors") + <T> void testValueTypeDescriptor(TypeDescriptor<T> typeDescriptor, String expectedString) + throws ReflectiveOperationException { + assertThat(TypeDescriptors.value(typeDescriptor).toString()).isEqualTo(expectedString); + } + + static Object[][] valueTypeDescriptors() { + return new Object[][] { + { + (TypeDescriptor<BooleanValue>) () -> BooleanValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<BooleanValue>]" + }, + { + (TypeDescriptor<IntValue>) () -> IntValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<IntValue>]" + }, + { + (TypeDescriptor<LongValue>) () -> LongValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<LongValue>]" + }, + { + (TypeDescriptor<ShortValue>) () -> ShortValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<ShortValue>]" + }, + { + (TypeDescriptor<FloatValue>) () -> FloatValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<FloatValue>]" + }, + { + (TypeDescriptor<CharValue>) () -> 
CharValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<CharValue>]" + }, + { + (TypeDescriptor<DoubleValue>) () -> DoubleValue.class, + "ValueTypeDescriptorImpl [valueTypeInfo=ValueType<DoubleValue>]" + }, + }; + } + + @Test + void testValueTypeDescriptorNonValue() { + assertThatThrownBy( + () -> + TypeDescriptors.value((TypeDescriptor<Double>) () -> Double.class) + .toString()) + .isInstanceOf(InvocationTargetException.class); + } +}