[FLINK-6326] [table] Add ProjectMergeRule to logical optimization rule set.

This closes #3739.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4024afff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4024afff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4024afff

Branch: refs/heads/master
Commit: 4024afffcd5a3f011e8ba2a3a1053e8bae6cec4f
Parents: 46a950d
Author: godfreyhe <godfre...@163.com>
Authored: Wed Apr 19 19:46:18 2017 +0800
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Fri Apr 21 18:22:10 2017 +0200

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  2 +
 .../table/api/scala/batch/ExplainTest.scala     |  1 +
 .../batch/sql/QueryDecorrelationTest.scala      | 72 ++++++++++++--------
 .../api/scala/batch/sql/SetOperatorsTest.scala  | 20 +++---
 .../scala/batch/sql/WindowAggregateTest.scala   | 50 ++++++--------
 5 files changed, 77 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 94a1abb..0bee4e5 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -49,6 +49,8 @@ object FlinkRuleSets {
     FilterProjectTransposeRule.INSTANCE,
     // push a projection to the children of a join
     ProjectJoinTransposeRule.INSTANCE,
+    // merge projections
+    ProjectMergeRule.INSTANCE,
     // remove identity project
     ProjectRemoveRule.INSTANCE,
     // reorder sort and projection

http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
index a323ec9..1a6b314 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
@@ -119,4 +119,5 @@ class ExplainTest
       
"../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", 
"\n")
     assertEquals(result, source)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
index 7496ff8..34ae346 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala
@@ -44,38 +44,41 @@ class QueryDecorrelationTest extends TableTestBase {
       "DataSetCalc",
       binaryNode(
         "DataSetJoin",
-        binaryNode(
-          "DataSetJoin",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "empno", "ename", "job", "salary", "deptno"),
-            term("where", "<(deptno, 10)")
-          ),
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(1),
-            term("select", "deptno", "name"),
-            term("where", "<(deptno, 15)")
+        unaryNode(
+          "DataSetCalc",
+          binaryNode(
+            "DataSetJoin",
+            unaryNode(
+              "DataSetCalc",
+              batchTableNode(0),
+              term("select", "empno", "salary", "deptno"),
+              term("where", "<(deptno, 10)")
+            ),
+            unaryNode(
+              "DataSetCalc",
+              batchTableNode(1),
+              term("select", "deptno"),
+              term("where", "<(deptno, 15)")
+            ),
+            term("where", "=(deptno, deptno0)"),
+            term("join", "empno", "salary", "deptno", "deptno0"),
+            term("joinType", "InnerJoin")
           ),
-          term("where", "=(deptno, deptno0)"),
-          term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", 
"name"),
-          term("joinType", "InnerJoin")
+          term("select", "empno", "salary")
         ),
         unaryNode(
           "DataSetAggregate",
           unaryNode(
             "DataSetCalc",
             batchTableNode(0),
-            term("select", "salary", "empno"),
+            term("select", "empno", "salary"),
             term("where", "IS NOT NULL(empno)")
           ),
           term("groupBy", "empno"),
           term("select", "empno", "AVG(salary) AS EXPR$0")
         ),
         term("where", "AND(=(empno, empno0), >(salary, EXPR$0))"),
-        term("join", "empno", "ename", "job", "salary", "deptno",
-          "deptno0", "name", "empno0", "EXPR$0"),
+        term("join", "empno", "salary", "empno0", "EXPR$0"),
         term("joinType", "InnerJoin")
       ),
       term("select", "empno")
