Repository: flink
Updated Branches:
  refs/heads/master 121b12b7c -> bec818d84


[FLINK-5414] [table] Bump up Calcite version to 1.11. (Jark Wu and Haohui Mai)

This closes #3338.
This closes #3426.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bec818d8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bec818d8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bec818d8

Branch: refs/heads/master
Commit: bec818d84a65a812290d49bca9cfd62de7379b1e
Parents: 121b12b
Author: Haohui Mai <whe...@apache.org>
Authored: Mon Feb 27 14:24:08 2017 -0800
Committer: twalthr <twal...@apache.org>
Committed: Wed Mar 8 16:27:55 2017 +0100

----------------------------------------------------------------------
 flink-libraries/flink-table/pom.xml             |  2 +-
 .../flink/table/calcite/FlinkTypeFactory.scala  | 21 +++++-----
 .../flink/table/codegen/ExpressionReducer.scala | 10 ++++-
 .../functions/utils/ScalarSqlFunction.scala     |  3 +-
 .../flink/table/plan/ProjectionTranslator.scala | 40 ++++++++++++++------
 .../flink/table/plan/nodes/FlinkRel.scala       |  6 ++-
 .../scala/batch/table/FieldProjectionTest.scala |  8 ++--
 .../scala/stream/sql/WindowAggregateTest.scala  |  8 ++--
 .../scala/stream/table/GroupWindowTest.scala    |  8 ++--
 .../table/expressions/ScalarFunctionsTest.scala | 10 ++---
 10 files changed, 75 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 428b947..b26fe54 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -51,7 +51,7 @@ under the License.
                <dependency>
                        <groupId>org.apache.calcite</groupId>
                        <artifactId>calcite-core</artifactId>
-                       <version>1.9.0</version>
+                       <version>1.11.0</version>
                        <exclusions>
                                <exclusion>
                                        
<groupId>org.apache.calcite.avatica</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 251be14..22a5c9f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -131,15 +131,18 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
   }
 
   override def createTypeWithNullability(
-    relDataType: RelDataType,
-    nullable: Boolean)
-  : RelDataType = relDataType match {
-    case composite: CompositeRelDataType =>
-      // at the moment we do not care about nullability
-      composite
-    case _ =>
-      super.createTypeWithNullability(relDataType, nullable)
-  }
+      relDataType: RelDataType,
+      nullable: Boolean)
+    : RelDataType = relDataType match {
+      case composite: CompositeRelDataType =>
+        // at the moment we do not care about nullability
+        canonize(composite)
+      case array: ArrayRelDataType =>
+        val elementType = createTypeWithNullability(array.getComponentType, 
nullable)
+        canonize(new ArrayRelDataType(array.typeInfo, elementType, nullable))
+      case _ =>
+        super.createTypeWithNullability(relDataType, nullable)
+    }
 }
 
 object FlinkTypeFactory {

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
index 0f1de21..3fcbdc1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/ExpressionReducer.scala
@@ -106,8 +106,16 @@ class ExpressionReducer(config: TableConfig)
         case SqlTypeName.ANY | SqlTypeName.ROW | SqlTypeName.ARRAY =>
           reducedValues.add(unreduced)
         case _ =>
+          val reducedValue = reduced.getField(reducedIdx)
+          // RexBuilder handles double literals incorrectly, so convert the value into a 
BigDecimal manually
+          val value = if (unreduced.getType.getSqlTypeName == 
SqlTypeName.DOUBLE) {
+            new 
java.math.BigDecimal(reducedValue.asInstanceOf[Number].doubleValue())
+          } else {
+            reducedValue
+          }
+
           val literal = rexBuilder.makeLiteral(
-            reduced.getField(reducedIdx),
+            value,
             unreduced.getType,
             true)
           reducedValues.add(literal)

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
index da652e0..dc6d41f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
@@ -85,7 +85,8 @@ object ScalarSqlFunction {
               s"Expected: ${signaturesToString(scalarFunction)}")
         }
         val resultType = getResultType(scalarFunction, foundSignature.get)
-        typeFactory.createTypeFromTypeInfo(resultType)
+        val t = typeFactory.createTypeFromTypeInfo(resultType)
+        typeFactory.createTypeWithNullability(t, nullable = true)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
index ed6cf7b..94a0aa1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.plan.logical.{LogicalNode, Project}
 
