This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9569cc84b408e71e59732c16edb8764184c78cee Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Fri Dec 13 09:38:59 2019 +0100 [hotfix][table-common] Add utility method for checking if a type is a composite type --- .../types/logical/utils/LogicalTypeChecks.java | 16 +++ .../flink/table/types/utils/DataTypeUtils.java | 5 +- .../types/logical/utils/LogicalTypeChecksTest.java | 109 +++++++++++++++++++++ 3 files changed, 129 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java index 45e2318..c1d35a2 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java @@ -24,6 +24,7 @@ import org.apache.flink.table.types.logical.BinaryType; import org.apache.flink.table.types.logical.CharType; import org.apache.flink.table.types.logical.DayTimeIntervalType; import org.apache.flink.table.types.logical.DecimalType; +import org.apache.flink.table.types.logical.DistinctType; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; @@ -85,6 +86,21 @@ public final class LogicalTypeChecks { return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) == TimestampKind.PROCTIME; } + /** + * Checks if the given type is a composite type. + * + * @param logicalType Logical data type to check + * @return True if the type is composite type. + */ + public static boolean isCompositeType(LogicalType logicalType) { + if (logicalType instanceof DistinctType) { + return isCompositeType(((DistinctType) logicalType).getSourceType()); + } + + LogicalTypeRoot typeRoot = logicalType.getTypeRoot(); + return typeRoot == LogicalTypeRoot.STRUCTURED_TYPE || typeRoot == LogicalTypeRoot.ROW; + } + public static int getLength(LogicalType logicalType) { return logicalType.accept(LENGTH_EXTRACTOR); } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java index c5f798b..4975e6d 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/DataTypeUtils.java @@ -27,10 +27,14 @@ import org.apache.flink.table.types.FieldsDataType; import org.apache.flink.table.types.KeyValueDataType; import org.apache.flink.table.types.inference.TypeTransformation; import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.LegacyTypeInformationType; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeRoot; import org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.MultisetType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.util.Preconditions; import java.util.HashMap; @@ -73,7 +77,6 @@ public final class DataTypeUtils { // no instantiation } - // ------------------------------------------------------------------------------------------ private static class DataTypeTransformer implements DataTypeVisitor<DataType> { diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecksTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecksTest.java new file mode 100644 index 0000000..218a438 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecksTest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.logical.utils; + +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.FieldsDataType; +import org.apache.flink.table.types.logical.DistinctType; +import org.apache.flink.table.types.logical.StructuredType; +import org.apache.flink.table.types.utils.TypeConversions; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.api.DataTypes.FIELD; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +/** + * Tests for {@link LogicalTypeChecks}. + */ +public class LogicalTypeChecksTest { + + @Test + public void testIsCompositeTypeRowType() { + DataType dataType = ROW(FIELD("f0", INT()), FIELD("f1", STRING())); + boolean isCompositeType = LogicalTypeChecks.isCompositeType(dataType.getLogicalType()); + + assertThat(isCompositeType, is(true)); + } + + @Test + public void testIsCompositeTypeDistinctType() { + DataType dataType = ROW(FIELD("f0", INT()), FIELD("f1", STRING())); + DistinctType distinctType = DistinctType.newBuilder( + ObjectIdentifier.of("catalog", "database", "type"), + dataType.getLogicalType()).build(); + boolean isCompositeType = LogicalTypeChecks.isCompositeType(distinctType); + + assertThat(isCompositeType, is(true)); + } + + @Test + public void testIsCompositeTypeLegacyCompositeType() { + DataType dataType = TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(Types.STRING, Types.INT)); + boolean isCompositeType = LogicalTypeChecks.isCompositeType(dataType.getLogicalType()); + + assertThat(isCompositeType, is(true)); + } + + @Test + public void testIsCompositeTypeStructuredType() { + StructuredType logicalType = StructuredType.newBuilder(ObjectIdentifier.of("catalog", "database", "type")) + .attributes(Arrays.asList( + new StructuredType.StructuredAttribute("f0", DataTypes.INT().getLogicalType()), + new StructuredType.StructuredAttribute("f1", DataTypes.STRING().getLogicalType()) + )) + .build(); + + Map<String, DataType> dataTypes = new HashMap<>(); + dataTypes.put("f0", DataTypes.INT()); + dataTypes.put("f1", DataTypes.STRING()); + FieldsDataType dataType = new FieldsDataType(logicalType, dataTypes); + boolean isCompositeType = LogicalTypeChecks.isCompositeType(dataType.getLogicalType()); + + assertThat(isCompositeType, is(true)); + } + + @Test + public void testIsCompositeTypeLegacySimpleType() { + DataType dataType = TypeConversions.fromLegacyInfoToDataType(Types.STRING); + boolean isCompositeType = LogicalTypeChecks.isCompositeType(dataType.getLogicalType()); + + assertThat(isCompositeType, is(false)); + } + + @Test + public void testIsCompositeTypeSimpleType() { + DataType dataType = DataTypes.TIMESTAMP(); + boolean isCompositeType = LogicalTypeChecks.isCompositeType(dataType.getLogicalType()); + + assertThat(isCompositeType, is(false)); + } +}