This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cb9827f827263f740a59188936dfea44a298987 Author: Sergey Nuyanzin <[email protected]> AuthorDate: Tue Nov 4 08:39:08 2025 +0100 [FLINK-20539][table] Add `table.legacy-nested-row-nullability` table option, port row losing nullability and losing struct kind fixes from Calcite This closes #27158. --- .../generated/table_config_configuration.html | 6 + .../flink/sql/parser/FlinkSqlParsingValidator.java | 46 + .../parser/type/ExtendedSqlRowTypeNameSpec.java | 14 +- .../flink/sql/parser/FlinkDDLDataTypeTest.java | 49 +- .../flink/table/api/config/TableConfigOptions.java | 13 + .../calcite/rel/type/RelDataTypeFactoryImpl.java | 732 ++++++++ .../org/apache/calcite/sql/type/SqlTypeUtil.java | 1807 ++++++++++++++++++++ .../planner/calcite/FlinkCalciteSqlValidator.java | 12 +- .../table/planner/calcite/FlinkRexBuilder.java | 36 +- .../planner/functions/CastFunctionMiscITCase.java | 3 - .../operations/SqlDdlToOperationConverterTest.java | 3 - .../table/planner/plan/stream/sql/CalcTest.xml | 4 +- .../planner/runtime/stream/sql/CalcITCase.scala | 55 + 13 files changed, 2741 insertions(+), 39 deletions(-) diff --git a/docs/layouts/shortcodes/generated/table_config_configuration.html b/docs/layouts/shortcodes/generated/table_config_configuration.html index 6242c70e415..9c9229a96eb 100644 --- a/docs/layouts/shortcodes/generated/table_config_configuration.html +++ b/docs/layouts/shortcodes/generated/table_config_configuration.html @@ -56,6 +56,12 @@ <td>Integer</td> <td>Specifies a threshold where generated code will be split into sub-function calls. Java has a maximum method length of 64 KB. This setting allows for finer granularity if necessary. 
Default value is 4000 instead of 64KB as by default JIT refuses to work on methods with more than 8K byte code.</td> </tr> + <tr> + <td><h5>table.legacy-nested-row-nullability</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> + <td style="word-wrap: break-word;">false</td> + <td>Boolean</td> + <td>Before Flink 2.2, row types defined in SQL e.g. `SELECT CAST(f AS ROW<i NOT NULL>)` did ignore the `NOT NULL` constraint. This was more aligned with the SQL standard but caused many type inconsistencies and cryptic error message when working on nested data. For example, it prevented using rows in computed columns or join keys. The new behavior takes the nullability into consideration.</td> + </tr> <tr> <td><h5>table.local-time-zone</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td> <td style="word-wrap: break-word;">"default"</td> diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlParsingValidator.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlParsingValidator.java new file mode 100644 index 00000000000..10539154be2 --- /dev/null +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/FlinkSqlParsingValidator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.sql.parser; + +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlOperatorTable; +import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; +import org.apache.calcite.sql.validate.SqlValidatorImpl; + +/** + * Extends Calcite's {@link org.apache.calcite.sql.validate.SqlValidator}. It allows for + * parameterizing the parsing based on feature flags backed by options. + */ +public class FlinkSqlParsingValidator extends SqlValidatorImpl { + private final boolean isLegacyNestedRowNullability; + + protected FlinkSqlParsingValidator( + SqlOperatorTable opTab, + SqlValidatorCatalogReader catalogReader, + RelDataTypeFactory typeFactory, + Config config, + boolean isLegacyNestedRowNullability) { + super(opTab, catalogReader, typeFactory, config); + this.isLegacyNestedRowNullability = isLegacyNestedRowNullability; + } + + public final boolean isLegacyNestedRowNullability() { + return isLegacyNestedRowNullability; + } +} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java index 9605a6e5aef..9c6ac8868a3 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java @@ -18,6 +18,8 @@ package org.apache.flink.sql.parser.type; +import 
org.apache.flink.sql.parser.FlinkSqlParsingValidator; + import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.StructKind; @@ -156,10 +158,18 @@ public class ExtendedSqlRowTypeNameSpec extends SqlTypeNameSpec { @Override public RelDataType deriveType(SqlValidator sqlValidator) { final RelDataTypeFactory typeFactory = sqlValidator.getTypeFactory(); + final StructKind structKind = + ((FlinkSqlParsingValidator) sqlValidator).isLegacyNestedRowNullability() + ? StructKind.FULLY_QUALIFIED + : StructKind.PEEK_FIELDS_NO_EXPAND; return typeFactory.createStructType( - StructKind.PEEK_FIELDS_NO_EXPAND, + structKind, fieldTypes.stream() - .map(dt -> dt.deriveType(sqlValidator)) + .map( + dt -> + dt.deriveType( + sqlValidator, + dt.getNullable() == null || dt.getNullable())) .collect(Collectors.toList()), fieldNames.stream().map(SqlIdentifier::toString).collect(Collectors.toList())); } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java index 7ba04b59315..a1486b3d708 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java @@ -48,7 +48,6 @@ import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; -import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.test.MockSqlOperatorTable; import org.apache.calcite.test.catalog.MockCatalogReaderSimple; import org.apache.calcite.util.SourceStringReader; @@ -60,8 +59,6 @@ import org.junit.jupiter.params.provider.MethodSource; import 
javax.annotation.Nullable; import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -231,32 +228,28 @@ class FlinkDDLDataTypeTest { nullable( FIXTURE.createStructType( StructKind.PEEK_FIELDS_NO_EXPAND, - Collections.singletonList(nullable(FIXTURE.intType)), - Collections.singletonList("f0"))), + List.of(nullable(FIXTURE.intType)), + List.of("f0"))), "ROW< `f0` INTEGER >"), createArgumentsTestItem( "ROW(`f0` INT)", nullable( FIXTURE.createStructType( StructKind.PEEK_FIELDS_NO_EXPAND, - Collections.singletonList(nullable(FIXTURE.intType)), - Collections.singletonList("f0"))), + List.of(nullable(FIXTURE.intType)), + List.of("f0"))), "ROW(`f0` INTEGER)"), createArgumentsTestItem( "ROW<>", nullable( FIXTURE.createStructType( - StructKind.PEEK_FIELDS_NO_EXPAND, - Collections.emptyList(), - Collections.emptyList())), + StructKind.PEEK_FIELDS_NO_EXPAND, List.of(), List.of())), "ROW<>"), createArgumentsTestItem( "ROW()", nullable( FIXTURE.createStructType( - StructKind.PEEK_FIELDS_NO_EXPAND, - Collections.emptyList(), - Collections.emptyList())), + StructKind.PEEK_FIELDS_NO_EXPAND, List.of(), List.of())), "ROW()"), createArgumentsTestItem( "ROW<f0 INT NOT NULL 'This is a comment.', " @@ -588,7 +581,16 @@ class FlinkDDLDataTypeTest { private final SqlParser.Config parserConfig; TestFactory() { - this(DEFAULT_OPTIONS, MockCatalogReaderSimple::create, SqlValidatorUtil::newValidator); + this( + DEFAULT_OPTIONS, + MockCatalogReaderSimple::create, + (sqlOperatorTable, sqlValidatorCatalogReader, relDataTypeFactory, config) -> + new FlinkSqlParsingValidator( + sqlOperatorTable, + sqlValidatorCatalogReader, + relDataTypeFactory, + config, + false)); } TestFactory( @@ -672,16 +674,15 @@ class FlinkDDLDataTypeTest { } private static Map<String, Object> buildDefaultOptions() { - final Map<String, Object> m = new HashMap<>(); - m.put("quoting", Quoting.BACK_TICK); - 
m.put("quotedCasing", Casing.UNCHANGED); - m.put("unquotedCasing", Casing.UNCHANGED); - m.put("caseSensitive", true); - m.put("enableTypeCoercion", false); - m.put("conformance", SqlConformanceEnum.DEFAULT); - m.put("operatorTable", SqlStdOperatorTable.instance()); - m.put("parserFactory", FlinkSqlParserImpl.FACTORY); - return Collections.unmodifiableMap(m); + return Map.ofEntries( + Map.entry("quoting", Quoting.BACK_TICK), + Map.entry("quotedCasing", Casing.UNCHANGED), + Map.entry("unquotedCasing", Casing.UNCHANGED), + Map.entry("caseSensitive", true), + Map.entry("enableTypeCoercion", false), + Map.entry("conformance", SqlConformanceEnum.DEFAULT), + Map.entry("operatorTable", SqlStdOperatorTable.instance()), + Map.entry("parserFactory", FlinkSqlParserImpl.FACTORY)); } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java index df43998f94b..3c59b56162f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java @@ -157,6 +157,19 @@ public class TableConfigOptions { + "By default, all top-level columns of the table's " + "schema are selected and nested fields are retained."); + @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING) + public static final ConfigOption<Boolean> LEGACY_NESTED_ROW_NULLABILITY = + key("table.legacy-nested-row-nullability") + .booleanType() + .defaultValue(false) + .withDescription( + "Before Flink 2.2, row types defined in SQL " + + "e.g. `SELECT CAST(f AS ROW<i NOT NULL>)` did ignore the `NOT NULL` constraint. " + + "This was more aligned with the SQL standard but caused many type inconsistencies " + + "and cryptic error message when working on nested data. 
" + + "For example, it prevented using rows in computed columns or join keys. " + + "The new behavior takes the nullability into consideration."); + // ------------------------------------------------------------------------------------------ // Options for plan handling // ------------------------------------------------------------------------------------------ diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/type/RelDataTypeFactoryImpl.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/type/RelDataTypeFactoryImpl.java new file mode 100644 index 00000000000..33c664e7d70 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/rel/type/RelDataTypeFactoryImpl.java @@ -0,0 +1,732 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.rel.type; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import org.apache.calcite.linq4j.tree.Primitive; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.sql.type.ArraySqlType; +import org.apache.calcite.sql.type.JavaToSqlTypeConversionRules; +import org.apache.calcite.sql.type.MapSqlType; +import org.apache.calcite.sql.type.MultisetSqlType; +import org.apache.calcite.sql.type.SqlTypeFamily; +import org.apache.calcite.sql.type.SqlTypeMappingRule; +import org.apache.calcite.sql.type.SqlTypeName; +import org.apache.calcite.sql.type.SqlTypeUtil; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.lang.reflect.Field; +import java.nio.charset.Charset; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; +import static org.apache.calcite.util.ReflectUtil.isStatic; + +/** + * Abstract base for implementations of {@link RelDataTypeFactory}. + * + * <p>FLINK modifications are at lines + * + * <ol> + * <li>Should be removed after fixing CALCITE-5199: Lines 242-244 + * </ol> + */ +public abstract class RelDataTypeFactoryImpl implements RelDataTypeFactory { + // ~ Instance fields -------------------------------------------------------- + + /** Global cache for Key to RelDataType. Uses soft values to allow GC. 
*/ + private static final LoadingCache<Key, RelDataType> KEY2TYPE_CACHE = + CacheBuilder.newBuilder() + .softValues() + .build(CacheLoader.from(RelDataTypeFactoryImpl::keyToType)); + + /** Global cache for RelDataType. */ + @SuppressWarnings("BetaApi") + private static final Interner<RelDataType> DATATYPE_CACHE = Interners.newWeakInterner(); + + private static RelDataType keyToType(Key key) { + final ImmutableList.Builder<RelDataTypeField> list = ImmutableList.builder(); + for (int i = 0; i < key.names.size(); i++) { + list.add(new RelDataTypeFieldImpl(key.names.get(i), i, key.types.get(i))); + } + return new RelRecordType(key.kind, list.build(), key.nullable); + } + + private static final Map<Class, RelDataTypeFamily> CLASS_FAMILIES = + ImmutableMap.<Class, RelDataTypeFamily>builder() + .put(String.class, SqlTypeFamily.CHARACTER) + .put(byte[].class, SqlTypeFamily.BINARY) + .put(boolean.class, SqlTypeFamily.BOOLEAN) + .put(Boolean.class, SqlTypeFamily.BOOLEAN) + .put(char.class, SqlTypeFamily.NUMERIC) + .put(Character.class, SqlTypeFamily.NUMERIC) + .put(short.class, SqlTypeFamily.NUMERIC) + .put(Short.class, SqlTypeFamily.NUMERIC) + .put(int.class, SqlTypeFamily.NUMERIC) + .put(Integer.class, SqlTypeFamily.NUMERIC) + .put(long.class, SqlTypeFamily.NUMERIC) + .put(Long.class, SqlTypeFamily.NUMERIC) + .put(float.class, SqlTypeFamily.APPROXIMATE_NUMERIC) + .put(Float.class, SqlTypeFamily.APPROXIMATE_NUMERIC) + .put(double.class, SqlTypeFamily.APPROXIMATE_NUMERIC) + .put(Double.class, SqlTypeFamily.APPROXIMATE_NUMERIC) + .put(java.sql.Date.class, SqlTypeFamily.DATE) + .put(Time.class, SqlTypeFamily.TIME) + .put(Timestamp.class, SqlTypeFamily.TIMESTAMP) + .build(); + + protected final RelDataTypeSystem typeSystem; + + // ~ Constructors ----------------------------------------------------------- + + /** Creates a type factory. 
*/ + protected RelDataTypeFactoryImpl(RelDataTypeSystem typeSystem) { + this.typeSystem = requireNonNull(typeSystem, "typeSystem"); + } + + // ~ Methods ---------------------------------------------------------------- + + @Override + public RelDataTypeSystem getTypeSystem() { + return typeSystem; + } + + // implement RelDataTypeFactory + @Override + public RelDataType createJavaType(Class clazz) { + final JavaType javaType = + clazz == String.class + ? new JavaType(clazz, true, getDefaultCharset(), SqlCollation.IMPLICIT) + : new JavaType(clazz); + return canonize(javaType); + } + + // implement RelDataTypeFactory + @Override + public RelDataType createJoinType(RelDataType... types) { + assert types != null; + assert types.length >= 1; + final List<RelDataType> flattenedTypes = new ArrayList<>(); + getTypeList(ImmutableList.copyOf(types), flattenedTypes); + return canonize(new RelCrossType(flattenedTypes, getFieldList(flattenedTypes))); + } + + @Override + public RelDataType createStructType( + final List<RelDataType> typeList, final List<String> fieldNameList) { + return createStructType(StructKind.FULLY_QUALIFIED, typeList, fieldNameList); + } + + @Override + public RelDataType createStructType( + StructKind kind, final List<RelDataType> typeList, final List<String> fieldNameList) { + return createStructType(kind, typeList, fieldNameList, false); + } + + private RelDataType createStructType( + StructKind kind, + final List<RelDataType> typeList, + final List<String> fieldNameList, + final boolean nullable) { + assert typeList.size() == fieldNameList.size(); + return canonize(kind, fieldNameList, typeList, nullable); + } + + @SuppressWarnings("deprecation") + @Override + public RelDataType createStructType(final RelDataTypeFactory.FieldInfo fieldInfo) { + return canonize( + StructKind.FULLY_QUALIFIED, + new AbstractList<String>() { + @Override + public String get(int index) { + return fieldInfo.getFieldName(index); + } + + @Override + public int size() { + return 
fieldInfo.getFieldCount(); + } + }, + new AbstractList<RelDataType>() { + @Override + public RelDataType get(int index) { + return fieldInfo.getFieldType(index); + } + + @Override + public int size() { + return fieldInfo.getFieldCount(); + } + }); + } + + @Override + public final RelDataType createStructType( + final List<? extends Map.Entry<String, RelDataType>> fieldList) { + return createStructType(fieldList, false); + } + + private RelDataType createStructType( + final List<? extends Map.Entry<String, RelDataType>> fieldList, boolean nullable) { + return canonize( + StructKind.FULLY_QUALIFIED, Pair.left(fieldList), Pair.right(fieldList), nullable); + } + + @Override + public @Nullable RelDataType leastRestrictive( + List<RelDataType> types, SqlTypeMappingRule mappingRule) { + requireNonNull(types, "types"); + requireNonNull(mappingRule, "mappingRule"); + checkArgument(types.size() >= 1, "types.size >= 1"); + RelDataType type0 = types.get(0); + if (type0.isStruct()) { + return leastRestrictiveStructuredType(types); + } + return null; + } + + protected @Nullable RelDataType leastRestrictiveStructuredType(final List<RelDataType> types) { + final RelDataType type0 = types.get(0); + // precheck that fieldCount is present + if (!type0.isStruct()) { + return null; + } + final int fieldCount = type0.getFieldCount(); + + // precheck that all types are structs with same number of fields + // and register desired nullability for the result + boolean isNullable = false; + for (RelDataType type : types) { + if (!type.isStruct()) { + return null; + } + if (type.getFieldList().size() != fieldCount) { + return null; + } + isNullable |= type.isNullable(); + } + + // recursively compute column-wise least restrictive + // preserve the struct kind from type0 + // FLINK MODIFICATION BEGIN + final Builder builder = builder().kind(type0.getStructKind()); + // FLINK MODIFICATION END + for (int j = 0; j < fieldCount; ++j) { + // REVIEW jvs 22-Jan-2004: Always use the field name from 
the + // first type? + final int k = j; + + RelDataType type = + leastRestrictive(Util.transform(types, t -> t.getFieldList().get(k).getType())); + if (type == null) { + return null; + } + builder.add(type0.getFieldList().get(j).getName(), type); + } + return createTypeWithNullability(builder.build(), isNullable); + } + + protected @Nullable RelDataType leastRestrictiveArrayMultisetType( + final List<RelDataType> types, SqlTypeName sqlTypeName) { + assert sqlTypeName == SqlTypeName.ARRAY || sqlTypeName == SqlTypeName.MULTISET; + boolean isNullable = false; + for (RelDataType type : types) { + if (type.getComponentType() == null) { + return null; + } + isNullable |= type.isNullable(); + } + final RelDataType type = + leastRestrictive( + Util.transform( + types, + t -> + t instanceof ArraySqlType + ? ((ArraySqlType) t).getComponentType() + : ((MultisetSqlType) t).getComponentType())); + if (type == null) { + return null; + } + return sqlTypeName == SqlTypeName.ARRAY + ? new ArraySqlType(type, isNullable) + : new MultisetSqlType(type, isNullable); + } + + protected @Nullable RelDataType leastRestrictiveMapType( + final List<RelDataType> types, SqlTypeName sqlTypeName) { + assert sqlTypeName == SqlTypeName.MAP; + boolean isNullable = false; + for (RelDataType type : types) { + if (!(type instanceof MapSqlType)) { + return null; + } + isNullable |= type.isNullable(); + } + final RelDataType keyType = + leastRestrictive(Util.transform(types, t -> ((MapSqlType) t).getKeyType())); + if (keyType == null) { + return null; + } + final RelDataType valueType = + leastRestrictive(Util.transform(types, t -> ((MapSqlType) t).getValueType())); + if (valueType == null) { + return null; + } + return new MapSqlType(keyType, valueType, isNullable); + } + + protected RelDataType leastRestrictiveIntervalDatetimeType( + final RelDataType dateTimeType, final RelDataType type1) { + assert SqlTypeUtil.isDatetime(dateTimeType); + if (SqlTypeUtil.isIntType(type1)) { + return dateTimeType; + } 
+ final SqlIntervalQualifier intervalQualifier = type1.getIntervalQualifier(); + requireNonNull(intervalQualifier, "intervalQualifier"); + if (!dateTimeType.getSqlTypeName().allowsPrec() + || intervalQualifier.useDefaultFractionalSecondPrecision() + || intervalQualifier.getFractionalSecondPrecision(typeSystem) + <= dateTimeType.getPrecision()) { + return dateTimeType; + } else { + return createSqlType( + dateTimeType.getSqlTypeName(), + intervalQualifier.getFractionalSecondPrecision(typeSystem)); + } + } + + // copy a non-record type, setting nullability + private RelDataType copySimpleType(RelDataType type, boolean nullable) { + if (type instanceof JavaType) { + JavaType javaType = (JavaType) type; + if (SqlTypeUtil.inCharFamily(javaType)) { + return new JavaType(javaType.clazz, nullable, javaType.charset, javaType.collation); + } else { + return new JavaType( + nullable ? Primitive.box(javaType.clazz) : Primitive.unbox(javaType.clazz), + nullable); + } + } else { + // REVIEW: RelCrossType if it stays around; otherwise get rid of + // this comment + return type; + } + } + + // recursively copy a record type + private RelDataType copyRecordType( + final RelRecordType type, final boolean ignoreNullable, final boolean nullable) { + // For flattening and outer joins, it is desirable to change + // the nullability of the individual fields. 
+ return createStructType( + type.getStructKind(), + new AbstractList<RelDataType>() { + @Override + public RelDataType get(int index) { + RelDataType fieldType = type.getFieldList().get(index).getType(); + if (ignoreNullable) { + return copyType(fieldType); + } else { + return createTypeWithNullability(fieldType, nullable); + } + } + + @Override + public int size() { + return type.getFieldCount(); + } + }, + type.getFieldNames(), + nullable); + } + + // implement RelDataTypeFactory + @Override + public RelDataType copyType(RelDataType type) { + return createTypeWithNullability(type, type.isNullable()); + } + + // implement RelDataTypeFactory + @Override + public RelDataType createTypeWithNullability(final RelDataType type, final boolean nullable) { + requireNonNull(type, "type"); + RelDataType newType; + if (type.isNullable() == nullable) { + newType = type; + } else if (type instanceof RelRecordType) { + // REVIEW: angel 18-Aug-2005 dtbug 336 workaround + // Changed to ignore nullable parameter if nullable is false since + // copyRecordType implementation is doubtful + // - If nullable -> Do a deep copy, setting all fields of the record type + // to be nullable regardless of initial nullability. + // - If not nullable -> Do a deep copy, setting not nullable at top RelRecordType + // level only, keeping its fields' nullability as before. + // According to the SQL standard, nullability for struct types can be defined only for + // columns, which translates to top level structs. Nested struct attributes are always + // nullable, so in principle we could always set the nested attributes to be nullable. + // However, this might create regressions so we will not do it and we will keep previous + // behavior. + newType = copyRecordType((RelRecordType) type, !nullable, nullable); + } else { + newType = copySimpleType(type, nullable); + } + return canonize(newType); + } + + /** + * Registers a type, or returns the existing type if it is already registered. 
+ * + * @throws NullPointerException if type is null + */ + @SuppressWarnings("BetaApi") + protected RelDataType canonize(final RelDataType type) { + return DATATYPE_CACHE.intern(type); + } + + /** + * Looks up a type using a temporary key, and if not present, creates a permanent key and type. + * + * <p>This approach allows us to use a cheap temporary key. A permanent key is more expensive, + * because it must be immutable and not hold references into other data structures. + */ + protected RelDataType canonize( + final StructKind kind, + final List<String> names, + final List<RelDataType> types, + final boolean nullable) { + final RelDataType type = KEY2TYPE_CACHE.getIfPresent(new Key(kind, names, types, nullable)); + if (type != null) { + return type; + } + final ImmutableList<String> names2 = ImmutableList.copyOf(names); + final ImmutableList<RelDataType> types2 = ImmutableList.copyOf(types); + return KEY2TYPE_CACHE.getUnchecked(new Key(kind, names2, types2, nullable)); + } + + protected RelDataType canonize( + final StructKind kind, final List<String> names, final List<RelDataType> types) { + return canonize(kind, names, types, false); + } + + /** Returns a list of the fields in a list of types. */ + private static List<RelDataTypeField> getFieldList(List<RelDataType> types) { + final List<RelDataTypeField> fieldList = new ArrayList<>(); + for (RelDataType type : types) { + addFields(type, fieldList); + } + return fieldList; + } + + /** Returns a list of all atomic types in a list. */ + private static void getTypeList(List<RelDataType> inTypes, List<RelDataType> flatTypes) { + for (RelDataType inType : inTypes) { + if (inType instanceof RelCrossType) { + getTypeList(((RelCrossType) inType).getTypes(), flatTypes); + } else { + flatTypes.add(inType); + } + } + } + + /** + * Adds all fields in <code>type</code> to <code>fieldList</code>, renumbering the fields (if + * necessary) to ensure that their index matches their position in the list. 
+ */ + private static void addFields(RelDataType type, List<RelDataTypeField> fieldList) { + if (type instanceof RelCrossType) { + final RelCrossType crossType = (RelCrossType) type; + for (RelDataType type1 : crossType.getTypes()) { + addFields(type1, fieldList); + } + } else { + List<RelDataTypeField> fields = type.getFieldList(); + for (RelDataTypeField field : fields) { + if (field.getIndex() != fieldList.size()) { + field = + new RelDataTypeFieldImpl( + field.getName(), fieldList.size(), field.getType()); + } + fieldList.add(field); + } + } + } + + public static boolean isJavaType(RelDataType t) { + return t instanceof JavaType; + } + + private @Nullable List<RelDataTypeFieldImpl> fieldsOf(Class clazz) { + final List<RelDataTypeFieldImpl> list = new ArrayList<>(); + for (Field field : clazz.getFields()) { + if (isStatic(field)) { + continue; + } + list.add( + new RelDataTypeFieldImpl( + field.getName(), list.size(), createJavaType(field.getType()))); + } + + if (list.isEmpty()) { + return null; + } + + return list; + } + + /** + * Delegates to {@link RelDataTypeSystem#deriveDecimalMultiplyType(RelDataTypeFactory, + * RelDataType, RelDataType)} to get the return type for the operation. + */ + @Deprecated + @Override + public @Nullable RelDataType createDecimalProduct(RelDataType type1, RelDataType type2) { + return typeSystem.deriveDecimalMultiplyType(this, type1, type2); + } + + /** + * Delegates to {@link RelDataTypeSystem#shouldUseDoubleMultiplication(RelDataTypeFactory, + * RelDataType, RelDataType)} to get if double should be used for multiplication. + */ + @Deprecated + @Override + public boolean useDoubleMultiplication(RelDataType type1, RelDataType type2) { + return typeSystem.shouldUseDoubleMultiplication(this, type1, type2); + } + + /** + * Delegates to {@link RelDataTypeSystem#deriveDecimalDivideType(RelDataTypeFactory, + * RelDataType, RelDataType)} to get the return type for the operation. 
+ */ + @Deprecated + @Override + public @Nullable RelDataType createDecimalQuotient(RelDataType type1, RelDataType type2) { + return typeSystem.deriveDecimalDivideType(this, type1, type2); + } + + @Override + public RelDataType decimalOf(RelDataType type) { + // create decimal type and sync nullability + return createTypeWithNullability(decimalOf2(type), type.isNullable()); + } + + /** Create decimal type equivalent with the given {@code type} while sans nullability. */ + private RelDataType decimalOf2(RelDataType type) { + assert SqlTypeUtil.isNumeric(type) || SqlTypeUtil.isNull(type); + SqlTypeName typeName = type.getSqlTypeName(); + assert typeName != null; + switch (typeName) { + case DECIMAL: + // Fix the precision when the type is JavaType. + return RelDataTypeFactoryImpl.isJavaType(type) + ? SqlTypeUtil.getMaxPrecisionScaleDecimal(this) + : type; + case TINYINT: + return createSqlType(SqlTypeName.DECIMAL, 3, 0); + case SMALLINT: + return createSqlType(SqlTypeName.DECIMAL, 5, 0); + case INTEGER: + return createSqlType(SqlTypeName.DECIMAL, 10, 0); + case BIGINT: + // the default max precision is 19, so this is actually DECIMAL(19, 0) + // but derived system can override the max precision/scale. + return createSqlType(SqlTypeName.DECIMAL, 38, 0); + case REAL: + return createSqlType(SqlTypeName.DECIMAL, 14, 7); + case FLOAT: + return createSqlType(SqlTypeName.DECIMAL, 14, 7); + case DOUBLE: + // the default max precision is 19, so this is actually DECIMAL(19, 15) + // but derived system can override the max precision/scale. + return createSqlType(SqlTypeName.DECIMAL, 30, 15); + default: + // default precision and scale. 
+ return createSqlType(SqlTypeName.DECIMAL); + } + } + + @Override + public Charset getDefaultCharset() { + return Util.getDefaultCharset(); + } + + @SuppressWarnings("deprecation") + @Override + public FieldInfoBuilder builder() { + return new FieldInfoBuilder(this); + } + + // ~ Inner Classes ---------------------------------------------------------- + + // TODO jvs 13-Dec-2004: move to OJTypeFactoryImpl? + + /** Type which is based upon a Java class. */ + public class JavaType extends RelDataTypeImpl { + private final Class clazz; + private final boolean nullable; + private @Nullable SqlCollation collation; + private @Nullable Charset charset; + + public JavaType(Class clazz) { + this(clazz, !clazz.isPrimitive()); + } + + public JavaType(Class clazz, boolean nullable) { + this(clazz, nullable, null, null); + } + + @SuppressWarnings("argument.type.incompatible") + public JavaType( + Class clazz, + boolean nullable, + @Nullable Charset charset, + @Nullable SqlCollation collation) { + super(fieldsOf(clazz)); + this.clazz = clazz; + this.nullable = nullable; + assert (charset != null) == SqlTypeUtil.inCharFamily(this) : "Need to be a chartype"; + this.charset = charset; + this.collation = collation; + computeDigest(); + } + + public Class getJavaClass() { + return clazz; + } + + @Override + public boolean isNullable() { + return nullable; + } + + @Override + public RelDataTypeFamily getFamily() { + RelDataTypeFamily family = CLASS_FAMILIES.get(clazz); + return family != null ? family : this; + } + + @Override + protected void generateTypeString(StringBuilder sb, boolean withDetail) { + sb.append("JavaType("); + sb.append(clazz); + sb.append(")"); + } + + @Override + public @Nullable RelDataType getComponentType() { + final Class componentType = clazz.getComponentType(); + if (componentType == null) { + return null; + } else { + return createJavaType(componentType); + } + } + + /** + * For {@link JavaType} created with {@link Map} class, we cannot get the key type. 
Use ANY + * as key type. + */ + @Override + public @Nullable RelDataType getKeyType() { + if (Map.class.isAssignableFrom(clazz)) { + // Need to return a SQL type because the type inference needs SqlTypeName. + return createSqlType(SqlTypeName.ANY); + } else { + return null; + } + } + + /** + * For {@link JavaType} created with {@link Map} class, we cannot get the value type. Use + * ANY as value type. + */ + @Override + public @Nullable RelDataType getValueType() { + if (Map.class.isAssignableFrom(clazz)) { + // Need to return a SQL type because the type inference needs SqlTypeName. + return createSqlType(SqlTypeName.ANY); + } else { + return null; + } + } + + @Override + public @Nullable Charset getCharset() { + return this.charset; + } + + @Override + public @Nullable SqlCollation getCollation() { + return this.collation; + } + + @Override + public SqlTypeName getSqlTypeName() { + final SqlTypeName typeName = JavaToSqlTypeConversionRules.instance().lookup(clazz); + if (typeName == null) { + return SqlTypeName.OTHER; + } + return typeName; + } + } + + /** Key to the data type cache. 
*/ + private static class Key { + private final StructKind kind; + private final List<String> names; + private final List<RelDataType> types; + private final boolean nullable; + + Key(StructKind kind, List<String> names, List<RelDataType> types, boolean nullable) { + this.kind = kind; + this.names = names; + this.types = types; + this.nullable = nullable; + } + + @Override + public int hashCode() { + return Objects.hash(kind, names, types, nullable); + } + + @Override + public boolean equals(@Nullable Object obj) { + return obj == this + || obj instanceof Key + && kind == ((Key) obj).kind + && names.equals(((Key) obj).names) + && types.equals(((Key) obj).types) + && nullable == ((Key) obj).nullable; + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeUtil.java new file mode 100644 index 00000000000..d66dbecebb5 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/type/SqlTypeUtil.java @@ -0,0 +1,1807 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.sql.type; + +import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFamily; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelRecordType; +import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.SqlBasicTypeNameSpec; +import org.apache.calcite.sql.SqlCall; +import org.apache.calcite.sql.SqlCallBinding; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.SqlCollectionTypeNameSpec; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlMapTypeNameSpec; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlTypeNameSpec; +import org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.calcite.sql.validate.SqlNameMatcher; +import org.apache.calcite.sql.validate.SqlValidator; +import org.apache.calcite.sql.validate.SqlValidatorScope; +import org.apache.calcite.sql.validate.SqlValidatorUtil; +import org.apache.calcite.util.NumberUtil; +import org.apache.calcite.util.Pair; +import org.apache.calcite.util.Util; +import org.apiguardian.api.API; +import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf; +import org.checkerframework.checker.nullness.qual.Nullable; + +import java.math.BigDecimal; +import java.nio.charset.Charset; +import java.util.AbstractList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static 
java.util.Objects.requireNonNull; +import static org.apache.calcite.rel.type.RelDataTypeImpl.NON_NULLABLE_SUFFIX; +import static org.apache.calcite.sql.type.NonNullableAccessors.getCharset; +import static org.apache.calcite.sql.type.NonNullableAccessors.getCollation; +import static org.apache.calcite.sql.type.NonNullableAccessors.getComponentTypeOrThrow; +import static org.apache.calcite.util.Static.RESOURCE; + +/** + * Contains utility methods used during SQL validation or type derivation. + * + * <p>FLINK modifications are at lines + * + * <ol> + * <li>We should use ExtendedSqlRowTypeNameSpec for rows: Lines 1102-1106 + * <li>Should be removed after fixing CALCITE-7062: Lines 1126-1128 + * </ol> + */ +public abstract class SqlTypeUtil { + // ~ Methods ---------------------------------------------------------------- + + /** + * Checks whether two types or more are char comparable. + * + * @return Returns true if all operands are of char type and if they are comparable, i.e. of the + * same charset and collation of same charset + */ + public static boolean isCharTypeComparable(List<RelDataType> argTypes) { + assert argTypes != null; + assert argTypes.size() >= 2; + + // Filter out ANY and NULL elements. + List<RelDataType> argTypes2 = new ArrayList<>(); + for (RelDataType t : argTypes) { + if (!isAny(t) && !isNull(t)) { + argTypes2.add(t); + } + } + + for (Pair<RelDataType, RelDataType> pair : Pair.adjacents(argTypes2)) { + RelDataType t0 = pair.left; + RelDataType t1 = pair.right; + + if (!inCharFamily(t0) || !inCharFamily(t1)) { + return false; + } + + if (!getCharset(t0).equals(getCharset(t1))) { + return false; + } + + if (!getCollation(t0).getCharset().equals(getCollation(t1).getCharset())) { + return false; + } + } + + return true; + } + + /** + * Returns whether the operands to a call are char type-comparable. 
+ * + * @param binding Binding of call to operands + * @param operands Operands to check for compatibility; usually the operands of the bound call, + * but not always + * @param throwOnFailure Whether to throw an exception on failure + * @return whether operands are valid + */ + public static boolean isCharTypeComparable( + SqlCallBinding binding, List<SqlNode> operands, boolean throwOnFailure) { + requireNonNull(operands, "operands"); + assert operands.size() >= 2 + : "operands.size() should be 2 or greater, actual: " + operands.size(); + + if (!isCharTypeComparable(SqlTypeUtil.deriveType(binding, operands))) { + if (throwOnFailure) { + String msg = String.join(", ", Util.transform(operands, String::valueOf)); + throw binding.newError(RESOURCE.operandNotComparable(msg)); + } + return false; + } + return true; + } + + /** + * Derives component type for ARRAY, MULTISET, MAP when input is sub-query. + * + * @param origin original component type + * @return component type + */ + public static RelDataType deriveCollectionQueryComponentType( + SqlTypeName collectionType, RelDataType origin) { + switch (collectionType) { + case ARRAY: + case MULTISET: + return origin.isStruct() && origin.getFieldCount() == 1 + ? origin.getFieldList().get(0).getType() + : origin; + case MAP: + return origin; + default: + throw new AssertionError( + "Impossible to derive component type for " + collectionType); + } + } + + /** Iterates over all operands, derives their types, and collects them into a list. */ + public static List<RelDataType> deriveAndCollectTypes( + SqlValidator validator, SqlValidatorScope scope, List<? extends SqlNode> operands) { + // NOTE: Do not use an AbstractList. Don't want to be lazy. We want + // errors. + List<RelDataType> types = new ArrayList<>(); + for (SqlNode operand : operands) { + types.add(validator.deriveType(scope, operand)); + } + return types; + } + + /** + * Derives type of the call via its binding. 
+ * + * @param binding binding to derive the type from + * @return datatype of the call + */ + @API(since = "1.26", status = API.Status.EXPERIMENTAL) + public static RelDataType deriveType(SqlCallBinding binding) { + return deriveType(binding, binding.getCall()); + } + + /** + * Derives type of the given call under given binding. + * + * @param binding binding to derive the type from + * @param node node type to derive + * @return datatype of the given node + */ + @API(since = "1.26", status = API.Status.EXPERIMENTAL) + public static RelDataType deriveType(SqlCallBinding binding, SqlNode node) { + return binding.getValidator() + .deriveType(requireNonNull(binding.getScope(), () -> "scope of " + binding), node); + } + + /** + * Derives types for the list of nodes. + * + * @param binding binding to derive the type from + * @param nodes the list of nodes to derive types from + * @return the list of types of the given nodes + */ + @API(since = "1.26", status = API.Status.EXPERIMENTAL) + public static List<RelDataType> deriveType( + SqlCallBinding binding, List<? extends SqlNode> nodes) { + return deriveAndCollectTypes( + binding.getValidator(), + requireNonNull(binding.getScope(), () -> "scope of " + binding), + nodes); + } + + /** + * Promotes a type to a row type (does nothing if it already is one). + * + * @param type type to be promoted + * @param fieldName name to give field in row type; null for default of "ROW_VALUE" + * @return row type + */ + public static RelDataType promoteToRowType( + RelDataTypeFactory typeFactory, RelDataType type, @Nullable String fieldName) { + if (!type.isStruct()) { + if (fieldName == null) { + fieldName = "ROW_VALUE"; + } + type = typeFactory.builder().add(fieldName, type).build(); + } + return type; + } + + /** + * Recreates a given RelDataType with nullability iff any of the operands of a call are + * nullable. 
+ */ + public static RelDataType makeNullableIfOperandsAre( + final SqlValidator validator, + final SqlValidatorScope scope, + final SqlCall call, + RelDataType type) { + for (SqlNode operand : call.getOperandList()) { + RelDataType operandType = validator.deriveType(scope, operand); + + if (containsNullable(operandType)) { + RelDataTypeFactory typeFactory = validator.getTypeFactory(); + type = typeFactory.createTypeWithNullability(type, true); + break; + } + } + return type; + } + + /** + * Recreates a given RelDataType with nullability iff any of the param argTypes are nullable. + */ + public static RelDataType makeNullableIfOperandsAre( + final RelDataTypeFactory typeFactory, + final List<RelDataType> argTypes, + RelDataType type) { + requireNonNull(type, "type"); + if (containsNullable(argTypes)) { + type = typeFactory.createTypeWithNullability(type, true); + } + return type; + } + + /** Returns whether all of array of types are nullable. */ + public static boolean allNullable(List<RelDataType> types) { + for (RelDataType type : types) { + if (!containsNullable(type)) { + return false; + } + } + return true; + } + + /** Returns whether one or more of an array of types is nullable. */ + public static boolean containsNullable(List<RelDataType> types) { + for (RelDataType type : types) { + if (containsNullable(type)) { + return true; + } + } + return false; + } + + /** Determines whether a type or any of its fields (if a structured type) are nullable. */ + public static boolean containsNullable(RelDataType type) { + if (type.isNullable()) { + return true; + } + if (!type.isStruct()) { + return false; + } + for (RelDataTypeField field : type.getFieldList()) { + if (containsNullable(field.getType())) { + return true; + } + } + return false; + } + + /** + * Creates a RelDataType having the same type of the sourceRelDataType, and the same nullability + * as the targetRelDataType. 
+ */
+    public static RelDataType keepSourceTypeAndTargetNullability(
+            RelDataType sourceRelDataType,
+            RelDataType targetRelDataType,
+            RelDataTypeFactory typeFactory) {
+        if (!targetRelDataType.isStruct()) {
+            return typeFactory.createTypeWithNullability(
+                    sourceRelDataType, targetRelDataType.isNullable());
+        }
+        List<RelDataTypeField> targetFields = targetRelDataType.getFieldList();
+        List<RelDataTypeField> sourceFields = sourceRelDataType.getFieldList();
+        ImmutableList.Builder<RelDataTypeField> newTargetField = ImmutableList.builder();
+        for (int i = 0; i < targetRelDataType.getFieldCount(); i++) {
+            RelDataTypeField targetField = targetFields.get(i);
+            RelDataTypeField sourceField = sourceFields.get(i);
+            newTargetField.add(
+                    new RelDataTypeFieldImpl(
+                            sourceField.getName(),
+                            sourceField.getIndex(),
+                            keepSourceTypeAndTargetNullability(
+                                    sourceField.getType(), targetField.getType(), typeFactory)));
+        }
+        RelDataType relDataType = typeFactory.createStructType(newTargetField.build());
+        return typeFactory.createTypeWithNullability(relDataType, targetRelDataType.isNullable());
+    }
+
+    /**
+     * Returns whether {@code typeName} equals {@code type.getSqlTypeName()}. Always returns true
+     * if {@code typeName} is {@link SqlTypeName#ANY}.
+     */
+    public static boolean isOfSameTypeName(SqlTypeName typeName, RelDataType type) {
+        return SqlTypeName.ANY == typeName || typeName == type.getSqlTypeName();
+    }
+
+    /**
+     * Returns true if any element in <code>typeNames</code> matches type.getSqlTypeName().
+     *
+     * @see #isOfSameTypeName(SqlTypeName, RelDataType)
+     */
+    public static boolean isOfSameTypeName(Collection<SqlTypeName> typeNames, RelDataType type) {
+        for (SqlTypeName typeName : typeNames) {
+            if (isOfSameTypeName(typeName, type)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Returns whether a type is DATE, TIME, or TIMESTAMP. */
+    public static boolean isDatetime(RelDataType type) {
+        return SqlTypeFamily.DATETIME.contains(type);
+    }
+
+    /** Returns whether a type is DATE.
*/ + public static boolean isDate(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + + return type.getSqlTypeName() == SqlTypeName.DATE; + } + + /** Returns whether a type is TIMESTAMP. */ + public static boolean isTimestamp(RelDataType type) { + return SqlTypeFamily.TIMESTAMP.contains(type); + } + + /** Returns whether a type is some kind of INTERVAL. */ + @SuppressWarnings("contracts.conditional.postcondition.not.satisfied") + @EnsuresNonNullIf(expression = "#1.getIntervalQualifier()", result = true) + public static boolean isInterval(RelDataType type) { + return SqlTypeFamily.DATETIME_INTERVAL.contains(type); + } + + /** Returns whether a type is in SqlTypeFamily.Character. */ + @SuppressWarnings("contracts.conditional.postcondition.not.satisfied") + @EnsuresNonNullIf(expression = "#1.getCharset()", result = true) + @EnsuresNonNullIf(expression = "#1.getCollation()", result = true) + public static boolean inCharFamily(RelDataType type) { + return type.getFamily() == SqlTypeFamily.CHARACTER; + } + + /** Returns whether a type name is in SqlTypeFamily.Character. */ + public static boolean inCharFamily(SqlTypeName typeName) { + return typeName.getFamily() == SqlTypeFamily.CHARACTER; + } + + /** Returns whether a type is in SqlTypeFamily.Boolean. */ + public static boolean inBooleanFamily(RelDataType type) { + return type.getFamily() == SqlTypeFamily.BOOLEAN; + } + + /** Returns whether two types are in same type family. */ + public static boolean inSameFamily(RelDataType t1, RelDataType t2) { + return t1.getFamily() == t2.getFamily(); + } + + /** + * Returns whether two types are in same type family, or one or the other is of type {@link + * SqlTypeName#NULL}. 
+ */ + public static boolean inSameFamilyOrNull(RelDataType t1, RelDataType t2) { + return (t1.getSqlTypeName() == SqlTypeName.NULL) + || (t2.getSqlTypeName() == SqlTypeName.NULL) + || (t1.getFamily() == t2.getFamily()); + } + + /** Returns whether a type family is either character or binary. */ + public static boolean inCharOrBinaryFamilies(RelDataType type) { + return (type.getFamily() == SqlTypeFamily.CHARACTER) + || (type.getFamily() == SqlTypeFamily.BINARY); + } + + /** Returns whether a type is a LOB of some kind. */ + public static boolean isLob(RelDataType type) { + // TODO jvs 9-Dec-2004: once we support LOB types + return false; + } + + /** Returns whether a type is variable width with bounded precision. */ + public static boolean isBoundedVariableWidth(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + switch (typeName) { + case VARCHAR: + case VARBINARY: + + // TODO angel 8-June-2005: Multiset should be LOB + case MULTISET: + return true; + default: + return false; + } + } + + /** Returns whether a type is one of the integer types. */ + public static boolean isIntType(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + switch (typeName) { + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + return true; + default: + return false; + } + } + + /** Returns whether a type is DECIMAL. */ + public static boolean isDecimal(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return typeName == SqlTypeName.DECIMAL; + } + + /** Returns whether a type is DOUBLE. */ + public static boolean isDouble(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return typeName == SqlTypeName.DOUBLE; + } + + /** Returns whether a type is BIGINT. 
*/ + public static boolean isBigint(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return typeName == SqlTypeName.BIGINT; + } + + /** Returns whether a type is numeric with exact precision. */ + public static boolean isExactNumeric(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + switch (typeName) { + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case DECIMAL: + return true; + default: + return false; + } + } + + /** Returns whether a type's scale is set. */ + public static boolean hasScale(RelDataType type) { + return type.getScale() != Integer.MIN_VALUE; + } + + /** Returns the maximum value of an integral type, as a long value. */ + public static long maxValue(RelDataType type) { + assert SqlTypeUtil.isIntType(type); + switch (type.getSqlTypeName()) { + case TINYINT: + return Byte.MAX_VALUE; + case SMALLINT: + return Short.MAX_VALUE; + case INTEGER: + return Integer.MAX_VALUE; + case BIGINT: + return Long.MAX_VALUE; + default: + throw Util.unexpected(type.getSqlTypeName()); + } + } + + /** Returns whether a type is numeric with approximate precision. */ + public static boolean isApproximateNumeric(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + switch (typeName) { + case FLOAT: + case REAL: + case DOUBLE: + return true; + default: + return false; + } + } + + /** Returns whether a type is numeric. */ + public static boolean isNumeric(RelDataType type) { + return isExactNumeric(type) || isApproximateNumeric(type); + } + + /** Returns whether a type is the NULL type. 
*/ + public static boolean isNull(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return typeName == SqlTypeName.NULL; + } + + /** + * Tests whether two types have the same name and structure, possibly with differing modifiers. + * For example, VARCHAR(1) and VARCHAR(10) are considered the same, while VARCHAR(1) and CHAR(1) + * are considered different. Likewise, VARCHAR(1) MULTISET and VARCHAR(10) MULTISET are + * considered the same. + * + * @return true if types have same name and structure + */ + public static boolean sameNamedType(RelDataType t1, RelDataType t2) { + if (t1.isStruct() || t2.isStruct()) { + if (!t1.isStruct() || !t2.isStruct()) { + return false; + } + if (t1.getFieldCount() != t2.getFieldCount()) { + return false; + } + List<RelDataTypeField> fields1 = t1.getFieldList(); + List<RelDataTypeField> fields2 = t2.getFieldList(); + for (int i = 0; i < fields1.size(); ++i) { + if (!sameNamedType(fields1.get(i).getType(), fields2.get(i).getType())) { + return false; + } + } + return true; + } + RelDataType comp1 = t1.getComponentType(); + RelDataType comp2 = t2.getComponentType(); + if ((comp1 != null) || (comp2 != null)) { + if ((comp1 == null) || (comp2 == null)) { + return false; + } + if (!sameNamedType(comp1, comp2)) { + return false; + } + } + return t1.getSqlTypeName() == t2.getSqlTypeName(); + } + + /** + * Computes the maximum number of bytes required to represent a value of a type having + * user-defined precision. This computation assumes no overhead such as length indicators and + * NUL-terminators. Complex types for which multiple representations are possible (e.g. DECIMAL + * or TIMESTAMP) return 0. 
+ * + * @param type type for which to compute storage + * @return maximum bytes, or 0 for a fixed-width type or type with unknown maximum + */ + public static int getMaxByteSize(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + + if (typeName == null) { + return 0; + } + + switch (typeName) { + case CHAR: + case VARCHAR: + return (int) + Math.ceil( + ((double) type.getPrecision()) + * getCharset(type).newEncoder().maxBytesPerChar()); + + case BINARY: + case VARBINARY: + return type.getPrecision(); + + case MULTISET: + + // TODO Wael Jan-24-2005: Need a better way to tell fennel this + // number. This a very generic place and implementation details like + // this doesnt belong here. Waiting to change this once we have blob + // support + return 4096; + + default: + return 0; + } + } + + /** + * Returns the minimum unscaled value of a numeric type. + * + * @param type a numeric type + */ + public static long getMinValue(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + switch (typeName) { + case TINYINT: + return Byte.MIN_VALUE; + case SMALLINT: + return Short.MIN_VALUE; + case INTEGER: + return Integer.MIN_VALUE; + case BIGINT: + case DECIMAL: + return NumberUtil.getMinUnscaled(type.getPrecision()).longValue(); + default: + throw new AssertionError("getMinValue(" + typeName + ")"); + } + } + + /** + * Returns the maximum unscaled value of a numeric type. + * + * @param type a numeric type + */ + public static long getMaxValue(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + switch (typeName) { + case TINYINT: + return Byte.MAX_VALUE; + case SMALLINT: + return Short.MAX_VALUE; + case INTEGER: + return Integer.MAX_VALUE; + case BIGINT: + case DECIMAL: + return NumberUtil.getMaxUnscaled(type.getPrecision()).longValue(); + default: + throw new AssertionError("getMaxValue(" + typeName + ")"); + } + } + + /** Returns whether a type has a representation as a Java primitive (ignoring nullability). 
*/ + @Deprecated // to be removed before 2.0 + public static boolean isJavaPrimitive(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + + switch (typeName) { + case BOOLEAN: + case TINYINT: + case SMALLINT: + case INTEGER: + case BIGINT: + case FLOAT: + case REAL: + case DOUBLE: + case SYMBOL: + return true; + default: + return false; + } + } + + /** Returns the class name of the wrapper for the primitive data type. */ + @Deprecated // to be removed before 2.0 + public static @Nullable String getPrimitiveWrapperJavaClassName(@Nullable RelDataType type) { + if (type == null) { + return null; + } + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return null; + } + + switch (typeName) { + case BOOLEAN: + return "Boolean"; + default: + //noinspection deprecation + return getNumericJavaClassName(type); + } + } + + /** Returns the class name of a numeric data type. */ + @Deprecated // to be removed before 2.0 + public static @Nullable String getNumericJavaClassName(@Nullable RelDataType type) { + if (type == null) { + return null; + } + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return null; + } + + switch (typeName) { + case TINYINT: + return "Byte"; + case SMALLINT: + return "Short"; + case INTEGER: + return "Integer"; + case BIGINT: + return "Long"; + case REAL: + return "Float"; + case DECIMAL: + case FLOAT: + case DOUBLE: + return "Double"; + default: + return null; + } + } + + private static boolean isAny(RelDataType t) { + return t.getFamily() == SqlTypeFamily.ANY; + } + + public static boolean isMeasure(RelDataType t) { + return t instanceof MeasureSqlType; + } + + /** + * Tests whether a value can be assigned to a site. 
+ * + * @param toType type of the target site + * @param fromType type of the source value + * @return true iff assignable + */ + public static boolean canAssignFrom(RelDataType toType, RelDataType fromType) { + if (isAny(toType) || isAny(fromType)) { + return true; + } + + // TODO jvs 2-Jan-2005: handle all the other cases like + // rows, collections, UDT's + if (fromType.getSqlTypeName() == SqlTypeName.NULL) { + // REVIEW jvs 4-Dec-2008: We allow assignment from NULL to any + // type, including NOT NULL types, since in the case where no + // rows are actually processed, the assignment is legal + // (FRG-365). However, it would be better if the validator's + // NULL type inference guaranteed that we had already + // assigned a real (nullable) type to every NULL literal. + return true; + } + + if (fromType.getSqlTypeName() == SqlTypeName.ARRAY) { + if (toType.getSqlTypeName() != SqlTypeName.ARRAY) { + return false; + } + return canAssignFrom( + getComponentTypeOrThrow(toType), getComponentTypeOrThrow(fromType)); + } + + if (areCharacterSetsMismatched(toType, fromType)) { + return false; + } + + return toType.getFamily() == fromType.getFamily(); + } + + /** + * Determines whether two types both have different character sets. If one or the other type has + * no character set (e.g. in cast from INT to VARCHAR), that is not a mismatch. + * + * @param t1 first type + * @param t2 second type + * @return true iff mismatched + */ + public static boolean areCharacterSetsMismatched(RelDataType t1, RelDataType t2) { + if (isAny(t1) || isAny(t2)) { + return false; + } + + Charset cs1 = t1.getCharset(); + Charset cs2 = t2.getCharset(); + if ((cs1 != null) && (cs2 != null)) { + if (!cs1.equals(cs2)) { + return true; + } + } + return false; + } + + /** + * Compares two types and returns whether {@code fromType} can be cast to {@code toType}, using + * either coercion or assignment. + * + * <p>REVIEW jvs 17-Dec-2004: the coerce param below shouldn't really be necessary. 
We're using + * it as a hack because {@link SqlTypeFactoryImpl#leastRestrictive} isn't complete enough yet. + * Once it is, this param (and the non-coerce rules of {@link SqlTypeAssignmentRule}) should go + * away. + * + * @param toType target of assignment + * @param fromType source of assignment + * @param coerce if true, the SQL rules for CAST are used; if false, the rules are similar to + * Java; e.g. you can't assign short x = (int) y, and you can't assign int x = (String) z. + * @return whether cast is legal + */ + public static boolean canCastFrom(RelDataType toType, RelDataType fromType, boolean coerce) { + return canCastFrom( + toType, + fromType, + coerce ? SqlTypeCoercionRule.instance() : SqlTypeAssignmentRule.instance()); + } + + /** + * Compares two types and returns whether {@code fromType} can be cast to {@code toType}. + * + * <p>A type mapping rule (i.e. {@link SqlTypeCoercionRule} or {@link SqlTypeAssignmentRule}) + * controls what types are allowed to be cast from/to. 
+ * + * @param toType target of assignment + * @param fromType source of assignment + * @param typeMappingRule SqlTypeMappingRule + * @return whether cast is legal + */ + public static boolean canCastFrom( + RelDataType toType, RelDataType fromType, SqlTypeMappingRule typeMappingRule) { + if (toType.equals(fromType)) { + return true; + } + // If fromType is a measure, you should compare the inner type otherwise it will + // always return TRUE because the SqlTypeFamily of MEASURE is ANY + if (isMeasure(fromType)) { + return canCastFrom( + toType, requireNonNull(fromType.getMeasureElementType()), typeMappingRule); + } + if (isAny(toType) || isAny(fromType)) { + return true; + } + + final SqlTypeName fromTypeName = fromType.getSqlTypeName(); + final SqlTypeName toTypeName = toType.getSqlTypeName(); + if (toTypeName == SqlTypeName.UNKNOWN) { + return true; + } + if (toType.isStruct() || fromType.isStruct()) { + if (toTypeName == SqlTypeName.DISTINCT) { + if (fromTypeName == SqlTypeName.DISTINCT) { + // can't cast between different distinct types + return false; + } + return canCastFrom( + toType.getFieldList().get(0).getType(), fromType, typeMappingRule); + } else if (fromTypeName == SqlTypeName.DISTINCT) { + return canCastFrom( + toType, fromType.getFieldList().get(0).getType(), typeMappingRule); + } else if (toTypeName == SqlTypeName.ROW) { + if (fromTypeName != SqlTypeName.ROW) { + return fromTypeName == SqlTypeName.NULL; + } + int n = toType.getFieldCount(); + if (fromType.getFieldCount() != n) { + return false; + } + for (int i = 0; i < n; ++i) { + RelDataTypeField toField = toType.getFieldList().get(i); + RelDataTypeField fromField = fromType.getFieldList().get(i); + if (!canCastFrom(toField.getType(), fromField.getType(), typeMappingRule)) { + return false; + } + } + return true; + } else if (toTypeName == SqlTypeName.MULTISET) { + if (!fromType.isStruct()) { + return false; + } + if (fromTypeName != SqlTypeName.MULTISET) { + return false; + } + return 
canCastFrom( + getComponentTypeOrThrow(toType), + getComponentTypeOrThrow(fromType), + typeMappingRule); + } else if (fromTypeName == SqlTypeName.MULTISET) { + return false; + } else { + return toType.getFamily() == fromType.getFamily(); + } + } + RelDataType c1 = toType.getComponentType(); + if (c1 != null) { + RelDataType c2 = fromType.getComponentType(); + if (c2 != null) { + return canCastFrom(c1, c2, typeMappingRule); + } + } + if ((isInterval(fromType) && isExactNumeric(toType)) + || (isInterval(toType) && isExactNumeric(fromType))) { + IntervalSqlType intervalType = + (IntervalSqlType) (isInterval(fromType) ? fromType : toType); + if (!intervalType.getIntervalQualifier().isSingleDatetimeField()) { + // Casts between intervals and exact numerics must involve + // intervals with a single datetime field. + return false; + } + } + if (toTypeName == null || fromTypeName == null) { + return false; + } + return typeMappingRule.canApplyFrom(toTypeName, fromTypeName); + } + + /** + * Flattens a record type by recursively expanding any fields which are themselves record types. + * For each record type, a representative null value field is also prepended (with state NULL + * for a null value and FALSE for non-null), and all component types are asserted to be + * nullable, since SQL doesn't allow NOT NULL to be specified on attributes. 
+ * + * @param typeFactory factory which should produced flattened type + * @param recordType type with possible nesting + * @param flatteningMap if non-null, receives map from unflattened ordinal to flattened ordinal + * (must have length at least recordType.getFieldList().size()) + * @return flattened equivalent + */ + public static RelDataType flattenRecordType( + RelDataTypeFactory typeFactory, RelDataType recordType, int[] flatteningMap) { + if (!recordType.isStruct()) { + return recordType; + } + List<RelDataTypeField> fieldList = new ArrayList<>(); + boolean nested = flattenFields(typeFactory, recordType, fieldList, flatteningMap); + if (!nested) { + return recordType; + } + List<RelDataType> types = new ArrayList<>(); + List<String> fieldNames = new ArrayList<>(); + Map<String, Long> fieldCnt = + fieldList.stream() + .map(RelDataTypeField::getName) + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + int i = -1; + for (RelDataTypeField field : fieldList) { + ++i; + types.add(field.getType()); + String oriFieldName = field.getName(); + // Patch up the field name with index if there are duplicates. + // There is still possibility that the patched name conflicts with existing ones, + // but that should be rare case. + Long fieldCount = fieldCnt.get(oriFieldName); + String fieldName = + fieldCount != null && fieldCount > 1 ? oriFieldName + "_" + i : oriFieldName; + fieldNames.add(fieldName); + } + return typeFactory.createStructType(types, fieldNames); + } + + public static boolean needsNullIndicator(RelDataType recordType) { + // NOTE jvs 9-Mar-2005: It would be more storage-efficient to say that + // no null indicator is required for structured type columns declared + // as NOT NULL. However, the uniformity of always having a null + // indicator makes things cleaner in many places. 
+ return recordType.getSqlTypeName() == SqlTypeName.STRUCTURED; + } + + private static boolean flattenFields( + RelDataTypeFactory typeFactory, + RelDataType type, + List<RelDataTypeField> list, + int[] flatteningMap) { + boolean nested = false; + for (RelDataTypeField field : type.getFieldList()) { + if (flatteningMap != null) { + flatteningMap[field.getIndex()] = list.size(); + } + if (field.getType().isStruct()) { + nested = true; + flattenFields(typeFactory, field.getType(), list, null); + } else if (field.getType().getComponentType() != null) { + nested = true; + + // TODO jvs 14-Feb-2005: generalize to any kind of + // collection type + RelDataType flattenedCollectionType = + typeFactory.createMultisetType( + flattenRecordType( + typeFactory, + getComponentTypeOrThrow(field.getType()), + null), + -1); + if (field.getType() instanceof ArraySqlType) { + flattenedCollectionType = + typeFactory.createArrayType( + flattenRecordType( + typeFactory, + getComponentTypeOrThrow(field.getType()), + null), + -1); + } + field = + new RelDataTypeFieldImpl( + field.getName(), field.getIndex(), flattenedCollectionType); + list.add(field); + } else { + list.add(field); + } + } + return nested; + } + + /** + * Converts an instance of RelDataType to an instance of SqlDataTypeSpec. + * + * @param type type descriptor + * @param charSetName charSet name + * @param maxPrecision The max allowed precision. 
+ * @param maxScale max allowed scale + * @return corresponding parse representation + */ + public static SqlDataTypeSpec convertTypeToSpec( + RelDataType type, @Nullable String charSetName, int maxPrecision, int maxScale) { + SqlTypeName typeName = type.getSqlTypeName(); + + // TODO jvs 28-Dec-2004: support row types, user-defined types, + // interval types, multiset types, etc + assert typeName != null; + + final SqlTypeNameSpec typeNameSpec; + if (isAtomic(type) + || isNull(type) + || type.getSqlTypeName() == SqlTypeName.UNKNOWN + || type.getSqlTypeName() == SqlTypeName.GEOMETRY) { + int precision = typeName.allowsPrec() ? type.getPrecision() : -1; + // fix up the precision. + if (maxPrecision > 0 && precision > maxPrecision) { + precision = maxPrecision; + } + int scale = typeName.allowsScale() ? type.getScale() : -1; + if (maxScale > 0 && scale > maxScale) { + scale = maxScale; + } + + typeNameSpec = + new SqlBasicTypeNameSpec( + typeName, precision, scale, charSetName, SqlParserPos.ZERO); + } else if (isCollection(type)) { + typeNameSpec = + new SqlCollectionTypeNameSpec( + convertTypeToSpec(getComponentTypeOrThrow(type)).getTypeNameSpec(), + typeName, + SqlParserPos.ZERO); + } else if (isRow(type)) { + RelRecordType recordType = (RelRecordType) type; + List<RelDataTypeField> fields = recordType.getFieldList(); + List<SqlIdentifier> fieldNames = + fields.stream() + .map(f -> new SqlIdentifier(f.getName(), SqlParserPos.ZERO)) + .collect(Collectors.toList()); + List<SqlDataTypeSpec> fieldTypes = + fields.stream() + .map(f -> convertTypeToSpec(f.getType())) + .collect(Collectors.toList()); + // FLINK MODIFICATION BEGIN + typeNameSpec = + new ExtendedSqlRowTypeNameSpec( + SqlParserPos.ZERO, fieldNames, fieldTypes, List.of(), true); + // FLINK MODIFICATION END + } else if (isMap(type)) { + final RelDataType keyType = + requireNonNull(type.getKeyType(), () -> "keyType of " + type); + final RelDataType valueType = + requireNonNull(type.getValueType(), () -> 
"valueType of " + type); + final SqlDataTypeSpec keyTypeSpec = convertTypeToSpec(keyType); + final SqlDataTypeSpec valueTypeSpec = convertTypeToSpec(valueType); + typeNameSpec = new SqlMapTypeNameSpec(keyTypeSpec, valueTypeSpec, SqlParserPos.ZERO); + } else { + throw new UnsupportedOperationException( + "Unsupported type when convertTypeToSpec: " + typeName); + } + + // REVIEW jvs 28-Dec-2004: discriminate between precision/scale + // zero and unspecified? + + // REVIEW angel 11-Jan-2006: + // Use neg numbers to indicate unspecified precision/scale + + // FLINK MODIFICATION BEGIN + return new SqlDataTypeSpec(typeNameSpec, SqlParserPos.ZERO).withNullable(type.isNullable()); + // FLINK MODIFICATION BEGIN + } + + /** + * Converts an instance of RelDataType to an instance of SqlDataTypeSpec. + * + * @param type type descriptor + * @return corresponding parse representation + */ + public static SqlDataTypeSpec convertTypeToSpec(RelDataType type) { + // TODO jvs 28-Dec-2004: collation + String charSetName = inCharFamily(type) ? type.getCharset().name() : null; + return convertTypeToSpec(type, charSetName, -1, -1); + } + + public static RelDataType createMultisetType( + RelDataTypeFactory typeFactory, RelDataType type, boolean nullable) { + RelDataType ret = typeFactory.createMultisetType(type, -1); + return typeFactory.createTypeWithNullability(ret, nullable); + } + + public static RelDataType createArrayType( + RelDataTypeFactory typeFactory, RelDataType type, boolean nullable) { + RelDataType ret = typeFactory.createArrayType(type, -1); + return typeFactory.createTypeWithNullability(ret, nullable); + } + + public static RelDataType createMapType( + RelDataTypeFactory typeFactory, + RelDataType keyType, + RelDataType valueType, + boolean nullable) { + RelDataType ret = typeFactory.createMapType(keyType, valueType); + return typeFactory.createTypeWithNullability(ret, nullable); + } + + /** Creates a MAP type from a record type. 
The record type must have exactly two fields. */ + public static RelDataType createMapTypeFromRecord( + RelDataTypeFactory typeFactory, RelDataType type) { + Preconditions.checkArgument( + type.getFieldCount() == 2, + "MAP requires exactly two fields, got %s; row type %s", + type.getFieldCount(), + type); + return createMapType( + typeFactory, + type.getFieldList().get(0).getType(), + type.getFieldList().get(1).getType(), + false); + } + + /** Creates a ROW type from a map type. The record type will have two fields. */ + public static RelDataType createRecordTypeFromMap( + RelDataTypeFactory typeFactory, RelDataType type) { + RelDataType keyType = requireNonNull(type.getKeyType(), () -> "keyType of " + type); + RelDataType valueType = requireNonNull(type.getValueType(), () -> "valueType of " + type); + return typeFactory.createStructType( + Arrays.asList(keyType, valueType), Arrays.asList("f0", "f1")); + } + + /** + * Adds collation and charset to a character type, returns other types unchanged. + * + * @param type Type + * @param typeFactory Type factory + * @return Type with added charset and collation, or unchanged type if it is not a char type. + */ + public static RelDataType addCharsetAndCollation( + RelDataType type, RelDataTypeFactory typeFactory) { + if (!inCharFamily(type)) { + return type; + } + Charset charset = type.getCharset(); + if (charset == null) { + charset = typeFactory.getDefaultCharset(); + } + SqlCollation collation = type.getCollation(); + if (collation == null) { + collation = SqlCollation.IMPLICIT; + } + + // todo: should get the implicit collation from repository + // instead of null + type = typeFactory.createTypeWithCharsetAndCollation(type, charset, collation); + SqlValidatorUtil.checkCharsetAndCollateConsistentIfCharType(type); + return type; + } + + /** + * Returns whether two types are equal, ignoring nullability. + * + * <p>They need not come from the same factory. 
+ * + * @param factory Type factory + * @param type1 First type + * @param type2 Second type + * @return whether types are equal, ignoring nullability + */ + public static boolean equalSansNullability( + RelDataTypeFactory factory, RelDataType type1, RelDataType type2) { + if (type1.isNullable() == type2.isNullable()) { + return type1.equals(type2); + } + return type1.equals(factory.createTypeWithNullability(type2, type1.isNullable())); + } + + /** + * This is a poorman's {@link #equalSansNullability(RelDataTypeFactory, RelDataType, + * RelDataType)}. + * + * <p>We assume that "not null" is represented in the type's digest as a trailing "NOT NULL" + * (case sensitive). + * + * <p>If you got a type factory, {@link #equalSansNullability(RelDataTypeFactory, RelDataType, + * RelDataType)} is preferred. + * + * @param type1 First type + * @param type2 Second type + * @return true if the types are equal or the only difference is nullability + */ + public static boolean equalSansNullability(RelDataType type1, RelDataType type2) { + if (type1 == type2) { + return true; + } + String x = type1.getFullTypeString(); + String y = type2.getFullTypeString(); + if (x.length() < y.length()) { + String c = x; + x = y; + y = c; + } + + return (x.length() == y.length() + || x.length() == y.length() + NON_NULLABLE_SUFFIX.length() + && x.endsWith(NON_NULLABLE_SUFFIX)) + && x.startsWith(y); + } + + /** + * Returns whether two collection types are equal, ignoring nullability. + * + * <p>They need not come from the same factory. 
+ * + * @param factory Type factory + * @param type1 First type + * @param type2 Second type + * @return Whether types are equal, ignoring nullability + */ + public static boolean equalAsCollectionSansNullability( + RelDataTypeFactory factory, RelDataType type1, RelDataType type2) { + Preconditions.checkArgument(isCollection(type1), "Input type1 must be collection type"); + Preconditions.checkArgument(isCollection(type2), "Input type2 must be collection type"); + + return (type1 == type2) + || (type1.getSqlTypeName() == type2.getSqlTypeName() + && equalSansNullability( + factory, + getComponentTypeOrThrow(type1), + getComponentTypeOrThrow(type2))); + } + + /** + * Returns whether two map types are equal, ignoring nullability. + * + * <p>They need not come from the same factory. + * + * @param factory Type factory + * @param type1 First type + * @param type2 Second type + * @return Whether types are equal, ignoring nullability + */ + public static boolean equalAsMapSansNullability( + RelDataTypeFactory factory, RelDataType type1, RelDataType type2) { + Preconditions.checkArgument(isMap(type1), "Input type1 must be map type"); + Preconditions.checkArgument(isMap(type2), "Input type2 must be map type"); + + MapSqlType mType1 = (MapSqlType) type1; + MapSqlType mType2 = (MapSqlType) type2; + return (type1 == type2) + || (equalSansNullability(factory, mType1.getKeyType(), mType2.getKeyType()) + && equalSansNullability( + factory, mType1.getValueType(), mType2.getValueType())); + } + + /** + * Returns whether two struct types are equal, ignoring nullability. + * + * <p>They do not need to come from the same factory. 
+ * + * @param factory Type factory + * @param type1 First type + * @param type2 Second type + * @param nameMatcher Name matcher used to compare the field names, if null, the field names are + * also ignored + * @return Whether types are equal, ignoring nullability + */ + public static boolean equalAsStructSansNullability( + RelDataTypeFactory factory, + RelDataType type1, + RelDataType type2, + @Nullable SqlNameMatcher nameMatcher) { + Preconditions.checkArgument(type1.isStruct(), "Input type1 must be struct type"); + Preconditions.checkArgument(type2.isStruct(), "Input type2 must be struct type"); + + if (type1 == type2) { + return true; + } + + if (type1.getFieldCount() != type2.getFieldCount()) { + return false; + } + + for (Pair<RelDataTypeField, RelDataTypeField> pair : + Pair.zip(type1.getFieldList(), type2.getFieldList())) { + if (nameMatcher != null + && !nameMatcher.matches(pair.left.getName(), pair.right.getName())) { + return false; + } + if (!equalSansNullability(factory, pair.left.getType(), pair.right.getType())) { + return false; + } + } + + return true; + } + + /** + * Returns the ordinal of a given field in a record type, or -1 if the field is not found. + * + * <p>The {@code fieldName} is always simple, if the field is nested within a record field, + * returns index of the outer field instead. i.g. for row type (a int, b (b1 bigint, b2 + * varchar(20) not null)), returns 1 for both simple name "b1" and "b2". 
+ * + * @param type Record type + * @param fieldName Name of field + * @return Ordinal of field + */ + public static int findField(RelDataType type, String fieldName) { + List<RelDataTypeField> fields = type.getFieldList(); + for (int i = 0; i < fields.size(); i++) { + RelDataTypeField field = fields.get(i); + if (field.getName().equals(fieldName)) { + return i; + } + final RelDataType fieldType = field.getType(); + if (fieldType.isStruct() && findField(fieldType, fieldName) != -1) { + return i; + } + } + return -1; + } + + /** + * Selects data types of the specified fields from an input row type. This is useful when + * identifying data types of a function that is going to operate on inputs that are specified as + * field ordinals (e.g. aggregate calls). + * + * @param rowType input row type + * @param requiredFields ordinals of the projected fields + * @return list of data types that are requested by requiredFields + */ + public static List<RelDataType> projectTypes( + final RelDataType rowType, final List<? extends Number> requiredFields) { + final List<RelDataTypeField> fields = rowType.getFieldList(); + + return new AbstractList<RelDataType>() { + @Override + public RelDataType get(int index) { + return fields.get(requiredFields.get(index).intValue()).getType(); + } + + @Override + public int size() { + return requiredFields.size(); + } + }; + } + + /** + * Records a struct type with no fields. + * + * @param typeFactory Type factory + * @return Struct type with no fields + */ + public static RelDataType createEmptyStructType(RelDataTypeFactory typeFactory) { + return typeFactory.createStructType(ImmutableList.of(), ImmutableList.of()); + } + + /** + * Returns whether a type is flat. It is not flat if it is a record type that has one or more + * fields that are themselves record types. 
+ */ + public static boolean isFlat(RelDataType type) { + if (type.isStruct()) { + for (RelDataTypeField field : type.getFieldList()) { + if (field.getType().isStruct()) { + return false; + } + } + } + return true; + } + + /** + * Returns whether two types are comparable. They need to be scalar types of the same family, or + * struct types whose fields are pairwise comparable. + * + * @param type1 First type + * @param type2 Second type + * @return Whether types are comparable + */ + public static boolean isComparable(RelDataType type1, RelDataType type2) { + if (type1.isStruct() != type2.isStruct()) { + return false; + } + + if (type1.isStruct()) { + int n = type1.getFieldCount(); + if (n != type2.getFieldCount()) { + return false; + } + for (Pair<RelDataTypeField, RelDataTypeField> pair : + Pair.zip(type1.getFieldList(), type2.getFieldList())) { + if (!isComparable(pair.left.getType(), pair.right.getType())) { + return false; + } + } + return true; + } + + final RelDataTypeFamily family1 = family(type1); + final RelDataTypeFamily family2 = family(type2); + if (family1 == family2) { + return true; + } + + // If one of the arguments is of type 'ANY', return true. + if (family1 == SqlTypeFamily.ANY || family2 == SqlTypeFamily.ANY) { + return true; + } + + // If one of the arguments is of type 'NULL', return true. + if (family1 == SqlTypeFamily.NULL || family2 == SqlTypeFamily.NULL) { + return true; + } + + // We can implicitly convert from character to date + if (family1 == SqlTypeFamily.CHARACTER && canConvertStringInCompare(family2) + || family2 == SqlTypeFamily.CHARACTER && canConvertStringInCompare(family1)) { + return true; + } + + return false; + } + + /** + * Returns the least restrictive type T, such that a value of type T can be compared with values + * of type {@code type1} and {@code type2} using {@code =}. 
+ */ + public static @Nullable RelDataType leastRestrictiveForComparison( + RelDataTypeFactory typeFactory, RelDataType type1, RelDataType type2) { + final RelDataType type = typeFactory.leastRestrictive(ImmutableList.of(type1, type2)); + if (type != null) { + return type; + } + final RelDataTypeFamily family1 = family(type1); + final RelDataTypeFamily family2 = family(type2); + + // If one of the arguments is of type 'ANY', we can compare. + if (family1 == SqlTypeFamily.ANY) { + return type2; + } + if (family2 == SqlTypeFamily.ANY) { + return type1; + } + + // If one of the arguments is of type 'NULL', we can compare. + if (family1 == SqlTypeFamily.NULL) { + return type2; + } + if (family2 == SqlTypeFamily.NULL) { + return type1; + } + + // We can implicitly convert from character to date, numeric, etc. + if (family1 == SqlTypeFamily.CHARACTER && canConvertStringInCompare(family2)) { + return type2; + } + if (family2 == SqlTypeFamily.CHARACTER && canConvertStringInCompare(family1)) { + return type1; + } + + return null; + } + + protected static RelDataTypeFamily family(RelDataType type) { + // REVIEW jvs 2-June-2005: This is needed to keep + // the Saffron type system happy. + RelDataTypeFamily family = null; + if (type.getSqlTypeName() != null) { + family = type.getSqlTypeName().getFamily(); + } + if (family == null) { + family = type.getFamily(); + } + return family; + } + + /** + * Returns whether all types in a collection have the same family, as determined by {@link + * #isSameFamily(RelDataType, RelDataType)}. 
+ * + * @param types Types to check + * @return true if all types are of the same family + */ + public static boolean areSameFamily(Iterable<RelDataType> types) { + final List<RelDataType> typeList = ImmutableList.copyOf(types); + if (Sets.newHashSet(RexUtil.families(typeList)).size() < 2) { + return true; + } + for (Pair<RelDataType, RelDataType> adjacent : Pair.adjacents(typeList)) { + if (!isSameFamily(adjacent.left, adjacent.right)) { + return false; + } + } + return true; + } + + /** + * Returns whether two types are scalar types of the same family, or struct types whose fields + * are pairwise of the same family. + * + * @param type1 First type + * @param type2 Second type + * @return Whether types have the same family + */ + private static boolean isSameFamily(RelDataType type1, RelDataType type2) { + if (type1.isStruct() != type2.isStruct()) { + return false; + } + + if (type1.isStruct()) { + int n = type1.getFieldCount(); + if (n != type2.getFieldCount()) { + return false; + } + for (Pair<RelDataTypeField, RelDataTypeField> pair : + Pair.zip(type1.getFieldList(), type2.getFieldList())) { + if (!isSameFamily(pair.left.getType(), pair.right.getType())) { + return false; + } + } + return true; + } + + final RelDataTypeFamily family1 = family(type1); + final RelDataTypeFamily family2 = family(type2); + return family1 == family2; + } + + /** + * Returns whether a character data type can be implicitly converted to a given family in a + * compare operation. 
+ */ + private static boolean canConvertStringInCompare(RelDataTypeFamily family) { + if (family instanceof SqlTypeFamily) { + SqlTypeFamily sqlTypeFamily = (SqlTypeFamily) family; + switch (sqlTypeFamily) { + case DATE: + case TIME: + case TIMESTAMP: + case INTERVAL_DAY_TIME: + case INTERVAL_YEAR_MONTH: + case NUMERIC: + case APPROXIMATE_NUMERIC: + case EXACT_NUMERIC: + case INTEGER: + case BOOLEAN: + return true; + default: + break; + } + } + return false; + } + + /** + * Checks whether a type represents Unicode character data. + * + * @param type type to test + * @return whether type represents Unicode character data + */ + public static boolean isUnicode(RelDataType type) { + Charset charset = type.getCharset(); + if (charset == null) { + return false; + } + return charset.name().startsWith("UTF"); + } + + /** + * Returns the larger of two precisions, treating {@link RelDataType#PRECISION_NOT_SPECIFIED} as + * infinity. + */ + public static int maxPrecision(int p0, int p1) { + return (p0 == RelDataType.PRECISION_NOT_SPECIFIED + || p0 >= p1 && p1 != RelDataType.PRECISION_NOT_SPECIFIED) + ? p0 + : p1; + } + + /** + * Returns whether a precision is greater or equal than another, treating {@link + * RelDataType#PRECISION_NOT_SPECIFIED} as infinity. + */ + public static int comparePrecision(int p0, int p1) { + if (p0 == p1) { + return 0; + } + if (p0 == RelDataType.PRECISION_NOT_SPECIFIED) { + return 1; + } + if (p1 == RelDataType.PRECISION_NOT_SPECIFIED) { + return -1; + } + return Integer.compare(p0, p1); + } + + /** Returns whether a type is ARRAY. */ + public static boolean isArray(RelDataType type) { + return type.getSqlTypeName() == SqlTypeName.ARRAY; + } + + /** Returns whether a type is ROW. */ + public static boolean isRow(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return type.getSqlTypeName() == SqlTypeName.ROW; + } + + /** Returns whether a type is MAP. 
*/ + public static boolean isMap(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return type.getSqlTypeName() == SqlTypeName.MAP; + } + + /** Returns whether a type is MULTISET. */ + public static boolean isMultiset(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return type.getSqlTypeName() == SqlTypeName.MULTISET; + } + + /** Returns whether a type is ARRAY or MULTISET. */ + public static boolean isCollection(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return type.getSqlTypeName() == SqlTypeName.ARRAY + || type.getSqlTypeName() == SqlTypeName.MULTISET; + } + + /** Returns whether a type is CHARACTER. */ + public static boolean isCharacter(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return SqlTypeFamily.CHARACTER.contains(type); + } + + /** + * Returns whether a type is a CHARACTER or contains a CHARACTER type. + * + * @deprecated Use {@link #hasCharacter(RelDataType)} + */ + @Deprecated // to be removed before 2.0 + public static boolean hasCharactor(RelDataType type) { + return hasCharacter(type); + } + + /** Returns whether a type is a CHARACTER or contains a CHARACTER type. */ + public static boolean hasCharacter(RelDataType type) { + if (isCharacter(type)) { + return true; + } + if (isArray(type)) { + return hasCharacter(getComponentTypeOrThrow(type)); + } + return false; + } + + /** Returns whether a type is STRING. */ + public static boolean isString(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return SqlTypeFamily.STRING.contains(type); + } + + /** Returns whether a type is BOOLEAN. 
*/ + public static boolean isBoolean(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return SqlTypeFamily.BOOLEAN.contains(type); + } + + /** Returns whether a type is BINARY. */ + public static boolean isBinary(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return SqlTypeFamily.BINARY.contains(type); + } + + /** Returns whether a type is atomic (datetime, numeric, string or BOOLEAN). */ + public static boolean isAtomic(RelDataType type) { + SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return false; + } + return SqlTypeUtil.isDatetime(type) + || SqlTypeUtil.isNumeric(type) + || SqlTypeUtil.isString(type) + || SqlTypeUtil.isBoolean(type); + } + + /** Returns a DECIMAL type with the maximum precision for the current type system. */ + public static RelDataType getMaxPrecisionScaleDecimal(RelDataTypeFactory factory) { + int maxPrecision = factory.getTypeSystem().getMaxNumericPrecision(); + int maxScale = factory.getTypeSystem().getMaxNumericScale(); + // scale should not greater than precision. + int scale = Math.min(maxPrecision / 2, maxScale); + return factory.createSqlType(SqlTypeName.DECIMAL, maxPrecision, scale); + } + + /** Keeps only the last N fields and returns the new struct type. */ + public static RelDataType extractLastNFields( + RelDataTypeFactory typeFactory, RelDataType type, int numToKeep) { + assert type.isStruct(); + assert type.getFieldCount() >= numToKeep; + final int fieldsCnt = type.getFieldCount(); + return typeFactory.createStructType( + type.getFieldList().subList(fieldsCnt - numToKeep, fieldsCnt)); + } + + /** + * Returns whether the decimal value is valid for the type. For example, 1111.11 is not valid + * for DECIMAL(3, 1) since it overflows. 
+ * + * @param value Value of literal + * @param toType Type of the literal + * @return whether the value is valid for the type + */ + public static boolean isValidDecimalValue(@Nullable BigDecimal value, RelDataType toType) { + if (value == null) { + return true; + } + switch (toType.getSqlTypeName()) { + case DECIMAL: + final int intDigits = value.precision() - value.scale(); + final int maxIntDigits = toType.getPrecision() - toType.getScale(); + return intDigits <= maxIntDigits; + default: + return true; + } + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java index 3ad86f86bc5..648c951ebef 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkCalciteSqlValidator.java @@ -19,6 +19,7 @@ package org.apache.flink.table.planner.calcite; import org.apache.flink.annotation.Internal; +import org.apache.flink.sql.parser.FlinkSqlParsingValidator; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.config.TableConfigOptions; @@ -72,7 +73,6 @@ import org.apache.calcite.sql.validate.SelectScope; import org.apache.calcite.sql.validate.SqlQualified; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; -import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql.validate.SqlValidatorNamespace; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql2rel.SqlToRelConverter; @@ -100,7 +100,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** Extends Calcite's {@link SqlValidator} by 
Flink-specific behavior. */ @Internal -public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { +public final class FlinkCalciteSqlValidator extends FlinkSqlParsingValidator { // Enables CallContext#getOutputDataType() when validating SQL expressions. private SqlNode sqlNodeForExpectedOutputType; @@ -122,7 +122,13 @@ public final class FlinkCalciteSqlValidator extends SqlValidatorImpl { RelOptTable.ToRelContext toRelcontext, RelOptCluster relOptCluster, FrameworkConfig frameworkConfig) { - super(opTab, catalogReader, typeFactory, config); + super( + opTab, + catalogReader, + typeFactory, + config, + ShortcutUtils.unwrapTableConfig(relOptCluster) + .get(TableConfigOptions.LEGACY_NESTED_ROW_NULLABILITY)); this.relOptCluster = relOptCluster; this.toRelContext = toRelcontext; this.frameworkConfig = frameworkConfig; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java index 739579d28e6..814dfa1d2af 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java @@ -92,10 +92,42 @@ public final class FlinkRexBuilder extends RexBuilder { } } + /** + * Adjust the nullability of the nested column based on the nullability of the enclosing type. + * However, if there is former nullability {@code CAST} present then it will be dropped and + * replaced with a new one (if needed). 
For instance if there is a table + * + * <pre>{@code + * CREATE TABLE MyTable ( + * `field1` ROW<`data` ROW<`nested` ROW<`trId` STRING>>NOT NULL> + * WITH ('connector' = 'datagen') + * }</pre> + * + * <p>and then there is a SQL query + * + * <pre>{@code + * SELECT `field1`.`data`.`nested`.`trId` AS transactionId FROM MyTable + * }</pre> + * + * <p>The {@code SELECT} picks a nested field only. In this case it should go step by step + * checking each level. + * + * <ol> + * <li>Looking at {@code `field1`} type it is nullable, then no changes. + * <li>{@code `field1`.`data`} is {@code NOT NULL}, however keeping in mind that enclosing + * type @{code `field1`} is nullable then need to change nullability with {@code CAST} + * <li>{@code `field1`.`data`.`nested`} is nullable that means that in this case no need for + * extra {@code CAST} inserted in previous step, so it will be dropped. + * <li>{@code `field1`.`data`.`nested`.`trId`} is also nullable, so no changes. + * </ol> + */ private RexNode makeFieldAccess(RexNode expr, RexNode field) { final RexNode fieldWithRemovedCast = removeCastNullableFromFieldAccess(field); - if (field.getType().isNullable() != fieldWithRemovedCast.getType().isNullable() - || expr.getType().isNullable() && !field.getType().isNullable()) { + final boolean nullabilityShouldChange = + field.getType().isNullable() != fieldWithRemovedCast.getType().isNullable() + || expr.getType().isNullable() && !field.getType().isNullable(); + + if (nullabilityShouldChange) { return makeCast( typeFactory.createTypeWithNullability(field.getType(), true), fieldWithRemovedCast, diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java index 224bf9b7cac..c755606665c 100644 --- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java @@ -102,9 +102,6 @@ class CastFunctionMiscITCase extends BuiltInFunctionTestBase { FIELD("s", STRING()))), "CAST(f0 AS ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>)", Row.of(Row.of("12", true, null), "Hello"), - // the inner NOT NULL is ignored in SQL because the outer ROW is - // nullable and the cast does not allow setting the outer - // nullability but derives it from the source operand DataTypes.of( "ROW<r ROW<s STRING NOT NULL, b BOOLEAN, i INT>, s STRING>")), TestSetSpec.forFunction( diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java index 26aa8a7f047..6a465210929 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlDdlToOperationConverterTest.java @@ -881,13 +881,11 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas createTestItem( "MAP<BIGINT, BOOLEAN>", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BOOLEAN())), - // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>. createTestItem( "ROW<f0 INT NOT NULL, f1 BOOLEAN>", DataTypes.ROW( DataTypes.FIELD("f0", DataTypes.INT().notNull()), DataTypes.FIELD("f1", DataTypes.BOOLEAN()))), - // Expect to be ROW<`f0` INT NOT NULL, `f1` BOOLEAN>. 
createTestItem( "ROW(f0 INT NOT NULL, f1 BOOLEAN)", DataTypes.ROW( @@ -901,7 +899,6 @@ class SqlDdlToOperationConverterTest extends SqlNodeToOperationConversionTestBas DataTypes.ROW(DataTypes.FIELD("f0", DataTypes.INT()))), createTestItem("ROW<>", DataTypes.ROW()), createTestItem("ROW()", DataTypes.ROW()), - // Expect to be ROW<`f0` INT NOT NULL '...', `f1` BOOLEAN '...'>. createTestItem( "ROW<f0 INT NOT NULL 'This is a comment.'," + " f1 BOOLEAN 'This as well.'>", diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml index 0e7049343d1..85b1675e096 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/CalcTest.xml @@ -706,13 +706,13 @@ SELECT my_row = ROW(1, 'str') from src </Resource> <Resource name="ast"> <![CDATA[ -LogicalProject(EXPR$0=[=(CAST($0):RecordType(INTEGER a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" b), CAST(ROW(1, _UTF-16LE'str')):RecordType(INTEGER a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" b) NOT NULL)]) +LogicalProject(EXPR$0=[=($0, CAST(ROW(1, _UTF-16LE'str')):RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" b) NOT NULL)]) +- LogicalTableScan(table=[[default_catalog, default_database, src]]) ]]> </Resource> <Resource name="optimized exec plan"> <![CDATA[ -Calc(select=[(CAST(my_row AS RecordType(INTEGER a, VARCHAR(2147483647) b)) = CAST(ROW(1, 'str') AS RecordType(INTEGER a, VARCHAR(2147483647) b))) AS EXPR$0]) +Calc(select=[(my_row = CAST(ROW(1, 'str') AS RecordType:peek_no_expand(INTEGER a, VARCHAR(2147483647) b))) AS EXPR$0]) +- TableSourceScan(table=[[default_catalog, default_database, src]], fields=[my_row]) ]]> </Resource> diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala index 4f556d7d36f..09c8d58e6b1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/CalcITCase.scala @@ -855,4 +855,59 @@ class CalcITCase extends StreamingTestBase { .toList assertThat(actual).isEqualTo(expected) } + + @Test + def testSelectForArrayWithNestedRows(): Unit = { + val result = tEnv + .executeSql(s""" + |SELECT * FROM + |(VALUES ( + | ARRAY[ + | ROW( + | ROW('Test1', TRUE) + | ), + | ROW( + | ROW('Test2', FALSE) + | ) + | ] + |)) + |""".stripMargin) + .collect() + .toList + + val field = result.head.getField(0) + val arr = field.asInstanceOf[Array[Row]] + assertThat(arr.apply(0).getArity).isEqualTo(1) + assertThat(arr.apply(0).getField(0).asInstanceOf[Row].getArity).isEqualTo(2) + assertThat(arr.apply(0).getField(0)).isEqualTo(row("Test1", true)) + assertThat(arr.apply(1).getField(0)).isEqualTo(row("Test2", false)) + } + + @Test + def testSelectForArrayWithMaps(): Unit = { + val result = tEnv + .executeSql(s""" + |SELECT * FROM + |(VALUES ( + | ARRAY[ + | MAP['nested1', + | ARRAY['Test1', 'True'] + | ], + | MAP['nested2', + | ARRAY['Test2', 'False'] + | ] + | ] + |)) + |""".stripMargin) + .collect() + .toList + + val field = result.head.getField(0) + assertThat(field.isInstanceOf[Array[util.Map[_, _]]]).isTrue + val arr = field.asInstanceOf[Array[util.Map[_, _]]] + assertThat(arr.apply(0).get("nested1").asInstanceOf[Array[String]]) + .isEqualTo(Array("Test1", "True")) + assertThat(arr.apply(1).get("nested2").asInstanceOf[Array[String]]) + .isEqualTo(Array("Test2", "False")) + } }
