This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit beddbd4677306a8c975042d36f9c5597c55ca9ae Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Thu Jul 25 16:29:29 2019 +0800 [FLINK-13237][table-planner-blink] Add LocalTime and LocalDateTime support to planner expressions --- .../expressions/PlannerExpressionConverter.scala | 3 +-- .../table/planner/expressions/comparison.scala | 10 ++++++++++ .../flink/table/planner/expressions/time.scala | 21 ++++++++++++++++----- .../planner/typeutils/TypeInfoCheckUtils.scala | 7 ++++--- 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala index 9b9fd36..90a8282 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala @@ -28,7 +28,6 @@ import org.apache.flink.table.planner.functions.InternalFunctionDefinitions.THRO import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo import org.apache.flink.table.types.logical.LogicalTypeRoot.{CHAR, DECIMAL, SYMBOL, TIMESTAMP_WITHOUT_TIME_ZONE} import org.apache.flink.table.types.logical.utils.LogicalTypeChecks._ -import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo import _root_.scala.collection.JavaConverters._ @@ -91,7 +90,7 @@ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExp assert(children.size == 2) return ThrowException( children.head.accept(this), - fromDataTypeToLegacyInfo( + fromDataTypeToTypeInfo( children(1).asInstanceOf[TypeLiteralExpression].getOutputDataType)) case _ => diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala index b0684df..b8769f2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/comparison.scala @@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable import org.apache.flink.table.planner.typeutils.TypeInfoCheckUtils.{isArray, isComparable, isNumeric} import org.apache.flink.table.planner.validate._ +import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType import org.apache.calcite.sql.SqlOperator @@ -34,6 +35,9 @@ abstract class BinaryComparison extends BinaryExpression { (left.resultType, right.resultType) match { case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess case (lType, rType) if isComparable(lType) && lType == rType => ValidationSuccess + case (lType, rType) if isComparable(lType) && + fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) => + ValidationSuccess case (lType, rType) => ValidationFailure( s"Comparison is only supported for numeric types and " + @@ -50,6 +54,9 @@ case class EqualTo(left: PlannerExpression, right: PlannerExpression) extends Bi (left.resultType, right.resultType) match { case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess case (lType, rType) if lType == rType => ValidationSuccess + case (lType, rType) + if fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) => + ValidationSuccess case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass => ValidationSuccess case 
(lType, rType) => @@ -66,6 +73,9 @@ case class NotEqualTo(left: PlannerExpression, right: PlannerExpression) extends (left.resultType, right.resultType) match { case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess case (lType, rType) if lType == rType => ValidationSuccess + case (lType, rType) + if fromTypeInfoToLogicalType(lType) == fromTypeInfoToLogicalType(rType) => + ValidationSuccess case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass => ValidationSuccess case (lType, rType) => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala index 2429881..8817f5c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/time.scala @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.expressions import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{LocalTimeTypeInfo, SqlTimeTypeInfo, TypeInformation} import org.apache.flink.table.planner.calcite.FlinkRelBuilder import org.apache.flink.table.planner.expressions.PlannerTimeIntervalUnit.PlannerTimeIntervalUnit import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable @@ -51,6 +51,8 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress | SymbolPlannerExpression(PlannerTimeIntervalUnit.DAY) if temporal.resultType == SqlTimeTypeInfo.DATE || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP + || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE + || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME || temporal.resultType == 
TimeIntervalTypeInfo.INTERVAL_MILLIS || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MONTHS => ValidationSuccess @@ -60,6 +62,8 @@ case class Extract(timeIntervalUnit: PlannerExpression, temporal: PlannerExpress | SymbolPlannerExpression(PlannerTimeIntervalUnit.SECOND) if temporal.resultType == SqlTimeTypeInfo.TIME || temporal.resultType == SqlTimeTypeInfo.TIMESTAMP + || temporal.resultType == LocalTimeTypeInfo.LOCAL_TIME + || temporal.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME || temporal.resultType == TimeIntervalTypeInfo.INTERVAL_MILLIS => ValidationSuccess @@ -97,12 +101,15 @@ abstract class TemporalCeilFloor( (unit.get, temporal.resultType) match { case (PlannerTimeIntervalUnit.YEAR | PlannerTimeIntervalUnit.MONTH, - SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP) => + SqlTimeTypeInfo.DATE | SqlTimeTypeInfo.TIMESTAMP | + LocalTimeTypeInfo.LOCAL_DATE | LocalTimeTypeInfo.LOCAL_DATE_TIME) => ValidationSuccess - case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP) => + case (PlannerTimeIntervalUnit.DAY, SqlTimeTypeInfo.TIMESTAMP | + LocalTimeTypeInfo.LOCAL_DATE_TIME) => ValidationSuccess case (PlannerTimeIntervalUnit.HOUR | PlannerTimeIntervalUnit.MINUTE | - PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP) => + PlannerTimeIntervalUnit.SECOND, SqlTimeTypeInfo.TIME | SqlTimeTypeInfo.TIMESTAMP | + LocalTimeTypeInfo.LOCAL_TIME | LocalTimeTypeInfo.LOCAL_DATE_TIME) => ValidationSuccess case _ => ValidationFailure(s"Temporal ceil/floor operator does not support " + @@ -308,7 +315,11 @@ case class TimestampDiff( if timePoint1.resultType == SqlTimeTypeInfo.DATE || timePoint1.resultType == SqlTimeTypeInfo.TIMESTAMP || timePoint2.resultType == SqlTimeTypeInfo.DATE - || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP => + || timePoint2.resultType == SqlTimeTypeInfo.TIMESTAMP + || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE + || timePoint1.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME + || 
timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE + || timePoint2.resultType == LocalTimeTypeInfo.LOCAL_DATE_TIME => ValidationSuccess case _ => diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala index 80f4a57..33108dd 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/typeutils/TypeInfoCheckUtils.scala @@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo, PojoTypeInfo} import org.apache.flink.table.api.ValidationException import org.apache.flink.table.planner.validate._ +import org.apache.flink.table.runtime.typeutils.{BigDecimalTypeInfo, DecimalTypeInfo} import org.apache.flink.table.typeutils.TimeIntervalTypeInfo.{INTERVAL_MILLIS, INTERVAL_MONTHS} import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TimeIntervalTypeInfo} @@ -48,7 +49,7 @@ object TypeInfoCheckUtils { def isNumeric(dataType: TypeInformation[_]): Boolean = dataType match { case _: NumericTypeInfo[_] => true - case BIG_DEC_TYPE_INFO => true + case BIG_DEC_TYPE_INFO | _: BigDecimalTypeInfo | _: DecimalTypeInfo => true case _ => false } @@ -56,7 +57,7 @@ object TypeInfoCheckUtils { isTimePoint(dataType) || isTimeInterval(dataType) def isTimePoint(dataType: TypeInformation[_]): Boolean = - dataType.isInstanceOf[SqlTimeTypeInfo[_]] + dataType.isInstanceOf[SqlTimeTypeInfo[_]] || dataType.isInstanceOf[LocalTimeTypeInfo[_]] def isTimeInterval(dataType: TypeInformation[_]): Boolean = dataType.isInstanceOf[TimeIntervalTypeInfo[_]] @@ -103,7 +104,7 @@ object TypeInfoCheckUtils { : ValidationResult = dataType match { case _: 
NumericTypeInfo[_] => ValidationSuccess - case BIG_DEC_TYPE_INFO => + case BIG_DEC_TYPE_INFO | _: BigDecimalTypeInfo | _: DecimalTypeInfo => ValidationSuccess case _ => ValidationFailure(s"$caller requires numeric types, get $dataType here")