@@ -105,13 +108,25 @@ class QueryDecorrelationTest extends TableTestBase {
           "DataSetCalc",
           binaryNode(
             "DataSetJoin",
-            binaryNode(
-              "DataSetJoin",
-              batchTableNode(0),
-              batchTableNode(1),
-              term("where", "=(deptno, deptno0)"),
-              term("join", "empno", "ename", "job", "salary", "deptno", 
"deptno0", "name"),
-              term("joinType", "InnerJoin")
+            unaryNode(
+              "DataSetCalc",
+              binaryNode(
+                "DataSetJoin",
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(0),
+                  term("select", "empno", "salary", "deptno")
+                ),
+                unaryNode(
+                  "DataSetCalc",
+                  batchTableNode(1),
+                  term("select", "deptno")
+                ),
+                term("where", "=(deptno, deptno0)"),
+                term("join", "empno", "salary", "deptno", "deptno0"),
+                term("joinType", "InnerJoin")
+              ),
+              term("select", "empno", "salary", "deptno0")
             ),
             unaryNode(
               "DataSetAggregate",
@@ -124,9 +139,8 @@ class QueryDecorrelationTest extends TableTestBase {
               term("groupBy", "deptno"),
               term("select", "deptno", "AVG(salary) AS EXPR$0")
             ),
-            term("where", "AND(=(deptno0, deptno1), >(salary, EXPR$0))"),
-            term("join", "empno", "ename", "job", "salary", "deptno", 
"deptno0",
-              "name", "deptno1", "EXPR$0"),
+            term("where", "AND(=(deptno0, deptno), >(salary, EXPR$0))"),
+            term("join", "empno", "salary", "deptno0", "deptno", "EXPR$0"),
             term("joinType", "InnerJoin")
           ),
           term("select", "empno")

http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
index 2f9057d..ba0326a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala
@@ -55,18 +55,22 @@ class SetOperatorsTest extends TableTestBase {
         "DataSetJoin",
         batchTableNode(0),
         unaryNode(
-          "DataSetAggregate",
+          "DataSetCalc",
           unaryNode(
-            "DataSetCalc",
-            batchTableNode(1),
-            term("select", "b_long AS b_long3", "true AS $f0"),
-            term("where", "IS NOT NULL(b_long)")
+            "DataSetAggregate",
+            unaryNode(
+              "DataSetCalc",
+              batchTableNode(1),
+              term("select", "b_long AS b_long3", "true AS $f0"),
+              term("where", "IS NOT NULL(b_long)")
+            ),
+            term("groupBy", "b_long3"),
+            term("select", "b_long3", "MIN($f0) AS $f1")
           ),
-          term("groupBy", "b_long3"),
-          term("select", "b_long3", "MIN($f0) AS $f1")
+          term("select", "b_long3")
         ),
         term("where", "=(a_long, b_long3)"),
-        term("join", "a_long", "a_int", "a_string", "b_long3", "$f1"),
+        term("join", "a_long", "a_int", "a_string", "b_long3"),
         term("joinType", "InnerJoin")
       ),
       term("select", "a_int", "a_string")

http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
index 25a81c4..cd6c77d 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala
@@ -40,18 +40,14 @@ class WindowAggregateTest extends TableTestBase {
 
     val expected =
       unaryNode(
-        "DataSetCalc",
+        "DataSetWindowAggregate",
         unaryNode(
-          "DataSetWindowAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "ts, a, b")
-          ),
-          term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 
7200000.millis)),
-          term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "ts, a, b")
         ),
-        term("select", "sumA, cntB")
+        term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 
7200000.millis)),
+        term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -101,19 +97,15 @@ class WindowAggregateTest extends TableTestBase {
 
     val expected =
       unaryNode(
-        "DataSetCalc",
+        "DataSetWindowAggregate",
         unaryNode(
-          "DataSetWindowAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "ts, a, b")
-          ),
-          term("window",
-            EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 
900000.millis)),
-          term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "ts, a, b")
         ),
-        term("select", "sumA, cntB")
+        term("window",
+          EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 
900000.millis)),
+        term("select", "SUM(a) AS sumA, COUNT(b) AS cntB")
       )
 
     util.verifySql(sqlQuery, expected)
@@ -162,18 +154,14 @@ class WindowAggregateTest extends TableTestBase {
 
     val expected =
       unaryNode(
-        "DataSetCalc",
+        "DataSetWindowAggregate",
         unaryNode(
-          "DataSetWindowAggregate",
-          unaryNode(
-            "DataSetCalc",
-            batchTableNode(0),
-            term("select", "ts")
-          ),
-          term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 
1800000.millis)),
-          term("select", "COUNT(*) AS cnt")
+          "DataSetCalc",
+          batchTableNode(0),
+          term("select", "ts")
         ),
-        term("select", "cnt")
+        term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 
1800000.millis)),
+        term("select", "COUNT(*) AS cnt")
       )
 
     util.verifySql(sqlQuery, expected)

Reply via email to