Github user Xpray commented on a diff in the pull request: https://github.com/apache/flink/pull/4658#discussion_r137746521 --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java --- @@ -0,0 +1,575 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.sql.type; + +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl; +import org.apache.calcite.rel.type.RelDataTypeFamily; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.SqlCollation; +import org.apache.calcite.sql.SqlIntervalQualifier; +import org.apache.calcite.util.Glossary; +import org.apache.calcite.util.Util; +import org.apache.flink.table.plan.schema.GenericRelDataType; + +import java.nio.charset.Charset; +import java.util.List; + +/** + * SqlTypeFactoryImpl provides a default implementation of + * {@link RelDataTypeFactory} which supports SQL types. + */ +public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl { + //~ Constructors ----------------------------------------------------------- + + public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) { + super(typeSystem); + } + + //~ Methods ---------------------------------------------------------------- + + // implement RelDataTypeFactory + public RelDataType createSqlType(SqlTypeName typeName) { + if (typeName.allowsPrec()) { + return createSqlType(typeName, typeSystem.getDefaultPrecision(typeName)); + } + assertBasic(typeName); + RelDataType newType = new BasicSqlType(typeSystem, typeName); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createSqlType( + SqlTypeName typeName, + int precision) { + final int maxPrecision = typeSystem.getMaxPrecision(typeName); + if (maxPrecision >= 0 && precision > maxPrecision) { + precision = maxPrecision; + } + if (typeName.allowsScale()) { + return createSqlType(typeName, precision, typeName.getDefaultScale()); + } + assertBasic(typeName); + assert (precision >= 0) + || (precision == RelDataType.PRECISION_NOT_SPECIFIED); + RelDataType newType = new BasicSqlType(typeSystem, typeName, precision); + newType = SqlTypeUtil.addCharsetAndCollation(newType, this); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createSqlType( + SqlTypeName typeName, + int precision, + int scale) { + assertBasic(typeName); + assert (precision >= 0) + || (precision == RelDataType.PRECISION_NOT_SPECIFIED); + final int maxPrecision = typeSystem.getMaxPrecision(typeName); + if (maxPrecision >= 0 && precision > maxPrecision) { + precision = maxPrecision; + } + RelDataType newType = + new BasicSqlType(typeSystem, typeName, precision, scale); + newType = SqlTypeUtil.addCharsetAndCollation(newType, this); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createMultisetType( + RelDataType type, + long maxCardinality) { + assert maxCardinality == -1; + RelDataType newType = new MultisetSqlType(type, false); + return canonize(newType); + } + + public RelDataType createArrayType( + RelDataType elementType, + long maxCardinality) { + assert maxCardinality == -1; + ArraySqlType newType = new ArraySqlType(elementType, false); + return canonize(newType); + } + + public RelDataType createMapType( + RelDataType keyType, + RelDataType valueType) { + MapSqlType newType = new MapSqlType(keyType, valueType, false); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createSqlIntervalType( + SqlIntervalQualifier intervalQualifier) { + RelDataType newType = + new IntervalSqlType(typeSystem, intervalQualifier, false); + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType createTypeWithCharsetAndCollation( + RelDataType type, + Charset charset, + SqlCollation collation) { + assert SqlTypeUtil.inCharFamily(type) : type; + assert charset != null; + assert collation != null; + RelDataType newType; + if (type instanceof BasicSqlType) { + BasicSqlType sqlType = (BasicSqlType) type; + newType = sqlType.createWithCharsetAndCollation(charset, collation); + } else if (type instanceof JavaType) { + JavaType javaType = (JavaType) type; + newType = + new JavaType( + javaType.getJavaClass(), + javaType.isNullable(), + charset, + collation); + } else { + throw Util.needToImplement("need to implement " + type); + } + return canonize(newType); + } + + // implement RelDataTypeFactory + public RelDataType leastRestrictive(List<RelDataType> types) { + assert types != null; + assert types.size() >= 1; + + RelDataType type0 = types.get(0); + if (type0.getSqlTypeName() != null) { + RelDataType resultType = leastRestrictiveSqlType(types); + if (resultType != null) { + return resultType; + } + return leastRestrictiveByCast(types); + } + + return super.leastRestrictive(types); + } + + private RelDataType leastRestrictiveByCast(List<RelDataType> types) { + RelDataType resultType = types.get(0); + boolean anyNullable = resultType.isNullable(); + for (int i = 1; i < types.size(); i++) { + RelDataType type = types.get(i); + if (type.getSqlTypeName() == SqlTypeName.NULL) { + anyNullable = true; + continue; + } + + if (type.isNullable()) { + anyNullable = true; + } + + if (SqlTypeUtil.canCastFrom(type, resultType, false)) { + resultType = type; + } else { + if (!SqlTypeUtil.canCastFrom(resultType, type, false)) { + return null; + } + } + } + if (anyNullable) { + return createTypeWithNullability(resultType, true); + } else { + return resultType; + } + } + + // implement RelDataTypeFactory + public RelDataType createTypeWithNullability( + final RelDataType type, + final boolean nullable) { + RelDataType newType; + if (type instanceof BasicSqlType) { + BasicSqlType sqlType = (BasicSqlType) type; + newType = sqlType.createWithNullability(nullable); + } else if (type instanceof MapSqlType) { + newType = copyMapType(type, nullable); + } else if (type instanceof ArraySqlType) { + newType = copyArrayType(type, nullable); + } else if (type instanceof MultisetSqlType) { + newType = copyMultisetType(type, nullable); + } else if (type instanceof IntervalSqlType) { + newType = copyIntervalType(type, nullable); + } else if (type instanceof ObjectSqlType) { + newType = copyObjectType(type, nullable); + } else { + return super.createTypeWithNullability(type, nullable); + } + return canonize(newType); + } + + private void assertBasic(SqlTypeName typeName) { + assert typeName != null; + assert typeName != SqlTypeName.MULTISET + : "use createMultisetType() instead"; + assert !SqlTypeName.INTERVAL_TYPES.contains(typeName) + : "use createSqlIntervalType() instead"; + } + + private RelDataType leastRestrictiveSqlType(List<RelDataType> types) { + RelDataType resultType = null; + int nullCount = 0; + int nullableCount = 0; + int javaCount = 0; + int anyCount = 0; + + for (RelDataType type : types) { + final SqlTypeName typeName = type.getSqlTypeName(); + if (typeName == null) { + return null; + } + if (typeName == SqlTypeName.ANY) { + anyCount++; + } + if (type.isNullable()) { + ++nullableCount; + } + if (typeName == SqlTypeName.NULL) { + ++nullCount; + } + if (isJavaType(type)) { + ++javaCount; + } + } + + // if any of the inputs are ANY, the output is ANY + if (anyCount > 0) { + // for flink, it should return GenericRelDataType + if (types.get(0) instanceof GenericRelDataType && --- End diff -- I modified here to support FLink' ANY type
---