[FLINK-6326] [table] Add ProjectMergeRule to logical optimization rule set.
This closes #3739. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4024afff Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4024afff Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4024afff Branch: refs/heads/master Commit: 4024afffcd5a3f011e8ba2a3a1053e8bae6cec4f Parents: 46a950d Author: godfreyhe <godfre...@163.com> Authored: Wed Apr 19 19:46:18 2017 +0800 Committer: Fabian Hueske <fhue...@apache.org> Committed: Fri Apr 21 18:22:10 2017 +0200 ---------------------------------------------------------------------- .../flink/table/plan/rules/FlinkRuleSets.scala | 2 + .../table/api/scala/batch/ExplainTest.scala | 1 + .../batch/sql/QueryDecorrelationTest.scala | 72 ++++++++++++-------- .../api/scala/batch/sql/SetOperatorsTest.scala | 20 +++--- .../scala/batch/sql/WindowAggregateTest.scala | 50 ++++++-------- 5 files changed, 77 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 94a1abb..0bee4e5 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -49,6 +49,8 @@ object FlinkRuleSets { FilterProjectTransposeRule.INSTANCE, // push a projection to the children of a join ProjectJoinTransposeRule.INSTANCE, + // merge projections + ProjectMergeRule.INSTANCE, // remove identity project ProjectRemoveRule.INSTANCE, // reorder sort and projection http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala index a323ec9..1a6b314 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala @@ -119,4 +119,5 @@ class ExplainTest "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n") assertEquals(result, source) } + } http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala index 7496ff8..34ae346 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala @@ -44,38 +44,41 @@ class QueryDecorrelationTest extends TableTestBase { "DataSetCalc", binaryNode( "DataSetJoin", - binaryNode( - "DataSetJoin", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "empno", "ename", "job", "salary", "deptno"), - term("where", "<(deptno, 10)") - ), - unaryNode( - "DataSetCalc", - batchTableNode(1), - term("select", "deptno", "name"), - term("where", "<(deptno, 15)") + unaryNode( + "DataSetCalc", + binaryNode( + "DataSetJoin", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "empno", "salary", "deptno"), + term("where", "<(deptno, 10)") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "deptno"), + term("where", "<(deptno, 15)") + ), + term("where", "=(deptno, deptno0)"), + term("join", "empno", "salary", "deptno", "deptno0"), + term("joinType", "InnerJoin") ), - term("where", "=(deptno, deptno0)"), - term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", "name"), - term("joinType", "InnerJoin") + term("select", "empno", "salary") ), unaryNode( "DataSetAggregate", unaryNode( "DataSetCalc", batchTableNode(0), - term("select", "salary", "empno"), + term("select", "empno", "salary"), term("where", "IS NOT NULL(empno)") ), term("groupBy", "empno"), term("select", "empno", "AVG(salary) AS EXPR$0") ), term("where", "AND(=(empno, empno0), >(salary, EXPR$0))"), - term("join", "empno", "ename", "job", "salary", "deptno", - "deptno0", "name", "empno0", "EXPR$0"), + term("join", "empno", "salary", "empno0", "EXPR$0"), term("joinType", "InnerJoin") ), term("select", "empno") @@ -105,13 +108,25 @@ class QueryDecorrelationTest extends TableTestBase { "DataSetCalc", binaryNode( "DataSetJoin", - binaryNode( - "DataSetJoin", - batchTableNode(0), - batchTableNode(1), - term("where", "=(deptno, deptno0)"), - term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", "name"), - term("joinType", "InnerJoin") + unaryNode( + "DataSetCalc", + binaryNode( + "DataSetJoin", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "empno", "salary", "deptno") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "deptno") + ), + term("where", "=(deptno, deptno0)"), + term("join", "empno", "salary", "deptno", "deptno0"), + term("joinType", "InnerJoin") + ), + term("select", "empno", "salary", "deptno0") ), unaryNode( "DataSetAggregate", @@ -124,9 +139,8 @@ class QueryDecorrelationTest extends TableTestBase { term("groupBy", "deptno"), term("select", "deptno", "AVG(salary) AS EXPR$0") ), - term("where", "AND(=(deptno0, deptno1), >(salary, EXPR$0))"), - term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", - "name", "deptno1", "EXPR$0"), + term("where", "AND(=(deptno0, deptno), >(salary, EXPR$0))"), + term("join", "empno", "salary", "deptno0", "deptno", "EXPR$0"), term("joinType", "InnerJoin") ), term("select", "empno") http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala index 2f9057d..ba0326a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala @@ -55,18 +55,22 @@ class SetOperatorsTest extends TableTestBase { "DataSetJoin", batchTableNode(0), unaryNode( - "DataSetAggregate", + "DataSetCalc", unaryNode( - "DataSetCalc", - batchTableNode(1), - term("select", "b_long AS b_long3", "true AS $f0"), - term("where", "IS NOT NULL(b_long)") + "DataSetAggregate", + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "b_long AS b_long3", "true AS $f0"), + term("where", "IS NOT NULL(b_long)") + ), + term("groupBy", "b_long3"), + term("select", "b_long3", "MIN($f0) AS $f1") ), - term("groupBy", "b_long3"), - term("select", "b_long3", "MIN($f0) AS $f1") + term("select", "b_long3") ), term("where", "=(a_long, b_long3)"), - term("join", "a_long", "a_int", "a_string", "b_long3", "$f1"), + term("join", "a_long", "a_int", "a_string", "b_long3"), term("joinType", "InnerJoin") ), term("select", "a_int", "a_string") http://git-wip-us.apache.org/repos/asf/flink/blob/4024afff/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala index 25a81c4..cd6c77d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/WindowAggregateTest.scala @@ -40,18 +40,14 @@ class WindowAggregateTest extends TableTestBase { val expected = unaryNode( - "DataSetCalc", + "DataSetWindowAggregate", unaryNode( - "DataSetWindowAggregate", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "ts, a, b") - ), - term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 7200000.millis)), - term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") + "DataSetCalc", + batchTableNode(0), + term("select", "ts, a, b") ), - term("select", "sumA, cntB") + term("window", EventTimeTumblingGroupWindow(Some('w$), 'ts, 7200000.millis)), + term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") ) util.verifySql(sqlQuery, expected) @@ -101,19 +97,15 @@ class WindowAggregateTest extends TableTestBase { val expected = unaryNode( - "DataSetCalc", + "DataSetWindowAggregate", unaryNode( - "DataSetWindowAggregate", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "ts, a, b") - ), - term("window", - EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 900000.millis)), - term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") + "DataSetCalc", + batchTableNode(0), + term("select", "ts, a, b") ), - term("select", "sumA, cntB") + term("window", + EventTimeSlidingGroupWindow(Some('w$), 'ts, 5400000.millis, 900000.millis)), + term("select", "SUM(a) AS sumA, COUNT(b) AS cntB") ) util.verifySql(sqlQuery, expected) @@ -162,18 +154,14 @@ class WindowAggregateTest extends TableTestBase { val expected = unaryNode( - "DataSetCalc", + "DataSetWindowAggregate", unaryNode( - "DataSetWindowAggregate", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "ts") - ), - term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 1800000.millis)), - term("select", "COUNT(*) AS cnt") + "DataSetCalc", + batchTableNode(0), + term("select", "ts") ), - term("select", "cnt") + term("window", EventTimeSessionGroupWindow(Some('w$), 'ts, 1800000.millis)), + term("select", "COUNT(*) AS cnt") ) util.verifySql(sqlQuery, expected)