Repository: flink
Updated Branches:
  refs/heads/master cd1fbc078 -> 81dc260dc
[FLINK-7986] [table] Introduce FilterSetOpTransposeRule

This closes #4956.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81dc260d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81dc260d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81dc260d

Branch: refs/heads/master
Commit: 81dc260dc653085b9dbf098e8fd70a72d2d0828e
Parents: cd1fbc0
Author: Xpray <leonxp...@gmail.com>
Authored: Mon Nov 6 23:47:33 2017 +0800
Committer: twalthr <twal...@apache.org>
Committed: Thu Nov 16 14:43:50 2017 +0100

----------------------------------------------------------------------
 .../flink/table/plan/rules/FlinkRuleSets.scala | 2 +
 .../api/batch/table/SetOperatorsTest.scala | 80 ++++++++++++++++++++
 .../api/stream/table/SetOperatorsTest.scala | 68 +++++++++++++++++
 3 files changed, 150 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index dcc735d..a20d14f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -52,6 +52,8 @@ object FlinkRuleSets {
 FilterJoinRule.JOIN,
 // push filter through an aggregation
 FilterAggregateTransposeRule.INSTANCE,
+ // push filter through set operation
+ FilterSetOpTransposeRule.INSTANCE,

 // aggregation and projection rules
 AggregateProjectMergeRule.INSTANCE,
http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala index 2d4e205..35f4429 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/SetOperatorsTest.scala @@ -135,4 +135,84 @@ class SetOperatorsTest extends TableTestBase { util.verifyJavaTable(in, expected) } + + @Test + def testFilterUnionTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + + val expected = unaryNode( + "DataSetCalc", + unaryNode( + "DataSetAggregate", + binaryNode( + "DataSetUnion", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + term("union", "a", "b", "c") + ), + term("groupBy", "b"), + term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") + ) + + util.verifyTable(result, expected) + } + + @Test + def testFilterMinusTranspose(): Unit = { + val util = batchTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.minusAll(right) + .where('a > 
0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + + val expected = unaryNode( + "DataSetCalc", + unaryNode( + "DataSetAggregate", + binaryNode( + "DataSetMinus", + unaryNode( + "DataSetCalc", + batchTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + unaryNode( + "DataSetCalc", + batchTableNode(1), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + term("minus", "a", "b", "c") + ), + term("groupBy", "b"), + term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") + ) + + util.verifyTable(result, expected) + } } http://git-wip-us.apache.org/repos/asf/flink/blob/81dc260d/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala new file mode 100644 index 0000000..b1b700b --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/SetOperatorsTest.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.api.stream.table + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.utils.TableTestBase +import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode} +import org.junit.Test + +class SetOperatorsTest extends TableTestBase { + + @Test + def testFilterUnionTranspose(): Unit = { + val util = streamTestUtil() + val left = util.addTable[(Int, Long, String)]("left", 'a, 'b, 'c) + val right = util.addTable[(Int, Long, String)]("right", 'a, 'b, 'c) + + val result = left.unionAll(right) + .where('a > 0) + .groupBy('b) + .select('a.sum as 'a, 'b as 'b, 'c.count as 'c) + + val expected = unaryNode( + "DataStreamCalc", + unaryNode( + "DataStreamGroupAggregate", + binaryNode( + "DataStreamUnion", + unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + unaryNode( + "DataStreamCalc", + streamTableNode(1), + term("select", "a", "b", "c"), + term("where", ">(a, 0)") + ), + term("union all", "a", "b", "c") + ), + term("groupBy", "b"), + term("select", "b", "SUM(a) AS TMP_0", "COUNT(c) AS TMP_1") + ), + term("select", "TMP_0 AS a", "b", "TMP_1 AS c") + ) + + util.verifyTable(result, expected) + } +}