+import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
 object ProjectionTranslator {
@@ -108,7 +109,9 @@ object ProjectionTranslator {
       tableEnv: TableEnvironment,
       aggNames: Map[Expression, String],
       propNames: Map[Expression, String]): Seq[NamedExpression] = {
-    exprs.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, 
propNames))
+    val projectedNames = new mutable.HashSet[String]
+    exprs.map((exp: Expression) => replaceAggregationsAndProperties(exp, 
tableEnv,
+      aggNames, propNames, projectedNames))
         .map(UnresolvedAlias)
   }
 
@@ -116,15 +119,24 @@ object ProjectionTranslator {
       exp: Expression,
       tableEnv: TableEnvironment,
       aggNames: Map[Expression, String],
-      propNames: Map[Expression, String]) : Expression = {
+      propNames: Map[Expression, String],
+      projectedNames: mutable.HashSet[String]) : Expression = {
 
     exp match {
       case agg: Aggregation =>
         val name = aggNames(agg)
-        Alias(UnresolvedFieldReference(name), 
tableEnv.createUniqueAttributeName())
+        if (projectedNames.add(name)) {
+          UnresolvedFieldReference(name)
+        } else {
+          Alias(UnresolvedFieldReference(name), 
tableEnv.createUniqueAttributeName())
+        }
       case prop: WindowProperty =>
         val name = propNames(prop)
-        Alias(UnresolvedFieldReference(name), 
tableEnv.createUniqueAttributeName())
+        if (projectedNames.add(name)) {
+          UnresolvedFieldReference(name)
+        } else {
+          Alias(UnresolvedFieldReference(name), 
tableEnv.createUniqueAttributeName())
+        }
       case n @ Alias(agg: Aggregation, name, _) =>
         val aName = aggNames(agg)
         Alias(UnresolvedFieldReference(aName), name)
@@ -133,34 +145,40 @@ object ProjectionTranslator {
         Alias(UnresolvedFieldReference(pName), name)
       case l: LeafExpression => l
       case u: UnaryExpression =>
-        val c = replaceAggregationsAndProperties(u.child, tableEnv, aggNames, 
propNames)
+        val c = replaceAggregationsAndProperties(u.child, tableEnv,
+          aggNames, propNames, projectedNames)
         u.makeCopy(Array(c))
       case b: BinaryExpression =>
-        val l = replaceAggregationsAndProperties(b.left, tableEnv, aggNames, 
propNames)
-        val r = replaceAggregationsAndProperties(b.right, tableEnv, aggNames, 
propNames)
+        val l = replaceAggregationsAndProperties(b.left, tableEnv,
+          aggNames, propNames, projectedNames)
+        val r = replaceAggregationsAndProperties(b.right, tableEnv,
+          aggNames, propNames, projectedNames)
         b.makeCopy(Array(l, r))
 
       // Functions calls
       case c @ Call(name, args) =>
-        val newArgs = args.map(replaceAggregationsAndProperties(_, tableEnv, 
aggNames, propNames))
+        val newArgs = args.map((exp: Expression) =>
+          replaceAggregationsAndProperties(exp, tableEnv, aggNames, propNames, 
projectedNames))
         c.makeCopy(Array(name, newArgs))
 
       case sfc @ ScalarFunctionCall(clazz, args) =>
         val newArgs: Seq[Expression] = args
-          .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, 
propNames))
+          .map((exp: Expression) =>
+            replaceAggregationsAndProperties(exp, tableEnv, aggNames, 
propNames, projectedNames))
         sfc.makeCopy(Array(clazz, newArgs))
 
       // array constructor
       case c @ ArrayConstructor(args) =>
         val newArgs = c.elements
-          .map(replaceAggregationsAndProperties(_, tableEnv, aggNames, 
propNames))
+          .map((exp: Expression) =>
+            replaceAggregationsAndProperties(exp, tableEnv, aggNames, 
propNames, projectedNames))
         c.makeCopy(Array(newArgs))
 
       // General expression
       case e: Expression =>
         val newArgs = e.productIterator.map {
           case arg: Expression =>
-            replaceAggregationsAndProperties(arg, tableEnv, aggNames, 
propNames)
+            replaceAggregationsAndProperties(arg, tableEnv, aggNames, 
propNames, projectedNames)
         }
         e.makeCopy(newArgs.toArray)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
index 7ad9bd5..258d7f2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/FlinkRel.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.plan.nodes
 
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.SqlAsOperator
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -54,7 +55,10 @@ trait FlinkRel {
       case c: RexCall =>
         val op = c.getOperator.toString
         val ops = c.getOperands.map(getExpressionString(_, inFields, 
localExprsTable))
-        s"$op(${ops.mkString(", ")})"
+        c.getOperator match {
+          case _ : SqlAsOperator => ops.head
+          case _ => s"$op(${ops.mkString(", ")})"
+        }
 
       case fa: RexFieldAccess =>
         val referenceExpr = getExpressionString(fa.getReferenceExpr, inFields, 
localExprsTable)

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index a0412d5..4d0d9aa 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -167,7 +167,7 @@ class FieldProjectionTest extends TableTestBase {
           term("groupBy", "c"),
           term("select", "c", "SUM(a) AS TMP_0")
         ),
-        term("select", "TMP_0 AS TMP_1")
+        term("select", "TMP_0")
       )
 
     util.verifyTable(resultTable, expected)
@@ -191,7 +191,7 @@ class FieldProjectionTest extends TableTestBase {
           term("groupBy", "k"),
           term("select", "k", "SUM(a) AS TMP_0")
         ),
-        term("select", "TMP_0 AS TMP_1")
+        term("select", "TMP_0")
       )
 
     util.verifyTable(resultTable, expected)
