http://git-wip-us.apache.org/repos/asf/flink/blob/c5173fa2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala index 3e44526..7496ff8 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala @@ -40,56 +40,6 @@ class QueryDecorrelationTest extends TableTestBase { "and e1.deptno < 10 and d1.deptno < 15\n" + "and e1.salary > (select avg(salary) from emp e2 where e1.empno = e2.empno)" - // the inner query "select avg(salary) from emp e2 where e1.empno = e2.empno" will be - // decorrelated into a join and then groupby. And the filters - // "e1.deptno < 10 and d1.deptno < 15" will also be pushed down before join. - val decorrelatedSubQuery = unaryNode( - "DataSetAggregate", - unaryNode( - "DataSetCalc", - binaryNode( - "DataSetJoin", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "empno", "salary") - ), - unaryNode( - "DataSetDistinct", - unaryNode( - "DataSetCalc", - binaryNode( - "DataSetJoin", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "empno", "deptno"), - term("where", "<(deptno, 10)") - ), - unaryNode( - "DataSetCalc", - batchTableNode(1), - term("select", "deptno"), - term("where", "<(deptno, 15)") - ), - term("where", "=(deptno, deptno0)"), - term("join", "empno", "deptno", "deptno0"), - term("joinType", "InnerJoin") - ), - term("select", "empno") - ), - term("distinct", "empno") - ), - term("where", "=(empno0, empno)"), - term("join", "empno", "salary", "empno0"), - term("joinType", "InnerJoin") - ), - term("select", "empno0", "salary") - ), - term("groupBy", "empno0"), - term("select", "empno0", "AVG(salary) AS EXPR$0") - ) - val expectedQuery = unaryNode( "DataSetCalc", binaryNode( @@ -112,7 +62,17 @@ class QueryDecorrelationTest extends TableTestBase { term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", "name"), term("joinType", "InnerJoin") ), - decorrelatedSubQuery, + unaryNode( + "DataSetAggregate", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "salary", "empno"), + term("where", "IS NOT NULL(empno)") + ), + term("groupBy", "empno"), + term("select", "empno", "AVG(salary) AS EXPR$0") + ), term("where", "AND(=(empno, empno0), >(salary, EXPR$0))"), term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", "name", "empno0", "EXPR$0"), @@ -132,51 +92,6 @@ class QueryDecorrelationTest extends TableTestBase { " select avg(e2.salary) from emp e2 where e2.deptno = d1.deptno" + ")" - val decorrelatedSubQuery = unaryNode( - "DataSetAggregate", - unaryNode( - "DataSetCalc", - binaryNode( - "DataSetJoin", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "salary", "deptno") - ), - unaryNode( - "DataSetDistinct", - unaryNode( - "DataSetCalc", - binaryNode( - "DataSetJoin", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "deptno") - ), - unaryNode( - "DataSetCalc", - batchTableNode(1), - term("select", "deptno") - ), - term("where", "=(deptno, deptno0)"), - term("join", "deptno", "deptno0"), - term("joinType", "InnerJoin") - ), - term("select", "deptno0") - ), - term("distinct", "deptno0") - ), - term("where", "=(deptno, deptno0)"), - term("join", "salary", "deptno", "deptno0"), - term("joinType", "InnerJoin") - ), - term("select", "deptno0", "salary") - ), - term("groupBy", "deptno0"), - term("select", "deptno0", "AVG(salary) AS EXPR$0") - ) - val expectedQuery = unaryNode( "DataSetAggregate", binaryNode( @@ -198,10 +113,20 @@ class QueryDecorrelationTest extends TableTestBase { term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", "name"), term("joinType", "InnerJoin") ), - decorrelatedSubQuery, - term("where", "AND(=(deptno0, deptno00), >(salary, EXPR$0))"), + unaryNode( + "DataSetAggregate", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "deptno", "salary"), + term("where", "IS NOT NULL(deptno)") + ), + term("groupBy", "deptno"), + term("select", "deptno", "AVG(salary) AS EXPR$0") + ), + term("where", "AND(=(deptno0, deptno1), >(salary, EXPR$0))"), term("join", "empno", "ename", "job", "salary", "deptno", "deptno0", - "name", "deptno00", "EXPR$0"), + "name", "deptno1", "EXPR$0"), term("joinType", "InnerJoin") ), term("select", "empno")
http://git-wip-us.apache.org/repos/asf/flink/blob/c5173fa2/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala index f902338..2f9057d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala @@ -58,33 +58,15 @@ class SetOperatorsTest extends TableTestBase { "DataSetAggregate", unaryNode( "DataSetCalc", - binaryNode( - "DataSetJoin", - unaryNode( - "DataSetCalc", - batchTableNode(1), - term("select", "b_long") - ), - unaryNode( - "DataSetDistinct", - unaryNode( - "DataSetCalc", - batchTableNode(0), - term("select", "a_long") - ), - term("distinct", "a_long") - ), - term("where", "=(a_long, b_long)"), - term("join", "b_long", "a_long"), - term("joinType", "InnerJoin") - ), - term("select", "a_long", "true AS $f0") + batchTableNode(1), + term("select", "b_long AS b_long3", "true AS $f0"), + term("where", "IS NOT NULL(b_long)") ), - term("groupBy", "a_long"), - term("select", "a_long", "MIN($f0) AS $f1") + term("groupBy", "b_long3"), + term("select", "b_long3", "MIN($f0) AS $f1") ), - term("where", "=(a_long, a_long0)"), - term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"), + term("where", "=(a_long, b_long3)"), + term("join", "a_long", "a_int", "a_string", "b_long3", "$f1"), term("joinType", "InnerJoin") ), term("select", "a_int", "a_string")