Repository: flink
Updated Branches:
  refs/heads/master 121b12b7c -> bec818d84

[FLINK-5414] [table] Bump up Calcite version to 1.11. (Jark Wu and Haohui Mai)

This closes #3338.
This closes #3426.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bec818d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bec818d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bec818d8

Branch: refs/heads/master
Commit: bec818d84a65a812290d49bca9cfd62de7379b1e
Parents: 121b12b
Author: Haohui Mai <whe...@apache.org>
Authored: Mon Feb 27 14:24:08 2017 -0800
Committer: twalthr <twal...@apache.org>
Committed: Wed Mar 8 16:27:55 2017 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml | 2 +-
 .../flink/table/calcite/FlinkTypeFactory.scala | 21 +++++-----
 .../flink/table/codegen/ExpressionReducer.scala | 10 ++++-
 .../functions/utils/ScalarSqlFunction.scala | 3 +-
 .../flink/table/plan/ProjectionTranslator.scala | 40 ++++++++++++++------
 .../flink/table/plan/nodes/FlinkRel.scala | 6 ++-
 .../scala/batch/table/FieldProjectionTest.scala | 8 ++--
 .../scala/stream/sql/WindowAggregateTest.scala | 8 ++--
 .../scala/stream/table/GroupWindowTest.scala | 8 ++--
 .../table/expressions/ScalarFunctionsTest.scala | 10 ++---
 10 files changed, 75 insertions(+), 41 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index 428b947..b26fe54 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -51,7 +51,7 @@ under the License.