@@ -215,7 +215,7 @@ class FieldProjectionTest extends TableTestBase {
           term("groupBy", "k"),
           term("select", "k", "SUM(a) AS TMP_0")
         ),
-        term("select", "TMP_0 AS TMP_1")
+        term("select", "TMP_0")
       )
 
     util.verifyTable(resultTable, expected)
@@ -273,7 +273,7 @@ class FieldProjectionTest extends TableTestBase {
               5.millis)),
           term("select", "b", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
         ),
-        term("select", "TMP_0 AS TMP_2", "TMP_1 AS TMP_3", "b")
+        term("select", "TMP_0", "TMP_1", "b")
     )
 
     streamUtil.verifyTable(resultTable, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index e12572f..85bc5a7 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -40,7 +40,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "CAST(1970-01-01 00:00:00) AS $f0")
+            term("select", "1970-01-01 00:00:00 AS $f0")
           ),
           term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 
3600000.millis)),
           term("select", "COUNT(*) AS EXPR$0")
@@ -61,7 +61,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1")
+            term("select", "a", "1970-01-01 00:00:00 AS $f1")
           ),
           term("groupBy", "a"),
           term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 
60000.millis)),
@@ -83,7 +83,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "a", "CAST(1970-01-01 00:00:00) AS $f1, b, c")
+            term("select", "a", "1970-01-01 00:00:00 AS $f1, b, c")
           ),
           term("groupBy", "a, b"),
           term("window", EventTimeTumblingGroupWindow(None, 'rowtime, 
1000.millis)),
@@ -105,7 +105,7 @@ class WindowAggregateTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "CAST(1970-01-01 00:00:00) AS $f0")
+            term("select", "1970-01-01 00:00:00 AS $f0")
           ),
           term("window", ProcessingTimeTumblingGroupWindow(None, 
3600000.millis)),
           term("select", "COUNT(*) AS EXPR$0")

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index 8708649..fde7682 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -206,7 +206,7 @@ class GroupWindowTest extends TableTestBase {
         ProcessingTimeSlidingGroupWindow(
           Some(WindowReference("w2")),
           20.milli, 10.milli)),
-      term("select", "COUNT(string) AS TMP_2")
+      term("select", "COUNT(string) AS TMP_1")
     )
     util.verifyTable(windowedTable, expected)
   }
@@ -860,12 +860,12 @@ class GroupWindowTest extends TableTestBase {
       ),
       term("select",
         "string",
-        "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1",
-        "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2",
+        "+(CAST(TMP_0), 1) AS s1",
+        "+(CAST(TMP_0), 3) AS s2",
         "TMP_1 AS x",
         "TMP_1 AS x2",
         "TMP_2 AS x3",
-        "TMP_2 AS TMP_5")
+        "TMP_2")
     )
 
     util.verifyTable(windowedTable, expected)

http://git-wip-us.apache.org/repos/asf/flink/blob/bec818d8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 596907b..03be995 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -22,10 +22,10 @@ import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
 import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class ScalarFunctionsTest extends ExpressionTestBase {
@@ -385,12 +385,12 @@ class ScalarFunctionsTest extends ExpressionTestBase {
 
     testAllApis(
       'f7.exp(),
-      "exp(3)",
-      "EXP(3)",
+      "exp(f7)",
+      "EXP(f7)",
       math.exp(3).toString)
 
     testAllApis(
-      'f7.exp(),
+      3.exp(),
       "exp(3)",
       "EXP(3)",
       math.exp(3).toString)

Reply via email to