Repository: flink Updated Branches: refs/heads/master 95e9004e3 -> 8fa313c39
[FLINK-4662] Bump Calcite version up to 1.9 This closes #2535. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fa313c3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fa313c3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fa313c3 Branch: refs/heads/master Commit: 8fa313c39fbad7bb96327477544d1ec15e8dc0f6 Parents: 95e9004 Author: Jark Wu <wuchong...@alibaba-inc.com> Authored: Thu Sep 22 21:47:37 2016 +0800 Committer: twalthr <twal...@apache.org> Committed: Mon Sep 26 15:58:39 2016 +0200 ---------------------------------------------------------------------- flink-libraries/flink-table/pom.xml | 2 +- .../apache/flink/api/table/FlinkPlannerImpl.scala | 15 ++++++++------- .../apache/flink/api/table/FlinkTypeFactory.scala | 8 ++++---- .../flink/api/table/codegen/CodeGenerator.scala | 4 ++-- .../flink/api/table/expressions/arithmetic.scala | 5 +++++ .../table/plan/nodes/dataset/DataSetConvention.scala | 7 ++++++- .../api/table/plan/nodes/dataset/DataSetRel.scala | 4 ++-- .../plan/nodes/datastream/DataStreamConvention.scala | 7 ++++++- 8 files changed, 34 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/pom.xml ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index e83a778..4c91f1c 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -51,7 +51,7 @@ under the License. 
<dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> - <version>1.7.0</version> + <version>1.9.0</version> <exclusions> <exclusion> <groupId>org.apache.calcite.avatica</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala index bb08654..97e5cf2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkPlannerImpl.scala @@ -98,10 +98,10 @@ class FlinkPlannerImpl( assert(validatedSqlNode != null) val rexBuilder: RexBuilder = createRexBuilder val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val config = SqlToRelConverter.configBuilder() + .withTrimUnusedFields(false).withConvertTableAccess(false).build() val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( - new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable) - sqlToRelConverter.setTrimUnusedFields(false) - sqlToRelConverter.enableTableAccessConversion(false) + new ViewExpanderImpl, validator, createCatalogReader, cluster, convertletTable, config) root = sqlToRelConverter.convertQuery(validatedSqlNode, false, true) root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) @@ -118,7 +118,8 @@ class FlinkPlannerImpl( override def expandView( rowType: RelDataType, queryString: String, - schemaPath: util.List[String]): RelRoot = { + schemaPath: util.List[String], + viewPath: util.List[String]): RelRoot = { val parser: SqlParser = 
SqlParser.create(queryString, parserConfig) var sqlNode: SqlNode = null @@ -136,10 +137,10 @@ class FlinkPlannerImpl( val validatedSqlNode: SqlNode = validator.validate(sqlNode) val rexBuilder: RexBuilder = createRexBuilder val cluster: RelOptCluster = RelOptCluster.create(planner, rexBuilder) + val config: SqlToRelConverter.Config = SqlToRelConverter.configBuilder + .withTrimUnusedFields(false).withConvertTableAccess(false).build val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter( - new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable) - sqlToRelConverter.setTrimUnusedFields(false) - sqlToRelConverter.enableTableAccessConversion(false) + new ViewExpanderImpl, validator, catalogReader, cluster, convertletTable, config) root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false) root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true)) root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel)) http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala index 5a116db..581ecde 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkTypeFactory.scala @@ -53,7 +53,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp createSqlIntervalType( new SqlIntervalQualifier(TimeUnit.YEAR, TimeUnit.MONTH, SqlParserPos.ZERO)) - case INTERVAL_DAY_TIME => + case INTERVAL_DAY_SECOND => createSqlIntervalType( new SqlIntervalQualifier(TimeUnit.DAY, TimeUnit.SECOND, SqlParserPos.ZERO)) @@ -97,7 +97,7 @@ object 
FlinkTypeFactory { case SqlTimeTypeInfo.TIME => TIME case SqlTimeTypeInfo.TIMESTAMP => TIMESTAMP case IntervalTypeInfo.INTERVAL_MONTHS => INTERVAL_YEAR_MONTH - case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_TIME + case IntervalTypeInfo.INTERVAL_MILLIS => INTERVAL_DAY_SECOND case CHAR_TYPE_INFO | CHAR_VALUE_TYPE_INFO => throw TableException("Character type is not supported.") @@ -121,8 +121,8 @@ object FlinkTypeFactory { case DATE => SqlTimeTypeInfo.DATE case TIME => SqlTimeTypeInfo.TIME case TIMESTAMP => SqlTimeTypeInfo.TIMESTAMP - case INTERVAL_YEAR_MONTH => IntervalTypeInfo.INTERVAL_MONTHS - case INTERVAL_DAY_TIME => IntervalTypeInfo.INTERVAL_MILLIS + case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MONTHS + case typeName if DAY_INTERVAL_TYPES.contains(typeName) => IntervalTypeInfo.INTERVAL_MILLIS case NULL => throw TableException("Type NULL is not supported. " + http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala index e5a07b1..b54c498 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala @@ -638,7 +638,7 @@ class CodeGenerator( case TIMESTAMP => generateNonNullLiteral(resultType, value.toString + "L") - case INTERVAL_YEAR_MONTH => + case typeName if YEAR_INTERVAL_TYPES.contains(typeName) => val decimal = BigDecimal(value.asInstanceOf[JBigDecimal]) if (decimal.isValidInt) { generateNonNullLiteral(resultType, decimal.intValue().toString) @@ -646,7 +646,7 @@ class CodeGenerator( throw 
new CodeGenException("Decimal can not be converted to interval of months.") } - case INTERVAL_DAY_TIME => + case typeName if DAY_INTERVAL_TYPES.contains(typeName) => val decimal = BigDecimal(value.asInstanceOf[JBigDecimal]) if (decimal.isValidLong) { generateNonNullLiteral(resultType, decimal.longValue().toString + "L") http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala index b301f22..4a7978a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/arithmetic.scala @@ -18,6 +18,7 @@ package org.apache.flink.api.table.expressions import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.`type`.IntervalSqlType import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder @@ -67,6 +68,10 @@ case class Plus(left: Expression, right: Expression) extends BinaryArithmetic { relBuilder.call(SqlStdOperatorTable.CONCAT, castedLeft.toRexNode, right.toRexNode) } else if (isTimeInterval(left.resultType) && left.resultType == right.resultType) { relBuilder.call(SqlStdOperatorTable.PLUS, left.toRexNode, right.toRexNode) + } else if (isTimeInterval(left.resultType) && isTemporal(right.resultType)) { + // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left) + // we manually switch them here + relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, right.toRexNode, left.toRexNode) } else if (isTemporal(left.resultType) && isTemporal(right.resultType)) { 
relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, left.toRexNode, right.toRexNode) } else { http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala index cbacd16..03d9a51 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetConvention.scala @@ -24,6 +24,12 @@ class DataSetConvention extends Convention { override def toString: String = getName + override def useAbstractConvertersForConversion( + fromTraits: RelTraitSet, + toTraits: RelTraitSet): Boolean = false + + override def canConvertConvention(toConvention: Convention): Boolean = false + def getInterface: Class[_] = classOf[DataSetRel] def getName: String = "DATASET" @@ -33,7 +39,6 @@ class DataSetConvention extends Convention { def satisfies(`trait`: RelTrait): Boolean = this eq `trait` def register(planner: RelOptPlanner): Unit = { } - } object DataSetConvention { http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index 9ce1580..39532f0 100644 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -62,8 +62,8 @@ trait DataSetRel extends RelNode with FlinkRel { case SqlTypeName.VARCHAR => s + 12 case SqlTypeName.CHAR => s + 1 case SqlTypeName.DECIMAL => s + 12 - case SqlTypeName.INTERVAL_DAY_TIME => s + 8 - case SqlTypeName.INTERVAL_YEAR_MONTH => s + 4 + case typeName if SqlTypeName.YEAR_INTERVAL_TYPES.contains(typeName) => s + 4 + case typeName if SqlTypeName.DAY_INTERVAL_TYPES.contains(typeName) => s + 8 case SqlTypeName.TIME | SqlTypeName.TIMESTAMP | SqlTypeName.DATE => s + 12 case _ => throw TableException(s"Unsupported data type encountered: $t") } http://git-wip-us.apache.org/repos/asf/flink/blob/8fa313c3/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala index bcce4c4..3b6a653 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamConvention.scala @@ -24,6 +24,12 @@ class DataStreamConvention extends Convention { override def toString: String = getName + override def useAbstractConvertersForConversion( + fromTraits: RelTraitSet, + toTraits: RelTraitSet): Boolean = false + + override def canConvertConvention(toConvention: Convention): Boolean = false + def getInterface: Class[_] = classOf[DataStreamRel] def getName: String = "DATASTREAM" @@ -33,7 +39,6 @@ class
DataStreamConvention extends Convention { def satisfies(`trait`: RelTrait): Boolean = this eq `trait` def register(planner: RelOptPlanner): Unit = { } - } object DataStreamConvention {