<dependency> <groupId>org.apache.calcite</groupId> <artifactId>calcite-core</artifactId> - <version>1.9.0</version> + <version>1.11.0</version> <exclusions> <exclusion> <groupId>org.apache.calcite.avatica</groupId> http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala index 251be14..22a5c9f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala @@ -131,15 +131,18 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp } override def createTypeWithNullability( - relDataType: RelDataType, - nullable: Boolean) - : RelDataType = relDataType match { - case composite: CompositeRelDataType => - // at the moment we do not care about nullability - composite - case _ => - super.createTypeWithNullability(relDataType, nullable) - } + relDataType: RelDataType, + nullable: Boolean) + : RelDataType = relDataType match { + case composite: CompositeRelDataType => + // at the moment we do not care about nullability + canonize(composite) + case array: ArrayRelDataType => + val elementType = createTypeWithNullability(array.getComponentType, nullable) + canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable)) + case _ => + super.createTypeWithNullability(relDataType, nullable) + } } object FlinkTypeFactory { http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala index 0f1de21..3fcbdc1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala @@ -106,8 +106,16 @@ class ExpressionReducer(config: TableConfig) case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY => reducedValues.add(unreduced) case _ => + val reducedValue = reduced.getField(reducedIdx) + // RexBuilder handle double literal incorrectly, convert it into BigDecimal manually + val value = if (unreduced.getType.getSqlTypeName == SqlTypeName.DOUBLE) { + new java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue()) + } else { + reducedValue + } + val literal = rexBuilder.makeLiteral( - reduced.getField(reducedIdx), + value, unreduced.getType, true) reducedValues.add(literal) http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala index da652e0..dc6d41f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala @@ -85,7 +85,8 @@ object ScalarSqlFunction { s"Expected: ${signaturesToString(scalarFunction)}") } val resultType = 
getResultType(scalarFunction, foundSignature.get) - typeFactory.createTypeFromTypeInfo(resultType) + val t = typeFactory.createTypeFromTypeInfo(resultType) + typeFactory.createTypeWithNullability(t, nullable = true) } } } http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala index ed6cf7b..94a0aa1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala @@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment import org.apache.flink.table.expressions._ import org.apache.flink.table.plan.logical.{LogicalNode, Project} +import scala.collection.mutable import scala.collection.mutable.ListBuffer object ProjectionTranslator { @@ -108,7 +109,9 @@ object ProjectionTranslator { tableEnv: TableEnvironment, aggNames: Map[Expression, String], propNames: Map[Expression, String]): Seq[NamedExpression] = { - exprs.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + val projectedNames = new mutable.HashSet[String] + exprs.map((exp: Expression) => replaceAggregationsAndProperties(exp, tableEnv, + aggNames, propNames, projectedNames)) .map(UnresolvedAlias) } @@ -116,15 +119,24 @@ object ProjectionTranslator { exp: Expression, tableEnv: TableEnvironment, aggNames: Map[Expression, String], - propNames: Map[Expression, String]) : Expression = { + propNames: Map[Expression, String], + projectedNames: mutable.HashSet[String]) : Expression = { exp match { case agg: Aggregation => val name = aggNames(agg) - 
Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName()) + if (projectedNames.add(name)) { + UnresolvedFieldReference(name) + } else { + Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName()) + } case prop: WindowProperty => val name = propNames(prop) - Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName()) + if (projectedNames.add(name)) { + UnresolvedFieldReference(name) + } else { + Alias(UnresolvedFieldReference(name), tableEnv.createUniqueAttributeName()) + } case n @ Alias(agg: Aggregation, name, _) => val aName = aggNames(agg) Alias(UnresolvedFieldReference(aName), name) @@ -133,34 +145,40 @@ object ProjectionTranslator { Alias(UnresolvedFieldReference(pName), name) case l: LeafExpression => l case u: UnaryExpression => - val c = replaceAggregationsAndProperties(u.child, tableEnv, aggNames, propNames) + val c = replaceAggregationsAndProperties(u.child, tableEnv, + aggNames, propNames, projectedNames) u.makeCopy(Array(c)) case b: BinaryExpression => - val l = replaceAggregationsAndProperties(b.left, tableEnv, aggNames, propNames) - val r = replaceAggregationsAndProperties(b.right, tableEnv, aggNames, propNames) + val l = replaceAggregationsAndProperties(b.left, tableEnv, + aggNames, propNames, projectedNames) + val r = replaceAggregationsAndProperties(b.right, tableEnv, + aggNames, propNames, projectedNames) b.makeCopy(Array(l, r)) // Functions calls case c @ Call(name, args) => - val newArgs = args.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + val newArgs = args.map((exp: Expression) => + replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames)) c.makeCopy(Array(name, newArgs)) case sfc @ ScalarFunctionCall(clazz, args) => val newArgs: Seq[Expression] = args - .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + .map((exp: Expression) => + replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, 
projectedNames)) sfc.makeCopy(Array(clazz, newArgs)) // array constructor case c @ ArrayConstructor(args) => val newArgs = c.elements - .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames)) + .map((exp: Expression) => + replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, projectedNames)) c.makeCopy(Array(newArgs)) // General expression case e: Expression => val newArgs = e.productIterator.map { case arg: Expression => - replaceAggregationsAndProperties(arg, tableEnv, aggNames, propNames) + replaceAggregationsAndProperties(arg, tableEnv, aggNames, propNames, projectedNames) } e.makeCopy(newArgs.toArray) } http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala index 7ad9bd5..258d7f2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.plan.nodes import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rex._ +import org.apache.calcite.sql.SqlAsOperator import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.TypeInformation @@ -54,7 +55,10 @@ trait FlinkRel { case c: RexCall => val op = c.getOperator.toString val ops = c.getOperands.map(getExpressionString(_, inFields, localExprsTable)) - s"$op(${ops.mkString(", ")})" + c.getOperator match { + case _ : SqlAsOperator => ops.head + case _ => s"$op(${ops.mkString(", ")})" + } case fa: RexFieldAccess => val referenceExpr = 
getExpressionString(fa.getReferenceExpr, inFields, localExprsTable) http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index a0412d5..4d0d9aa 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -167,7 +167,7 @@ class FieldProjectionTest extends TableTestBase { term("groupBy", "c"), term("select", "c", "SUM(a) AS TMP_0") ), - term("select", "TMP_0 AS TMP_1") + term("select", "TMP_0") ) util.verifyTable(resultTable, expected) @@ -191,7 +191,7 @@ class FieldProjectionTest extends TableTestBase { term("groupBy", "k"), term("select", "k", "SUM(a) AS TMP_0") ), - term("select", "TMP_0 AS TMP_1") + term("select", "TMP_0") ) util.verifyTable(resultTable, expected) @@ -215,7 +215,7 @@ class FieldProjectionTest extends TableTestBase { term("groupBy", "k"), term("select", "k", "SUM(a) AS TMP_0") ), - term("select", "TMP_0 AS TMP_1") + term("select", "TMP_0") ) util.verifyTable(resultTable, expected) @@ -273,7 +273,7 @@ class FieldProjectionTest extends TableTestBase { 5.millis)), term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1") ), - term("select", "TMP_0 AS TMP_2", "TMP_1 AS TMP_3", "b") + term("select", "TMP_0", "TMP_1", "b") ) streamUtil.verifyTable(resultTable, expected) 
http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala index e12572f..85bc5a7 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala @@ -40,7 +40,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "CAST(1970-01-01 00:00:00) AS $f0") + term("select", "1970-01-01 00:00:00 AS $f0") ), term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 3600000.millis)), term("select", "COUNT(*) AS EXPR$0") @@ -61,7 +61,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1") + term("select", "a", "1970-01-01 00:00:00 AS $f1") ), term("groupBy", "a"), term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 60000.millis)), @@ -83,7 +83,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1, b, c") + term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c") ), term("groupBy", "a, b"), term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 1000.millis)), @@ -105,7 +105,7 @@ class WindowAggregateTest extends TableTestBase { unaryNode( "DataStreamCalc", streamTableNode(0), - term("select", "CAST(1970-01-01 00:00:00) AS $f0") + term("select", "1970-01-01 00:00:00 AS $f0") ), term("window", 
ProcessingTimeTumblingGroupWindow(None, 3600000.millis)), term("select", "COUNT(*) AS EXPR$0") http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala index 8708649..fde7682 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala @@ -206,7 +206,7 @@ class GroupWindowTest extends TableTestBase { ProcessingTimeSlidingGroupWindow( Some(WindowReference("w2")), 20.milli, 10.milli)), - term("select", "COUNT(string) AS TMP_2") + term("select", "COUNT(string) AS TMP_1") ) util.verifyTable(windowedTable, expected) } @@ -860,12 +860,12 @@ class GroupWindowTest extends TableTestBase { ), term("select", "string", - "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1", - "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2", + "+(CAST(TMP_0), 1) AS s1", + "+(CAST(TMP_0), 3) AS s2", "TMP_1 AS x", "TMP_1 AS x2", "TMP_2 AS x3", - "TMP_2 AS TMP_5") + "TMP_2") ) util.verifyTable(windowedTable, expected) http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala index 596907b..03be995 100644 --- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala @@ -22,10 +22,10 @@ import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.types.Row -import org.apache.flink.table.api.{Types, ValidationException} import org.apache.flink.table.api.scala._ +import org.apache.flink.table.api.{Types, ValidationException} import org.apache.flink.table.expressions.utils.ExpressionTestBase +import org.apache.flink.types.Row import org.junit.Test class ScalarFunctionsTest extends ExpressionTestBase { @@ -385,12 +385,12 @@ class ScalarFunctionsTest extends ExpressionTestBase { testAllApis( 'f7.exp(), - "exp(3)", - "EXP(3)", + "exp(f7)", + "EXP(f7)", math.exp(3).toString) testAllApis( - 'f7.exp(), + 3.exp(), "exp(3)", "EXP(3)", math.exp(3).toString)