This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1ee24d6 [FLINK-12834][table-planner-blink] Support CharType and BinaryType 1ee24d6 is described below commit 1ee24d6c626e8d361354721ad3de41d18e33bb70 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Thu Jun 13 13:34:14 2019 +0800 [FLINK-12834][table-planner-blink] Support CharType and BinaryType This closes #8730 --- .../flink/table/expressions/RexNodeConverter.java | 6 +- .../aggfunctions/RankLikeAggFunctionBase.java | 1 + .../table/functions/sql/FlinkSqlOperatorTable.java | 21 +- .../flink/table/calcite/FlinkTypeFactory.scala | 37 +++- .../apache/flink/table/codegen/CodeGenUtils.scala | 22 +- .../flink/table/codegen/ExprCodeGenerator.scala | 5 +- .../apache/flink/table/codegen/GenerateUtils.scala | 10 +- .../table/codegen/agg/batch/AggCodeGenHelper.scala | 2 +- .../table/codegen/calls/FunctionGenerator.scala | 239 +++------------------ .../flink/table/codegen/calls/PrintCallGen.scala | 5 +- .../table/codegen/calls/ScalarOperatorGens.scala | 54 ++--- ...naryStringCallGen.scala => StringCallGen.scala} | 120 ++++++++++- .../table/codegen/sort/SortCodeGenerator.scala | 8 +- .../flink/table/plan/util/AggFunctionFactory.scala | 4 +- .../flink/table/plan/util/AggregateUtil.scala | 2 +- .../flink/table/typeutils/TypeCheckUtils.scala | 100 --------- .../flink/table/typeutils/TypeCoercion.scala | 24 ++- .../flink/table/dataformat/BinaryWriter.java | 2 + .../table/dataformat/DataFormatConverters.java | 2 + .../flink/table/dataformat/TypeGetterSetters.java | 2 + .../dataformat/vector/heap/AbstractHeapVector.java | 2 + .../table/types/ClassLogicalTypeConverter.java | 4 + .../flink/table/types/InternalSerializers.java | 2 + .../apache/flink/table/types/PlannerTypeUtils.java | 15 +- .../table/types/TypeInfoDataTypeConverter.java | 2 + .../flink/table/typeutils/TypeCheckUtils.java | 162 ++++++++++++++ 26 files changed, 436 insertions(+), 417 deletions(-) diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java index d7f42e0..6fc1ba8 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java @@ -61,9 +61,9 @@ import java.util.stream.Collectors; import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR; import static org.apache.flink.table.calcite.FlinkTypeFactory.toLogicalType; import static org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType; +import static org.apache.flink.table.typeutils.TypeCheckUtils.isCharacterString; import static org.apache.flink.table.typeutils.TypeCheckUtils.isTemporal; import static org.apache.flink.table.typeutils.TypeCheckUtils.isTimeInterval; -import static org.apache.flink.table.typeutils.TypeCheckUtils.isVarchar; /** * Visit expression to generator {@link RexNode}. 
@@ -129,12 +129,12 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> { } else if (BuiltInFunctionDefinitions.IS_NULL.equals(def)) { return relBuilder.isNull(child.get(0)); } else if (BuiltInFunctionDefinitions.PLUS.equals(def)) { - if (isVarchar(toLogicalType(child.get(0).getType()))) { + if (isCharacterString(toLogicalType(child.get(0).getType()))) { return relBuilder.call( FlinkSqlOperatorTable.CONCAT, child.get(0), relBuilder.cast(child.get(1), VARCHAR)); - } else if (isVarchar(toLogicalType(child.get(1).getType()))) { + } else if (isCharacterString(toLogicalType(child.get(1).getType()))) { return relBuilder.call( FlinkSqlOperatorTable.CONCAT, relBuilder.cast(child.get(0), VARCHAR), diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java index 053f75e..20541a3 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/aggfunctions/RankLikeAggFunctionBase.java @@ -115,6 +115,7 @@ public abstract class RankLikeAggFunctionBase extends DeclarativeAggregateFuncti return literal(0.0d); case DECIMAL: return literal(java.math.BigDecimal.ZERO); + case CHAR: case VARCHAR: return literal(""); case DATE: diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java index 3a4cf1e..f6c8d6a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java +++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/functions/sql/FlinkSqlOperatorTable.java @@ -433,7 +433,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction MD5 = new SqlFunction( "MD5", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -443,7 +443,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction SHA1 = new SqlFunction( "SHA1", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -453,7 +453,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction SHA224 = new SqlFunction( "SHA224", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -463,7 +463,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction SHA256 = new SqlFunction( "SHA256", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -473,7 +473,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction SHA384 = new SqlFunction( "SHA384", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -483,7 +483,7 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction SHA512 = new SqlFunction( "SHA512", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -493,7 +493,7 @@ public class FlinkSqlOperatorTable 
extends ReflectiveSqlOperatorTable { public static final SqlFunction SHA2 = new SqlFunction( "SHA2", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + VARCHAR_2000_NULLABLE, null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.INTEGER), @@ -839,11 +839,11 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { SqlTypeFamily.INTEGER)), SqlFunctionCategory.NUMERIC); - public static final SqlFunction LTRIM = new SqlFunction( "LTRIM", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NULLABLE, + SqlTypeTransforms.TO_VARYING), null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), @@ -853,7 +853,8 @@ public class FlinkSqlOperatorTable extends ReflectiveSqlOperatorTable { public static final SqlFunction RTRIM = new SqlFunction( "RTRIM", SqlKind.OTHER_FUNCTION, - ARG0_VARCHAR_FORCE_NULLABLE, + ReturnTypes.cascade(ReturnTypes.ARG0, SqlTypeTransforms.TO_NULLABLE, + SqlTypeTransforms.TO_VARYING), null, OperandTypes.or( OperandTypes.family(SqlTypeFamily.STRING), diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 9fdefe0..4f6f831 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -68,8 +68,8 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp case LogicalTypeRoot.BIGINT => createSqlType(BIGINT) case LogicalTypeRoot.FLOAT => createSqlType(FLOAT) case LogicalTypeRoot.DOUBLE => createSqlType(DOUBLE) - case LogicalTypeRoot.VARCHAR => - createSqlType(VARCHAR, t.asInstanceOf[VarCharType].getLength) + case LogicalTypeRoot.VARCHAR => createSqlType(VARCHAR, 
t.asInstanceOf[VarCharType].getLength) + case LogicalTypeRoot.CHAR => createSqlType(CHAR, t.asInstanceOf[CharType].getLength) // temporal types case LogicalTypeRoot.DATE => createSqlType(DATE) @@ -83,6 +83,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp createSqlIntervalType( new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) + case LogicalTypeRoot.BINARY => createSqlType(BINARY, t.asInstanceOf[BinaryType].getLength) case LogicalTypeRoot.VARBINARY => createSqlType(VARBINARY, t.asInstanceOf[VarBinaryType].getLength) @@ -379,14 +380,30 @@ object FlinkTypeFactory { case BIGINT => new BigIntType() case FLOAT => new FloatType() case DOUBLE => new DoubleType() - case VARCHAR | CHAR => - // TODO we use VarCharType to support sql CHAR, VarCharType don't support 0 length - new VarCharType( - if (relDataType.getPrecision == 0) VarCharType.MAX_LENGTH else relDataType.getPrecision) - case VARBINARY | BINARY => - // TODO we use VarBinaryType to support sql BINARY, VarBinaryType don't support 0 length - new VarBinaryType( - if (relDataType.getPrecision == 0) VarBinaryType.MAX_LENGTH else relDataType.getPrecision) + case CHAR => + if (relDataType.getPrecision == 0) { + CharType.ofEmptyLiteral + } else { + new CharType(relDataType.getPrecision) + } + case VARCHAR => + if (relDataType.getPrecision == 0) { + VarCharType.ofEmptyLiteral + } else { + new VarCharType(relDataType.getPrecision) + } + case BINARY => + if (relDataType.getPrecision == 0) { + BinaryType.ofEmptyLiteral + } else { + new BinaryType(relDataType.getPrecision) + } + case VARBINARY => + if (relDataType.getPrecision == 0) { + VarBinaryType.ofEmptyLiteral + } else { + new VarBinaryType(relDataType.getPrecision) + } case DECIMAL => new DecimalType(relDataType.getPrecision, relDataType.getScale) // time indicators diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala index ddac75a..b9ab71c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala @@ -146,8 +146,8 @@ object CodeGenUtils { case INTERVAL_YEAR_MONTH => className[JInt] case INTERVAL_DAY_TIME => className[JLong] - case VARCHAR => BINARY_STRING - case VARBINARY => "byte[]" + case VARCHAR | CHAR => BINARY_STRING + case VARBINARY | BINARY => "byte[]" case DECIMAL => className[Decimal] case ARRAY => className[BinaryArray] @@ -178,7 +178,7 @@ object CodeGenUtils { case FLOAT => "-1.0f" case DOUBLE => "-1.0d" case BOOLEAN => "false" - case VARCHAR => s"$BINARY_STRING.EMPTY_UTF8" + case VARCHAR | CHAR => s"$BINARY_STRING.EMPTY_UTF8" case DATE | TIME_WITHOUT_TIME_ZONE => "-1" case TIMESTAMP_WITHOUT_TIME_ZONE => "-1L" @@ -205,8 +205,8 @@ object CodeGenUtils { case BIGINT => s"${className[JLong]}.hashCode($term)" case FLOAT => s"${className[JFloat]}.hashCode($term)" case DOUBLE => s"${className[JDouble]}.hashCode($term)" - case VARCHAR => s"$term.hashCode()" - case VARBINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" + + case VARCHAR | CHAR => s"$term.hashCode()" + case VARBINARY | BINARY => s"${className[MurmurHashUtil]}.hashUnsafeBytes(" + s"$term, $BYTE_ARRAY_BASE_OFFSET, $term.length)" case DECIMAL => s"$term.hashCode()" case DATE => s"${className[JInt]}.hashCode($term)" @@ -330,8 +330,8 @@ object CodeGenUtils { throw new CodeGenException(s"Comparable type expected, but was '${genExpr.resultType}'.") } - def requireString(genExpr: GeneratedExpression): Unit = - if (!TypeCheckUtils.isVarchar(genExpr.resultType)) { + def requireCharacterString(genExpr: GeneratedExpression): Unit = + if (!TypeCheckUtils.isCharacterString(genExpr.resultType)) { throw new CodeGenException("String expression type 
expected.") } @@ -392,8 +392,8 @@ object CodeGenUtils { case BIGINT => s"$rowTerm.getLong($indexTerm)" case FLOAT => s"$rowTerm.getFloat($indexTerm)" case DOUBLE => s"$rowTerm.getDouble($indexTerm)" - case VARCHAR => s"$rowTerm.getString($indexTerm)" - case VARBINARY => s"$rowTerm.getBinary($indexTerm)" + case VARCHAR | CHAR => s"$rowTerm.getString($indexTerm)" + case VARBINARY | BINARY => s"$rowTerm.getBinary($indexTerm)" case DECIMAL => val dt = t.asInstanceOf[DecimalType] s"$rowTerm.getDecimal($indexTerm, ${dt.getPrecision}, ${dt.getScale})" @@ -616,8 +616,8 @@ object CodeGenUtils { case FLOAT => s"$writerTerm.writeFloat($indexTerm, $fieldValTerm)" case DOUBLE => s"$writerTerm.writeDouble($indexTerm, $fieldValTerm)" case BOOLEAN => s"$writerTerm.writeBoolean($indexTerm, $fieldValTerm)" - case VARBINARY => s"$writerTerm.writeBinary($indexTerm, $fieldValTerm)" - case VARCHAR => s"$writerTerm.writeString($indexTerm, $fieldValTerm)" + case VARBINARY | BINARY => s"$writerTerm.writeBinary($indexTerm, $fieldValTerm)" + case VARCHAR | CHAR => s"$writerTerm.writeString($indexTerm, $fieldValTerm)" case DECIMAL => val dt = t.asInstanceOf[DecimalType] s"$writerTerm.writeDecimal($indexTerm, $fieldValTerm, ${dt.getPrecision})" diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala index 16d0f54..d4ff105 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ExprCodeGenerator.scala @@ -27,13 +27,12 @@ import org.apache.flink.table.calcite.{FlinkTypeFactory, RexAggLocalVariable, Re import org.apache.flink.table.codegen.CodeGenUtils.{requireTemporal, requireTimeInterval, _} import org.apache.flink.table.codegen.GenerateUtils._ import 
org.apache.flink.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE} -import org.apache.flink.table.codegen.calls.{BinaryStringCallGen, FunctionGenerator, ScalarFunctionCallGen, TableFunctionCallGen} +import org.apache.flink.table.codegen.calls.{StringCallGen, FunctionGenerator, ScalarFunctionCallGen, TableFunctionCallGen} import org.apache.flink.table.codegen.calls.ScalarOperatorGens._ import org.apache.flink.table.dataformat._ import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable._ import org.apache.flink.table.functions.utils.{ScalarSqlFunction, TableSqlFunction} import org.apache.flink.table.types.PlannerTypeUtils.isInteroperable -import org.apache.flink.table.types.logical.LogicalTypeRoot.{VARBINARY, VARCHAR} import org.apache.flink.table.types.logical._ import org.apache.flink.table.typeutils.TypeCheckUtils.{isNumeric, isTemporal, isTimeInterval} import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils} @@ -733,7 +732,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean) // advanced scalar functions case sqlOperator: SqlOperator => - BinaryStringCallGen.generateCallExpression(ctx, operator, operands, resultType).getOrElse { + StringCallGen.generateCallExpression(ctx, operator, operands, resultType).getOrElse { FunctionGenerator .getCallGenerator( sqlOperator, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala index 6a77070d..4b39c05 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/GenerateUtils.scala @@ -29,7 +29,7 @@ import org.apache.flink.table.plan.util.SortUtil import org.apache.flink.table.types.PlannerTypeUtils import 
org.apache.flink.table.types.logical.LogicalTypeRoot._ import org.apache.flink.table.types.logical._ -import org.apache.flink.table.typeutils.TypeCheckUtils.{isReference, isTemporal} +import org.apache.flink.table.typeutils.TypeCheckUtils.{isCharacterString, isReference, isTemporal} import org.apache.calcite.avatica.util.ByteString import org.apache.commons.lang3.StringEscapeUtils @@ -173,7 +173,7 @@ object GenerateUtils { // TODO: should we also consider other types? val parameters = operands.map(x => - if (x.resultType.isInstanceOf[VarCharType]){ + if (isCharacterString(x.resultType)){ "( " + x.nullTerm + " ) ? null : (" + x.resultTerm + ")" } else { x.resultTerm @@ -351,12 +351,12 @@ object GenerateUtils { literalValue.asInstanceOf[JBigDecimal], precision, scale) generateNonNullLiteral(literalType, fieldTerm, value) - case VARCHAR => + case VARCHAR | CHAR => val escapedValue = StringEscapeUtils.ESCAPE_JAVA.translate(literalValue.toString) val field = ctx.addReusableStringConstants(escapedValue) generateNonNullLiteral(literalType, field, BinaryString.fromString(escapedValue)) - case VARBINARY => + case VARBINARY | BINARY => val bytesVal = literalValue.asInstanceOf[ByteString].getBytes val fieldTerm = ctx.addReusableObject( bytesVal, "binary", bytesVal.getClass.getCanonicalName) @@ -658,7 +658,7 @@ object GenerateUtils { s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? -1 : 0)" case _ if PlannerTypeUtils.isPrimitive(t) => s"($leftTerm > $rightTerm ? 1 : $leftTerm < $rightTerm ? 
-1 : 0)" - case VARBINARY => + case VARBINARY | BINARY => val sortUtil = classOf[org.apache.flink.table.runtime.sort.SortUtil].getCanonicalName s"$sortUtil.compareBinary($leftTerm, $rightTerm)" case ARRAY => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala index a58a1fe..58c5654 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/agg/batch/AggCodeGenHelper.scala @@ -396,7 +396,7 @@ object AggCodeGenHelper { aggBufferExprs.zip(initAggBufferExprs).map { case (aggBufVar, initExpr) => val resultCode = aggBufVar.resultType.getTypeRoot match { - case VARCHAR | ROW | ARRAY | MULTISET | MAP => + case VARCHAR | CHAR | ROW | ARRAY | MULTISET | MAP => val serializer = InternalSerializers.create( aggBufVar.resultType, new ExecutionConfig) val term = ctx.addReusableObject( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala index 9cc1e72..f06cbf1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala @@ -357,10 +357,8 @@ object FunctionGenerator { Seq(BIGINT), BuiltInMethods.HEX_LONG) - addSqlFunctionMethod( - HEX, - Seq(VARCHAR), - BuiltInMethods.HEX_STRING) + addSqlFunctionMethod(HEX, Seq(VARCHAR), BuiltInMethods.HEX_STRING) + addSqlFunctionMethod(HEX, Seq(CHAR), BuiltInMethods.HEX_STRING) // 
---------------------------------------------------------------------------------------------- // Temporal functions @@ -606,73 +604,6 @@ object FunctionGenerator { Seq(BIGINT, BIGINT), BuiltInMethods.BITXOR_LONG) - addSqlFunction( - PRINT, - Seq(VARCHAR, VARCHAR), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, BOOLEAN), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, INTEGER), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, SMALLINT), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, INTEGER), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, BIGINT), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, FLOAT), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, DOUBLE), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, DATE), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, TIME_WITHOUT_TIME_ZONE), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq(VARCHAR, DECIMAL), - new PrintCallGen()) - - addSqlFunction( - PRINT, - Seq( - VARCHAR, - VARBINARY), - new PrintCallGen()) - addSqlFunctionMethod( NOW, Seq(), @@ -699,16 +630,6 @@ object FunctionGenerator { TIMESTAMP_WITHOUT_TIME_ZONE), BuiltInMethods.DATEDIFF_T_T) - addSqlFunction( - IF, - Seq(BOOLEAN, VARCHAR, VARCHAR), - new IfCallGen()) - - addSqlFunction( - IF, - Seq(BOOLEAN, BOOLEAN, BOOLEAN), - new IfCallGen()) - // This sequence must be in sync with [[NumericOrDefaultReturnTypeInference]] val numericTypes = Seq( INTEGER, @@ -751,6 +672,14 @@ object FunctionGenerator { VARBINARY), new IfCallGen()) + addSqlFunction( + IF, + Seq( + BOOLEAN, + BINARY, + BINARY), + new IfCallGen()) + addSqlFunctionMethod( DIV_INT, Seq(INTEGER, INTEGER), @@ -837,6 +766,11 @@ object FunctionGenerator { addSqlFunction( HASH_CODE, + Seq(BINARY), + new HashCodeCallGen()) + + addSqlFunction( + 
HASH_CODE, Seq(DECIMAL), new HashCodeCallGen()) @@ -864,37 +798,6 @@ object FunctionGenerator { Seq(TIMESTAMP_WITHOUT_TIME_ZONE), BuiltInMethods.TIMESTAMP_TO_BIGINT) - // Date/Time & BinaryString Converting -- start - addSqlFunctionMethod( - TO_DATE, - Seq(VARCHAR), - BuiltInMethod.STRING_TO_DATE.method) - - addSqlFunctionMethod( - TO_DATE, - Seq(VARCHAR, VARCHAR), - BuiltInMethods.STRING_TO_DATE_WITH_FORMAT) - - addSqlFunctionMethod( - TO_TIMESTAMP, - Seq(VARCHAR), - BuiltInMethods.STRING_TO_TIMESTAMP) - - addSqlFunctionMethod( - TO_TIMESTAMP, - Seq(VARCHAR, VARCHAR), - BuiltInMethods.STRING_TO_TIMESTAMP_WITH_FORMAT) - - addSqlFunctionMethod( - UNIX_TIMESTAMP, - Seq(VARCHAR), - BuiltInMethods.UNIX_TIMESTAMP_STR) - - addSqlFunctionMethod( - UNIX_TIMESTAMP, - Seq(VARCHAR, VARCHAR), - BuiltInMethods.UNIX_TIMESTAMP_FORMAT) - INTEGRAL_TYPES foreach ( dt => addSqlFunctionMethod( FROM_UNIXTIME, @@ -912,108 +815,20 @@ object FunctionGenerator { Seq(DECIMAL), BuiltInMethods.FROM_UNIXTIME_AS_DECIMAL) - addSqlFunctionMethod( - FROM_UNIXTIME, - Seq(BIGINT, VARCHAR), - BuiltInMethods.FROM_UNIXTIME_FORMAT) - - addSqlFunctionMethod( - DATEDIFF, - Seq(TIMESTAMP_WITHOUT_TIME_ZONE, VARCHAR), - BuiltInMethods.DATEDIFF_T_S) - - addSqlFunctionMethod( - DATEDIFF, - Seq(VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE), - BuiltInMethods.DATEDIFF_S_T) - - addSqlFunctionMethod( - DATEDIFF, - Seq(VARCHAR, VARCHAR), - BuiltInMethods.DATEDIFF_S_S) - - addSqlFunctionMethod( - DATE_FORMAT, - Seq(TIMESTAMP_WITHOUT_TIME_ZONE, VARCHAR), - BuiltInMethods.DATE_FORMAT_LONG_STRING) - - addSqlFunctionMethod( - DATE_FORMAT, - Seq(VARCHAR, VARCHAR), - BuiltInMethods.DATE_FORMAT_STIRNG_STRING) - - addSqlFunctionMethod( - DATE_FORMAT, - Seq( - VARCHAR, - VARCHAR, - VARCHAR), - BuiltInMethods.DATE_FORMAT_STRING_STRING_STRING) - - addSqlFunctionMethod( - DATE_SUB, - Seq(VARCHAR, INTEGER), - BuiltInMethods.DATE_SUB_S) - - addSqlFunctionMethod( - DATE_SUB, - Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), - 
BuiltInMethods.DATE_SUB_T) - - addSqlFunctionMethod( - DATE_ADD, - Seq(VARCHAR, INTEGER), - BuiltInMethods.DATE_ADD_S) - - addSqlFunctionMethod( - DATE_ADD, - Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), - BuiltInMethods.DATE_ADD_T) - - addSqlFunctionMethod( - TO_TIMESTAMP_TZ, - Seq( - VARCHAR, - VARCHAR), - BuiltInMethods.STRING_TO_TIMESTAMP_TZ) + addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, VARCHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT) + addSqlFunctionMethod(FROM_UNIXTIME, Seq(BIGINT, CHAR), BuiltInMethods.FROM_UNIXTIME_FORMAT) - addSqlFunctionMethod( - TO_TIMESTAMP_TZ, - Seq( - VARCHAR, - VARCHAR, - VARCHAR), - BuiltInMethods.STRING_TO_TIMESTAMP_FORMAT_TZ) + addSqlFunctionMethod(DATE_SUB, Seq(VARCHAR, INTEGER), BuiltInMethods.DATE_SUB_S) + addSqlFunctionMethod(DATE_SUB, Seq(CHAR, INTEGER), BuiltInMethods.DATE_SUB_S) addSqlFunctionMethod( - DATE_FORMAT_TZ, - Seq(TIMESTAMP_WITHOUT_TIME_ZONE, VARCHAR), - BuiltInMethods.DATE_FORMAT_LONG_ZONE) + DATE_SUB, Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), BuiltInMethods.DATE_SUB_T) - addSqlFunctionMethod( - DATE_FORMAT_TZ, - Seq( - TIMESTAMP_WITHOUT_TIME_ZONE, - VARCHAR, - VARCHAR), - BuiltInMethods.DATE_FORMAT_LONG_STRING_ZONE) + addSqlFunctionMethod(DATE_ADD, Seq(VARCHAR, INTEGER), BuiltInMethods.DATE_ADD_S) + addSqlFunctionMethod(DATE_ADD, Seq(CHAR, INTEGER), BuiltInMethods.DATE_ADD_S) addSqlFunctionMethod( - CONVERT_TZ, - Seq( - VARCHAR, - VARCHAR, - VARCHAR), - BuiltInMethods.CONVERT_TZ) - - addSqlFunctionMethod( - CONVERT_TZ, - Seq( - VARCHAR, - VARCHAR, - VARCHAR, - VARCHAR), - BuiltInMethods.CONVERT_FORMAT_TZ) + DATE_ADD, Seq(TIMESTAMP_WITHOUT_TIME_ZONE, INTEGER), BuiltInMethods.DATE_ADD_T) // ---------------------------------------------------------------------------------------------- @@ -1071,18 +886,14 @@ object FunctionGenerator { private def addSqlFunctionMethod( sqlOperator: SqlOperator, operandTypes: Seq[LogicalTypeRoot], - method: Method) - : Unit = { + method: Method): Unit = { sqlFunctions((sqlOperator, 
operandTypes)) = new MethodCallGen(method) } private def addSqlFunction( sqlOperator: SqlOperator, operandTypes: Seq[LogicalTypeRoot], - callGenerator: CallGenerator) - : Unit = { + callGenerator: CallGenerator): Unit = { sqlFunctions((sqlOperator, operandTypes)) = callGenerator } - - } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala index 843b976..c6491a8 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/PrintCallGen.scala @@ -20,7 +20,8 @@ package org.apache.flink.table.codegen.calls import org.apache.flink.table.codegen.CodeGenUtils.{newNames, primitiveTypeTermForType} import org.apache.flink.table.codegen.{CodeGeneratorContext, GeneratedExpression} -import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot} +import org.apache.flink.table.types.logical.LogicalType +import org.apache.flink.table.typeutils.TypeCheckUtils.isBinaryString /** * Generates PRINT function call. 
@@ -39,7 +40,7 @@ class PrintCallGen extends CallGenerator { val logTerm = "logger$" ctx.addReusableLogger(logTerm, "_Print$_") - val outputCode = if (returnType.getTypeRoot == LogicalTypeRoot.VARBINARY) { + val outputCode = if (isBinaryString(returnType)) { s"new String($resultTerm, java.nio.charset.Charset.defaultCharset())" } else { s"String.valueOf(${operands(1).resultTerm})" diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala index 14fae9b..52532fd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperatorGens.scala @@ -325,7 +325,7 @@ object ScalarOperatorGens { right: GeneratedExpression) : GeneratedExpression = { val canEqual = isInteroperable(left.resultType, right.resultType) - if (left.resultType.getTypeRoot == VARCHAR && right.resultType.getTypeRoot == VARCHAR) { + if (isCharacterString(left.resultType) && isCharacterString(right.resultType)) { generateOperatorIfNotNull(ctx, new BooleanType(), left, right) { (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)" } @@ -348,14 +348,14 @@ object ScalarOperatorGens { } // support date/time/timestamp equalTo string. // for performance, we cast literal string to literal time. 
- else if (isTimePoint(left.resultType) && right.resultType.getTypeRoot == VARCHAR) { + else if (isTimePoint(left.resultType) && isCharacterString(right.resultType)) { if (right.literal) { generateEquals(ctx, left, generateCastStringLiteralToDateTime(ctx, right, left.resultType)) } else { generateEquals(ctx, left, generateCast(ctx, right, left.resultType)) } } - else if (isTimePoint(right.resultType) && left.resultType.getTypeRoot == VARCHAR) { + else if (isTimePoint(right.resultType) && isCharacterString(left.resultType)) { if (left.literal) { generateEquals( ctx, @@ -368,10 +368,10 @@ object ScalarOperatorGens { // non comparable types else { generateOperatorIfNotNull(ctx, new BooleanType(), left, right) { - if (isReference(left)) { + if (isReference(left.resultType)) { (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)" } - else if (isReference(right)) { + else if (isReference(right.resultType)) { (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)" } else { @@ -387,7 +387,7 @@ object ScalarOperatorGens { left: GeneratedExpression, right: GeneratedExpression) : GeneratedExpression = { - if (left.resultType.isInstanceOf[VarCharType] && right.resultType.isInstanceOf[VarCharType]) { + if (isCharacterString(left.resultType) && isCharacterString(right.resultType)) { generateOperatorIfNotNull(ctx, new BooleanType(), left, right) { (leftTerm, rightTerm) => s"!$leftTerm.equals($rightTerm)" } @@ -421,10 +421,10 @@ object ScalarOperatorGens { // non-comparable types else { generateOperatorIfNotNull(ctx, new BooleanType(), left, right) { - if (isReference(left)) { + if (isReference(left.resultType)) { (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))" } - else if (isReference(right)) { + else if (isReference(right.resultType)) { (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))" } else { @@ -472,7 +472,7 @@ object ScalarOperatorGens { } } // both sides are binary type - else if (isBinary(left.resultType) && + else if 
(isBinaryString(left.resultType) && isInteroperable(left.resultType, right.resultType)) { (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)" @@ -497,7 +497,7 @@ object ScalarOperatorGens { if (ctx.nullCheck) { GeneratedExpression(operand.nullTerm, NEVER_NULL, operand.code, new BooleanType()) } - else if (!ctx.nullCheck && isReference(operand)) { + else if (!ctx.nullCheck && isReference(operand.resultType)) { val resultTerm = newName("isNull") val operatorCode = s""" @@ -523,7 +523,7 @@ object ScalarOperatorGens { |""".stripMargin.trim GeneratedExpression(resultTerm, NEVER_NULL, operatorCode, new BooleanType()) } - else if (!ctx.nullCheck && isReference(operand)) { + else if (!ctx.nullCheck && isReference(operand.resultType)) { val resultTerm = newName("result") val operatorCode = s""" @@ -772,7 +772,7 @@ object ScalarOperatorGens { operand.copy(resultType = targetType) // Date/Time/Timestamp -> String - case (_, VARCHAR) if TypeCheckUtils.isTimePoint(operand.resultType) => + case (_, VARCHAR | CHAR) if TypeCheckUtils.isTimePoint(operand.resultType) => generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { operandTerm => val zoneTerm = ctx.addReusableTimeZone() @@ -780,7 +780,7 @@ object ScalarOperatorGens { } // Interval Months -> String - case (INTERVAL_YEAR_MONTH, VARCHAR) => + case (INTERVAL_YEAR_MONTH, VARCHAR | CHAR) => val method = qualifyMethod(BuiltInMethod.INTERVAL_YEAR_MONTH_TO_STRING.method) val timeUnitRange = qualifyEnum(TimeUnitRange.YEAR_TO_MONTH) generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { @@ -788,7 +788,7 @@ object ScalarOperatorGens { } // Interval Millis -> String - case (INTERVAL_DAY_TIME, VARCHAR) => + case (INTERVAL_DAY_TIME, VARCHAR | CHAR) => val method = qualifyMethod(BuiltInMethod.INTERVAL_DAY_TIME_TO_STRING.method) val timeUnitRange = qualifyEnum(TimeUnitRange.DAY_TO_SECOND) generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { @@ -796,11 +796,11 @@ object ScalarOperatorGens { } // Array -> 
String - case (ARRAY, VARCHAR) => + case (ARRAY, VARCHAR | CHAR) => generateCastArrayToString(ctx, operand, operand.resultType.asInstanceOf[ArrayType]) // Byte array -> String UTF-8 - case (VARBINARY, VARCHAR) => + case (VARBINARY, VARCHAR | CHAR) => val charset = classOf[StandardCharsets].getCanonicalName generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { terms => s"(new String(${terms.head}, $charset.UTF_8))" @@ -808,14 +808,14 @@ object ScalarOperatorGens { // Map -> String - case (MAP, VARCHAR) => + case (MAP, VARCHAR | CHAR) => generateCastMapToString(ctx, operand, operand.resultType.asInstanceOf[MapType]) // composite type -> String - case (ROW, VARCHAR) => + case (ROW, VARCHAR | CHAR) => generateCastBaseRowToString(ctx, operand, operand.resultType.asInstanceOf[RowType]) - case (ANY, VARCHAR) => + case (ANY, VARCHAR | CHAR) => generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { terms => val converter = DataFormatConverters.getConverterForDataType( @@ -826,13 +826,13 @@ object ScalarOperatorGens { // * (not Date/Time/Timestamp) -> String // TODO: GenericType with Date/Time/Timestamp -> String would call toString implicitly - case (_, VARCHAR) => + case (_, VARCHAR | CHAR) => generateStringResultCallIfArgsNotNull(ctx, Seq(operand)) { terms => s""" "" + ${terms.head}""" } // String -> Boolean - case (VARCHAR, BOOLEAN) => + case (VARCHAR | CHAR, BOOLEAN) => generateUnaryOperatorIfNotNull( ctx, targetType, @@ -842,7 +842,7 @@ object ScalarOperatorGens { } // String -> NUMERIC TYPE (not Character) - case (VARCHAR, _) + case (VARCHAR | CHAR, _) if TypeCheckUtils.isNumeric(targetType) => targetType match { case dt: DecimalType => @@ -870,7 +870,7 @@ object ScalarOperatorGens { } // String -> Date - case (VARCHAR, DATE) => + case (VARCHAR | CHAR, DATE) => generateUnaryOperatorIfNotNull( ctx, targetType, @@ -881,7 +881,7 @@ object ScalarOperatorGens { } // String -> Time - case (VARCHAR, TIME_WITHOUT_TIME_ZONE) => + case (VARCHAR | CHAR, 
TIME_WITHOUT_TIME_ZONE) => generateUnaryOperatorIfNotNull( ctx, targetType, @@ -892,7 +892,7 @@ object ScalarOperatorGens { } // String -> Timestamp - case (VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => + case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => generateUnaryOperatorIfNotNull( ctx, targetType, @@ -905,7 +905,7 @@ object ScalarOperatorGens { } // String -> binary - case (VARCHAR, VARBINARY) => + case (VARCHAR | CHAR, VARBINARY | BINARY) => generateUnaryOperatorIfNotNull(ctx, targetType, operand) { operandTerm => s"$operandTerm.getBytes()" } @@ -1147,7 +1147,7 @@ object ScalarOperatorGens { } checkArgument(operands(1).literal) - checkArgument(operands(1).resultType.isInstanceOf[VarCharType]) + checkArgument(isCharacterString(operands(1).resultType)) checkArgument(operands.head.resultType.isInstanceOf[RowType]) val fieldName = operands(1).literalValue.get.toString diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BinaryStringCallGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/StringCallGen.scala similarity index 83% rename from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BinaryStringCallGen.scala rename to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/StringCallGen.scala index 6761f3a..0d73ff1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/BinaryStringCallGen.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/calls/StringCallGen.scala @@ -18,8 +18,6 @@ package org.apache.flink.table.codegen.calls -import org.apache.flink.api.common.typeinfo.Types -import org.apache.flink.api.java.typeutils.MapTypeInfo import org.apache.flink.table.api.DataTypes import org.apache.flink.table.codegen.CodeGenUtils._ import 
org.apache.flink.table.codegen.GenerateUtils.{generateCallIfArgsNotNull, generateCallIfArgsNullable, generateStringResultCallIfArgsNotNull} @@ -29,10 +27,14 @@ import org.apache.flink.table.dataformat.DataFormatConverters import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable._ import org.apache.flink.table.runtime.functions.SqlFunctionUtils import org.apache.flink.table.types.logical.{BooleanType, IntType, LogicalType, MapType, VarBinaryType, VarCharType} +import org.apache.flink.table.typeutils.TypeCheckUtils.{isCharacterString, isTimestamp} import org.apache.calcite.runtime.SqlFunctions import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING} +import org.apache.calcite.util.BuiltInMethod + +import java.lang.reflect.Method /** * Code generator for call with string parameters or return value. @@ -42,13 +44,18 @@ import org.apache.calcite.sql.fun.SqlTrimFunction.Flag.{BOTH, LEADING, TRAILING} * <p>TODO Need to rewrite most of the methods here, calculated directly on the BinaryString * instead of convert BinaryString to String. 
*/ -object BinaryStringCallGen { +object StringCallGen { def generateCallExpression( ctx: CodeGeneratorContext, operator: SqlOperator, operands: Seq[GeneratedExpression], returnType: LogicalType): Option[GeneratedExpression] = { + + def methodGen(method: Method): GeneratedExpression = { + new MethodCallGen(method).generate(ctx, operands, returnType) + } + val generator = operator match { case LIKE => @@ -105,7 +112,7 @@ object BinaryStringCallGen { case KEYVALUE => generateKeyValue(ctx, operands) - case HASH_CODE if operands.head.resultType.isInstanceOf[VarCharType] => + case HASH_CODE if isCharacterString(operands.head.resultType) => generateHashCode(ctx, operands) case MD5 => generateMd5(ctx, operands) @@ -137,11 +144,11 @@ object BinaryStringCallGen { case BIN => generateBin(ctx, operands) case CONCAT_FUNCTION => - operands.foreach(requireString) + operands.foreach(requireCharacterString) generateConcat(ctx, operands) case CONCAT_WS => - operands.foreach(requireString) + operands.foreach(requireCharacterString) generateConcatWs(ctx, operands) case STR_TO_MAP => generateStrToMap(ctx, operands) @@ -155,7 +162,7 @@ object BinaryStringCallGen { case CONCAT => val left = operands.head val right = operands(1) - requireString(left) + requireCharacterString(left) generateArithmeticConcat(ctx, left, right) case UUID => generateUuid(ctx, operands) @@ -168,6 +175,101 @@ object BinaryStringCallGen { case INSTR => generateInstr(ctx, operands) + case PRINT => new PrintCallGen().generate(ctx, operands, returnType) + + case IF => + requireBoolean(operands.head) + new IfCallGen().generate(ctx, operands, returnType) + + // Date/Time & BinaryString Converting -- start + + case TO_DATE if operands.size == 1 && isCharacterString(operands.head.resultType) => + methodGen(BuiltInMethod.STRING_TO_DATE.method) + + case TO_DATE if operands.size == 2 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + 
methodGen(BuiltInMethods.STRING_TO_DATE_WITH_FORMAT) + + case TO_TIMESTAMP if operands.size == 1 && isCharacterString(operands.head.resultType) => + methodGen(BuiltInMethods.STRING_TO_TIMESTAMP) + + case TO_TIMESTAMP if operands.size == 2 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_WITH_FORMAT) + + case UNIX_TIMESTAMP if operands.size == 1 && isCharacterString(operands.head.resultType) => + methodGen(BuiltInMethods.UNIX_TIMESTAMP_STR) + + case UNIX_TIMESTAMP if operands.size == 2 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.UNIX_TIMESTAMP_FORMAT) + + case DATEDIFF if isTimestamp(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.DATEDIFF_T_S) + + case DATEDIFF if isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.DATEDIFF_S_S) + + case DATEDIFF if isCharacterString(operands.head.resultType) && + isTimestamp(operands(1).resultType) => + methodGen(BuiltInMethods.DATEDIFF_S_T) + + case DATE_FORMAT if operands.size == 2 && + isTimestamp(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.DATE_FORMAT_LONG_STRING) + + case DATE_FORMAT if operands.size == 2 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.DATE_FORMAT_STIRNG_STRING) + + case DATE_FORMAT if operands.size == 3 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) && + isCharacterString(operands(2).resultType) => + methodGen(BuiltInMethods.DATE_FORMAT_STRING_STRING_STRING) + + case TO_TIMESTAMP_TZ if operands.size == 2 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + 
methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_TZ) + + case TO_TIMESTAMP_TZ if operands.size == 3 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) && + isCharacterString(operands(2).resultType) => + methodGen(BuiltInMethods.STRING_TO_TIMESTAMP_FORMAT_TZ) + + case DATE_FORMAT_TZ if operands.size == 2 && + isTimestamp(operands.head.resultType) && + isCharacterString(operands(1).resultType) => + methodGen(BuiltInMethods.DATE_FORMAT_LONG_ZONE) + + case DATE_FORMAT_TZ if operands.size == 3 && + isTimestamp(operands.head.resultType) && + isCharacterString(operands(1).resultType) && + isCharacterString(operands(2).resultType) => + methodGen(BuiltInMethods.DATE_FORMAT_LONG_STRING_ZONE) + + case CONVERT_TZ if operands.size == 3 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) && + isCharacterString(operands(2).resultType) => + methodGen(BuiltInMethods.CONVERT_TZ) + + case CONVERT_TZ if operands.size == 4 && + isCharacterString(operands.head.resultType) && + isCharacterString(operands(1).resultType) && + isCharacterString(operands(2).resultType) && + isCharacterString(operands(3).resultType) => + methodGen(BuiltInMethods.CONVERT_FORMAT_TZ) + case _ => null } @@ -176,7 +278,7 @@ object BinaryStringCallGen { private def toStringTerms(terms: Seq[String], operands: Seq[GeneratedExpression]) = { terms.zipWithIndex.map { case (term, index) => - if (operands(index).resultType.isInstanceOf[VarCharType]) { + if (isCharacterString(operands(index).resultType)) { s"$term.toString()" } else { term @@ -186,7 +288,7 @@ object BinaryStringCallGen { private def safeToStringTerms(terms: Seq[String], operands: Seq[GeneratedExpression]) = { terms.zipWithIndex.map { case (term, index) => - if (operands(index).resultType.isInstanceOf[VarCharType]) { + if (isCharacterString(operands(index).resultType)) { s"$BINARY_STRING.safeToString($term)" } else { term diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala index e456171..99c52f1 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/sort/SortCodeGenerator.scala @@ -410,8 +410,8 @@ class SortCodeGenerator( case FLOAT => "Float" case DOUBLE => "Double" case BOOLEAN => "Boolean" - case VARCHAR => "String" - case VARBINARY => "Binary" + case VARCHAR | CHAR => "String" + case VARBINARY | BINARY => "Binary" case DECIMAL => "Decimal" case DATE => "Int" case TIME_WITHOUT_TIME_ZONE => "Int" @@ -436,7 +436,7 @@ class SortCodeGenerator( def supportNormalizedKey(t: LogicalType): Boolean = { t.getTypeRoot match { case _ if PlannerTypeUtils.isPrimitive(t) => true - case VARCHAR | VARBINARY | + case VARCHAR | CHAR | VARBINARY | BINARY | DATE | TIME_WITHOUT_TIME_ZONE | TIMESTAMP_WITHOUT_TIME_ZONE => true case DECIMAL => Decimal.isCompact(t.asInstanceOf[DecimalType].getPrecision) case _ => false @@ -458,7 +458,7 @@ class SortCodeGenerator( case DATE => 4 case TIME_WITHOUT_TIME_ZONE => 4 case DECIMAL if Decimal.isCompact(t.asInstanceOf[DecimalType].getPrecision) => 8 - case VARCHAR | VARBINARY => Int.MaxValue + case VARCHAR | CHAR | VARBINARY | BINARY => Int.MaxValue } } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala index 84b84c2..611d59c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala +++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggFunctionFactory.scala @@ -284,7 +284,7 @@ class AggFunctionFactory( new DoubleMinWithRetractAggFunction case BOOLEAN => new BooleanMinWithRetractAggFunction - case VARCHAR => + case VARCHAR | CHAR => new StringMinWithRetractAggFunction case DECIMAL => val d = argTypes(0).asInstanceOf[DecimalType] @@ -315,7 +315,7 @@ class AggFunctionFactory( new MinAggFunction.DoubleMinAggFunction case BOOLEAN => new MinAggFunction.BooleanMinAggFunction - case VARCHAR => + case VARCHAR | CHAR => new MinAggFunction.StringMinAggFunction case DATE => new MinAggFunction.DateMinAggFunction diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala index a80c0a7..c325eba 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/AggregateUtil.scala @@ -501,7 +501,7 @@ object AggregateUtil extends Enumeration { case INTERVAL_YEAR_MONTH => DataTypes.INT case INTERVAL_DAY_TIME => DataTypes.BIGINT - case VARCHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE) + case VARCHAR | CHAR => fromLegacyInfoToDataType(BinaryStringTypeInfo.INSTANCE) case DECIMAL => val dt = argTypes(0).asInstanceOf[DecimalType] DataTypes.DECIMAL(dt.getPrecision, dt.getScale) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala deleted file mode 100644 index e5ff5bb..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed 
to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.typeutils - -import org.apache.flink.table.codegen.GeneratedExpression -import org.apache.flink.table.types.logical.LogicalTypeRoot._ -import org.apache.flink.table.types.logical._ - -object TypeCheckUtils { - - def isNumeric(dataType: LogicalType): Boolean = - dataType.getTypeRoot.getFamilies.contains(LogicalTypeFamily.NUMERIC) - - def isTemporal(dataType: LogicalType): Boolean = - isTimePoint(dataType) || isTimeInterval(dataType) - - def isTimePoint(dataType: LogicalType): Boolean = dataType.getTypeRoot match { - case TIME_WITHOUT_TIME_ZONE | DATE | TIMESTAMP_WITHOUT_TIME_ZONE => true - case _ => false - } - - def isRowTime(dataType: LogicalType): Boolean = - dataType match { - case t: TimestampType => t.getKind == TimestampKind.ROWTIME - case _ => false - } - - def isProcTime(dataType: LogicalType): Boolean = - dataType match { - case t: TimestampType => t.getKind == TimestampKind.PROCTIME - case _ => false - } - - def isTimeInterval(dataType: LogicalType): Boolean = dataType.getTypeRoot match { - case INTERVAL_YEAR_MONTH | INTERVAL_DAY_TIME => true - case _ => false - } - - def isVarchar(dataType: LogicalType): Boolean = dataType.getTypeRoot == 
VARCHAR - - def isBinary(dataType: LogicalType): Boolean = dataType.getTypeRoot == BINARY - - def isBoolean(dataType: LogicalType): Boolean = dataType.getTypeRoot == BOOLEAN - - def isDecimal(dataType: LogicalType): Boolean = dataType.getTypeRoot == DECIMAL - - def isInteger(dataType: LogicalType): Boolean = dataType.getTypeRoot == INTEGER - - def isLong(dataType: LogicalType): Boolean = dataType.getTypeRoot == BIGINT - - def isArray(dataType: LogicalType): Boolean = dataType.getTypeRoot == ARRAY - - def isMap(dataType: LogicalType): Boolean = dataType.getTypeRoot == MAP - - def isAny(dataType: LogicalType): Boolean = dataType.getTypeRoot == ANY - - def isRow(dataType: LogicalType): Boolean = dataType.getTypeRoot == ROW - - def isComparable(dataType: LogicalType): Boolean = - !isAny(dataType) && !isMap(dataType) && !isRow(dataType) && !isArray(dataType) - - def isMutable(dataType: LogicalType): Boolean = dataType.getTypeRoot match { - // the internal representation of String is BinaryString which is mutable - case VARCHAR => true - case ARRAY | MULTISET | MAP | ROW | ANY => true - case _ => false - } - - def isReference(t: LogicalType): Boolean = t.getTypeRoot match { - case INTEGER | - TINYINT | - SMALLINT | - BIGINT | - FLOAT | - DOUBLE | - BOOLEAN | - DATE | - TIME_WITHOUT_TIME_ZONE | - TIMESTAMP_WITHOUT_TIME_ZONE => false - case _ => true - } - - def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType) -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala index 3348191..c9e7faa 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/typeutils/TypeCoercion.scala @@ -27,7 +27,7 @@ import 
org.apache.flink.table.typeutils.TypeCheckUtils._ */ object TypeCoercion { - val numericWideningPrecedence: IndexedSeq[LogicalType] = + var numericWideningPrecedence: IndexedSeq[LogicalType] = IndexedSeq( new TinyIntType(), new SmallIntType(), @@ -36,12 +36,14 @@ object TypeCoercion { new FloatType(), new DoubleType()) + numericWideningPrecedence ++= numericWideningPrecedence.map(_.copy(false)) + def widerTypeOf(tp1: LogicalType, tp2: LogicalType): Option[LogicalType] = { (tp1.getTypeRoot, tp2.getTypeRoot) match { case (_, _) if tp1 == tp2 => Some(tp1) - case (_, VARCHAR) => Some(tp2) - case (VARCHAR, _) => Some(tp1) + case (_, VARCHAR | CHAR) => Some(tp2) + case (VARCHAR | CHAR, _) => Some(tp1) case (_, DECIMAL) => Some(tp2) case (DECIMAL, _) => Some(tp1) @@ -63,7 +65,7 @@ object TypeCoercion { */ def canSafelyCast( from: LogicalType, to: LogicalType): Boolean = (from.getTypeRoot, to.getTypeRoot) match { - case (_, VARCHAR) => true + case (_, VARCHAR | CHAR) => true case (_, DECIMAL) if isNumeric(from) => true @@ -89,14 +91,14 @@ object TypeCoercion { from: LogicalType, to: LogicalType): Boolean = (from.getTypeRoot, to.getTypeRoot) match { case (_, _) if from == to => true - case (_, VARCHAR) => true + case (_, VARCHAR | CHAR) => true - case (VARCHAR, _) if isNumeric(to) => true - case (VARCHAR, BOOLEAN) => true - case (VARCHAR, DECIMAL) => true - case (VARCHAR, DATE) => true - case (VARCHAR, TIME_WITHOUT_TIME_ZONE) => true - case (VARCHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => true + case (VARCHAR | CHAR, _) if isNumeric(to) => true + case (VARCHAR | CHAR, BOOLEAN) => true + case (VARCHAR | CHAR, DECIMAL) => true + case (VARCHAR | CHAR, DATE) => true + case (VARCHAR | CHAR, TIME_WITHOUT_TIME_ZONE) => true + case (VARCHAR | CHAR, TIMESTAMP_WITHOUT_TIME_ZONE) => true case (BOOLEAN, _) if isNumeric(to) => true case (BOOLEAN, DECIMAL) => true diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java index 728c79a..16fef15 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/BinaryWriter.java @@ -100,6 +100,7 @@ public interface BinaryWriter { case DOUBLE: writer.writeDouble(pos, (double) o); break; + case CHAR: case VARCHAR: writer.writeString(pos, (BinaryString) o); break; @@ -121,6 +122,7 @@ public interface BinaryWriter { case ANY: writer.writeGeneric(pos, (BinaryGeneric) o); break; + case BINARY: case VARBINARY: writer.writeBinary(pos, (byte[]) o); break; diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java index d94bdf5..35f9b70 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/DataFormatConverters.java @@ -136,6 +136,7 @@ public class DataFormatConverters { Class<?> clazz = dataType.getConversionClass(); LogicalType logicalType = dataType.getLogicalType(); switch (logicalType.getTypeRoot()) { + case CHAR: case VARCHAR: if (clazz == String.class) { return StringConverter.INSTANCE; @@ -144,6 +145,7 @@ public class DataFormatConverters { } else { throw new RuntimeException("Not support class for VARCHAR: " + clazz); } + case BINARY: case VARBINARY: return PrimitiveByteArrayConverter.INSTANCE; case DECIMAL: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java index 
a94111c..92217ed 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/TypeGetterSetters.java @@ -181,6 +181,7 @@ public interface TypeGetterSetters { return row.getFloat(ordinal); case DOUBLE: return row.getDouble(ordinal); + case CHAR: case VARCHAR: return row.getString(ordinal); case DECIMAL: @@ -193,6 +194,7 @@ public interface TypeGetterSetters { return row.getMap(ordinal); case ROW: return row.getRow(ordinal, ((RowType) type).getFieldCount()); + case BINARY: case VARBINARY: return row.getBinary(ordinal); case ANY: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java index 2af5be5..b857a34 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/vector/heap/AbstractHeapVector.java @@ -122,7 +122,9 @@ public abstract class AbstractHeapVector extends AbstractColumnVector { } case SMALLINT: return new HeapShortVector(maxRows); + case CHAR: case VARCHAR: + case BINARY: case VARBINARY: return new HeapBytesVector(maxRows); default: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java index 9b8cf36..9050435 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/ClassLogicalTypeConverter.java @@ -79,6 
+79,7 @@ public class ClassLogicalTypeConverter { return Float.class; case DOUBLE: return Double.class; + case CHAR: case VARCHAR: return String.class; case DECIMAL: @@ -116,6 +117,7 @@ public class ClassLogicalTypeConverter { return Map.class; case ROW: return Row.class; + case BINARY: case VARBINARY: return byte[].class; case ANY: @@ -152,6 +154,7 @@ public class ClassLogicalTypeConverter { return Float.class; case DOUBLE: return Double.class; + case CHAR: case VARCHAR: return BinaryString.class; case DECIMAL: @@ -163,6 +166,7 @@ public class ClassLogicalTypeConverter { return BinaryMap.class; case ROW: return BaseRow.class; + case BINARY: case VARBINARY: return byte[].class; case ANY: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java index 16840b3..c871038 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/InternalSerializers.java @@ -65,6 +65,7 @@ public class InternalSerializers { return FloatSerializer.INSTANCE; case DOUBLE: return DoubleSerializer.INSTANCE; + case CHAR: case VARCHAR: return BinaryStringSerializer.INSTANCE; case DECIMAL: @@ -78,6 +79,7 @@ public class InternalSerializers { case ROW: RowType rowType = (RowType) type; return new BaseRowSerializer(config, rowType); + case BINARY: case VARBINARY: return BytePrimitiveArraySerializer.INSTANCE; case ANY: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java index 92f1d42..08622aa 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java +++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/PlannerTypeUtils.java @@ -24,6 +24,9 @@ import org.apache.flink.table.types.logical.RowType; import java.util.List; +import static org.apache.flink.table.types.logical.LogicalTypeFamily.BINARY_STRING; +import static org.apache.flink.table.types.logical.LogicalTypeFamily.CHARACTER_STRING; + /** * Utilities for {@link LogicalType} and {@link DataType}.. */ @@ -74,15 +77,19 @@ public class PlannerTypeUtils { * 2.Join keys. */ public static boolean isInteroperable(LogicalType t1, LogicalType t2) { + if (t1.getTypeRoot().getFamilies().contains(CHARACTER_STRING) && + t2.getTypeRoot().getFamilies().contains(CHARACTER_STRING)) { + return true; + } + if (t1.getTypeRoot().getFamilies().contains(BINARY_STRING) && + t2.getTypeRoot().getFamilies().contains(BINARY_STRING)) { + return true; + } if (t1.getTypeRoot() != t2.getTypeRoot()) { return false; } switch (t1.getTypeRoot()) { - // VARCHAR VARBINARY ignore length. - case VARCHAR: - case VARBINARY: - return true; case ARRAY: case MAP: case MULTISET: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java index 5d984d2..d233083 100644 --- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/types/TypeInfoDataTypeConverter.java @@ -70,10 +70,12 @@ public class TypeInfoDataTypeConverter { return clazz == Decimal.class ? new DecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()) : new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale()); + case CHAR: case VARCHAR: // ignore precision return clazz == BinaryString.class ? 
BinaryStringTypeInfo.INSTANCE : BasicTypeInfo.STRING_TYPE_INFO; + case BINARY: case VARBINARY: // ignore precision return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO; case ARRAY: diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TypeCheckUtils.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TypeCheckUtils.java new file mode 100644 index 0000000..3c93838 --- /dev/null +++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/TypeCheckUtils.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.typeutils; + +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.LogicalTypeFamily; +import org.apache.flink.table.types.logical.TimestampKind; +import org.apache.flink.table.types.logical.TimestampType; + +import static org.apache.flink.table.types.logical.LogicalTypeRoot.ANY; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.ARRAY; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.BOOLEAN; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.MAP; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.ROW; +import static org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE; + +/** + * Utils for type. 
+ */
+public class TypeCheckUtils {
+
+    /**
+     * Returns true if the type root of {@code type} belongs to the
+     * {@link LogicalTypeFamily#NUMERIC} family.
+     */
+    public static boolean isNumeric(LogicalType type) {
+        return type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.NUMERIC);
+    }
+
+    /**
+     * Returns true if {@code type} is either a time point (see {@link #isTimePoint})
+     * or a time interval (see {@link #isTimeInterval}).
+     */
+    public static boolean isTemporal(LogicalType type) {
+        return isTimePoint(type) || isTimeInterval(type);
+    }
+
+    /**
+     * Returns true for the roots DATE, TIME_WITHOUT_TIME_ZONE and
+     * TIMESTAMP_WITHOUT_TIME_ZONE; false for every other root.
+     */
+    public static boolean isTimePoint(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns true if {@code type} is a {@link TimestampType} whose kind is
+     * {@link TimestampKind#ROWTIME}.
+     */
+    public static boolean isRowTime(LogicalType type) {
+        return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.ROWTIME;
+    }
+
+    /**
+     * Returns true if {@code type} is a {@link TimestampType} whose kind is
+     * {@link TimestampKind#PROCTIME}.
+     */
+    public static boolean isProcTime(LogicalType type) {
+        return type instanceof TimestampType && ((TimestampType) type).getKind() == TimestampKind.PROCTIME;
+    }
+
+    /**
+     * Returns true for the roots INTERVAL_DAY_TIME and INTERVAL_YEAR_MONTH;
+     * false for every other root.
+     */
+    public static boolean isTimeInterval(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case INTERVAL_DAY_TIME:
+            case INTERVAL_YEAR_MONTH:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns true if the type root of {@code type} belongs to the
+     * {@link LogicalTypeFamily#CHARACTER_STRING} family.
+     */
+    public static boolean isCharacterString(LogicalType type) {
+        return type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.CHARACTER_STRING);
+    }
+
+    /**
+     * Returns true if the type root of {@code type} belongs to the
+     * {@link LogicalTypeFamily#BINARY_STRING} family.
+     */
+    public static boolean isBinaryString(LogicalType type) {
+        return type.getTypeRoot().getFamilies().contains(LogicalTypeFamily.BINARY_STRING);
+    }
+
+    /**
+     * Returns true only for the root TIMESTAMP_WITHOUT_TIME_ZONE; other
+     * timestamp roots are not matched by this check.
+     */
+    public static boolean isTimestamp(LogicalType type) {
+        return type.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE;
+    }
+
+    /** Returns true if the type root of {@code type} is BOOLEAN. */
+    public static boolean isBoolean(LogicalType type) {
+        return type.getTypeRoot() == BOOLEAN;
+    }
+
+    /** Returns true if the type root of {@code type} is DECIMAL. */
+    public static boolean isDecimal(LogicalType type) {
+        return type.getTypeRoot() == DECIMAL;
+    }
+
+    /** Returns true if the type root of {@code type} is INTEGER. */
+    public static boolean isInteger(LogicalType type) {
+        return type.getTypeRoot() == INTEGER;
+    }
+
+    /** Returns true if the type root of {@code type} is BIGINT. */
+    public static boolean isLong(LogicalType type) {
+        return type.getTypeRoot() == BIGINT;
+    }
+
+    /** Returns true if the type root of {@code type} is ARRAY. */
+    public static boolean isArray(LogicalType type) {
+        return type.getTypeRoot() == ARRAY;
+    }
+
+    /** Returns true if the type root of {@code type} is MAP. */
+    public static boolean isMap(LogicalType type) {
+        return type.getTypeRoot() == MAP;
+    }
+
+    /** Returns true if the type root of {@code type} is ANY. */
+    public static boolean isAny(LogicalType type) {
+        return type.getTypeRoot() == ANY;
+    }
+
+    /** Returns true if the type root of {@code type} is ROW. */
+    public static boolean isRow(LogicalType type) {
+        return type.getTypeRoot() == ROW;
+    }
+
+    /**
+     * Returns false for the container and generic roots ANY, ARRAY, MAP,
+     * MULTISET and ROW, and true for every other root.
+     *
+     * <p>MULTISET is listed explicitly so that this check agrees with
+     * {@link #isMutable}, which already groups MULTISET with MAP and ARRAY;
+     * comparing only against MAP would report a MULTISET as comparable.
+     */
+    public static boolean isComparable(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case ANY:
+            case ARRAY:
+            case MAP:
+            case MULTISET:
+            case ROW:
+                return false;
+            default:
+                return true;
+        }
+    }
+
+    /**
+     * Returns true for the roots VARCHAR, CHAR, ARRAY, MULTISET, MAP, ROW and
+     * ANY; false for every other root.
+     */
+    public static boolean isMutable(LogicalType type) {
+        // the internal representation of String is BinaryString which is mutable
+        switch (type.getTypeRoot()) {
+            case VARCHAR:
+            case CHAR:
+            case ARRAY:
+            case MULTISET:
+            case MAP:
+            case ROW:
+            case ANY:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns false for the boolean, integral, floating point, date/time,
+     * timestamp and interval roots listed below, and true for every other root.
+     *
+     * <p>NOTE(review): the listed roots are presumably those whose internal
+     * representation is a Java primitive rather than an object reference;
+     * confirm against the callers.
+     */
+    public static boolean isReference(LogicalType type) {
+        switch (type.getTypeRoot()) {
+            case BOOLEAN:
+            case TINYINT:
+            case SMALLINT:
+            case INTEGER:
+            case BIGINT:
+            case FLOAT:
+            case DOUBLE:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return false;
+            default:
+                return true;
+        }
+    }
+}