Github user Xpray commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4658#discussion_r137746521
  
    --- Diff: 
flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/type/SqlTypeFactoryImpl.java
 ---
    @@ -0,0 +1,575 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.calcite.sql.type;
    +
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeFactoryImpl;
    +import org.apache.calcite.rel.type.RelDataTypeFamily;
    +import org.apache.calcite.rel.type.RelDataTypeSystem;
    +import org.apache.calcite.sql.SqlCollation;
    +import org.apache.calcite.sql.SqlIntervalQualifier;
    +import org.apache.calcite.util.Glossary;
    +import org.apache.calcite.util.Util;
    +import org.apache.flink.table.plan.schema.GenericRelDataType;
    +
    +import java.nio.charset.Charset;
    +import java.util.List;
    +
    +/**
    + * SqlTypeFactoryImpl provides a default implementation of
    + * {@link RelDataTypeFactory} which supports SQL types.
    + */
    +public class SqlTypeFactoryImpl extends RelDataTypeFactoryImpl {
    +   //~ Constructors 
-----------------------------------------------------------
    +
    +   public SqlTypeFactoryImpl(RelDataTypeSystem typeSystem) { // builds the factory by handing the type system to the RelDataTypeFactoryImpl constructor
    +           super(typeSystem);
    +   }
    +
    +   //~ Methods 
----------------------------------------------------------------
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createSqlType(SqlTypeName typeName) { // creates a SQL type from a type name alone
    +           if (typeName.allowsPrec()) { // precision-capable names are delegated to the (typeName, precision) overload with the type system's default precision
    +                   return createSqlType(typeName, 
typeSystem.getDefaultPrecision(typeName));
    +           }
    +           assertBasic(typeName); // asserts the name is non-null and is neither MULTISET nor an interval type
    +           RelDataType newType = new BasicSqlType(typeSystem, typeName);
    +           return canonize(newType); // canonize is defined outside this excerpt; presumably returns the canonical instance of the type -- confirm
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createSqlType( // creates a SQL type from a type name and a precision
    +           SqlTypeName typeName,
    +           int precision) {
    +           final int maxPrecision = typeSystem.getMaxPrecision(typeName);
    +           if (maxPrecision >= 0 && precision > maxPrecision) { // clamp to the type system's maximum; a negative maximum skips the clamp
    +                   precision = maxPrecision;
    +           }
    +           if (typeName.allowsScale()) { // scale-capable names are delegated to the three-argument overload with the name's default scale
    +                   return createSqlType(typeName, precision, 
typeName.getDefaultScale());
    +           }
    +           assertBasic(typeName);
    +           assert (precision >= 0) // checked after clamping: precision must be non-negative or the PRECISION_NOT_SPECIFIED marker
    +                   || (precision == RelDataType.PRECISION_NOT_SPECIFIED);
    +           RelDataType newType = new BasicSqlType(typeSystem, typeName, 
precision);
    +           newType = SqlTypeUtil.addCharsetAndCollation(newType, this); // helper not shown here; presumably only alters character types -- confirm
    +           return canonize(newType);
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createSqlType( // creates a SQL type from a type name, a precision and a scale
    +           SqlTypeName typeName,
    +           int precision,
    +           int scale) {
    +           assertBasic(typeName);
    +           assert (precision >= 0) // unlike the two-argument overload, this check runs on the caller's value, before clamping
    +                   || (precision == RelDataType.PRECISION_NOT_SPECIFIED);
    +           final int maxPrecision = typeSystem.getMaxPrecision(typeName);
    +           if (maxPrecision >= 0 && precision > maxPrecision) { // clamp to the type system's maximum; a negative maximum skips the clamp
    +                   precision = maxPrecision;
    +           }
    +           RelDataType newType =
    +                   new BasicSqlType(typeSystem, typeName, precision, 
scale);
    +           newType = SqlTypeUtil.addCharsetAndCollation(newType, this); // helper not shown here; presumably only alters character types -- confirm
    +           return canonize(newType);
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createMultisetType( // creates a multiset type over the given element type
    +           RelDataType type,
    +           long maxCardinality) {
    +           assert maxCardinality == -1; // -1 is the only accepted value; the argument is not used otherwise
    +           RelDataType newType = new MultisetSqlType(type, false); // second argument is presumably the nullable flag -- confirm against MultisetSqlType
    +           return canonize(newType);
    +   }
    +
    +   public RelDataType createArrayType( // creates an array type over the given element type
    +           RelDataType elementType,
    +           long maxCardinality) {
    +           assert maxCardinality == -1; // -1 is the only accepted value; the argument is not used otherwise
    +           ArraySqlType newType = new ArraySqlType(elementType, false); // second argument is presumably the nullable flag -- confirm against ArraySqlType
    +           return canonize(newType);
    +   }
    +
    +   public RelDataType createMapType( // creates a map type from the given key and value types
    +           RelDataType keyType,
    +           RelDataType valueType) {
    +           MapSqlType newType = new MapSqlType(keyType, valueType, false); // third argument is presumably the nullable flag -- confirm against MapSqlType
    +           return canonize(newType);
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createSqlIntervalType( // creates an interval type for the given qualifier
    +           SqlIntervalQualifier intervalQualifier) {
    +           RelDataType newType = // the trailing false is presumably the nullable flag -- confirm against IntervalSqlType
    +                   new IntervalSqlType(typeSystem, intervalQualifier, 
false);
    +           return canonize(newType);
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createTypeWithCharsetAndCollation( // returns a variant of a character type carrying the given charset and collation
    +           RelDataType type,
    +           Charset charset,
    +           SqlCollation collation) {
    +           assert SqlTypeUtil.inCharFamily(type) : type; // the offending type is used as the assertion message
    +           assert charset != null;
    +           assert collation != null;
    +           RelDataType newType;
    +           if (type instanceof BasicSqlType) { // basic SQL types produce the variant themselves
    +                   BasicSqlType sqlType = (BasicSqlType) type;
    +                   newType = 
sqlType.createWithCharsetAndCollation(charset, collation);
    +           } else if (type instanceof JavaType) { // Java types are rebuilt, keeping the Java class and nullability
    +                   JavaType javaType = (JavaType) type;
    +                   newType =
    +                           new JavaType(
    +                                   javaType.getJavaClass(),
    +                                   javaType.isNullable(),
    +                                   charset,
    +                                   collation);
    +           } else { // any other implementation class is rejected
    +                   throw Util.needToImplement("need to implement " + type);
    +           }
    +           return canonize(newType);
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType leastRestrictive(List<RelDataType> types) { // picks a common type for a non-empty list of types
    +           assert types != null;
    +           assert types.size() >= 1;
    +
    +           RelDataType type0 = types.get(0); // only the first element decides which strategy is used
    +           if (type0.getSqlTypeName() != null) { // first type has a SQL type name: use the SQL-specific rules
    +                   RelDataType resultType = leastRestrictiveSqlType(types);
    +                   if (resultType != null) {
    +                           return resultType;
    +                   }
    +                   return leastRestrictiveByCast(types); // fallback; may itself return null
    +           }
    +
    +           return super.leastRestrictive(types); // no SQL type name on the first element: defer to the superclass
    +   }
    +
    +   private RelDataType leastRestrictiveByCast(List<RelDataType> types) { // picks a common type using castability checks; returns null if a pair is castable in neither direction
    +           RelDataType resultType = types.get(0); // running candidate, seeded with the first type
    +           boolean anyNullable = resultType.isNullable();
    +           for (int i = 1; i < types.size(); i++) {
    +                   RelDataType type = types.get(i);
    +                   if (type.getSqlTypeName() == SqlTypeName.NULL) { // a NULL-typed entry forces a nullable result and is never a candidate
    +                           anyNullable = true;
    +                           continue;
    +                   }
    +
    +                   if (type.isNullable()) {
    +                           anyNullable = true;
    +                   }
    +
    +                   if (SqlTypeUtil.canCastFrom(type, resultType, false)) { // presumably canCastFrom(to, from, coerce) -- confirm; on success the current type becomes the candidate
    +                           resultType = type;
    +                   } else {
    +                           if (!SqlTypeUtil.canCastFrom(resultType, type, 
false)) {
    +                                   return null; // castable in neither direction: no common type
    +                           }
    +                   }
    +           }
    +           if (anyNullable) { // result is made nullable if the first type or any later entry was nullable or NULL-typed
    +                   return createTypeWithNullability(resultType, true);
    +           } else {
    +                   return resultType;
    +           }
    +   }
    +
    +   // implement RelDataTypeFactory
    +   public RelDataType createTypeWithNullability( // returns a variant of the type with the requested nullability, dispatching on the implementation class
    +           final RelDataType type,
    +           final boolean nullable) {
    +           RelDataType newType;
    +           if (type instanceof BasicSqlType) { // basic SQL types produce the variant themselves
    +                   BasicSqlType sqlType = (BasicSqlType) type;
    +                   newType = sqlType.createWithNullability(nullable);
    +           } else if (type instanceof MapSqlType) { // the copy* helpers below are defined outside this excerpt
    +                   newType = copyMapType(type, nullable);
    +           } else if (type instanceof ArraySqlType) {
    +                   newType = copyArrayType(type, nullable);
    +           } else if (type instanceof MultisetSqlType) {
    +                   newType = copyMultisetType(type, nullable);
    +           } else if (type instanceof IntervalSqlType) {
    +                   newType = copyIntervalType(type, nullable);
    +           } else if (type instanceof ObjectSqlType) {
    +                   newType = copyObjectType(type, nullable);
    +           } else { // any other class: the superclass result is returned directly, without the canonize call below
    +                   return super.createTypeWithNullability(type, nullable);
    +           }
    +           return canonize(newType);
    +   }
    +
    +   private void assertBasic(SqlTypeName typeName) { // guards the createSqlType overloads using Java assert statements only
    +           assert typeName != null;
    +           assert typeName != SqlTypeName.MULTISET // multisets have their own factory method
    +                   : "use createMultisetType() instead";
    +           assert !SqlTypeName.INTERVAL_TYPES.contains(typeName) // intervals have their own factory method
    +                   : "use createSqlIntervalType() instead";
    +   }
    +
    +   private RelDataType leastRestrictiveSqlType(List<RelDataType> types) {
    +           RelDataType resultType = null;
    +           int nullCount = 0;
    +           int nullableCount = 0;
    +           int javaCount = 0;
    +           int anyCount = 0;
    +
    +           for (RelDataType type : types) {
    +                   final SqlTypeName typeName = type.getSqlTypeName();
    +                   if (typeName == null) {
    +                           return null;
    +                   }
    +                   if (typeName == SqlTypeName.ANY) {
    +                           anyCount++;
    +                   }
    +                   if (type.isNullable()) {
    +                           ++nullableCount;
    +                   }
    +                   if (typeName == SqlTypeName.NULL) {
    +                           ++nullCount;
    +                   }
    +                   if (isJavaType(type)) {
    +                           ++javaCount;
    +                   }
    +           }
    +
    +           //  if any of the inputs are ANY, the output is ANY
    +           if (anyCount > 0) {
    +                   // for flink, it should return GenericRelDataType
    +                   if (types.get(0) instanceof GenericRelDataType &&
    --- End diff --
    
    I modified this part to support Flink's ANY type.


---

Reply via email to