Repository: flink Updated Branches: refs/heads/master 338c30a41 -> 7a629fc59
[FLINK-5722] [table] Add dedicated DataSetDistinct operator. - Uses hash-combiner for a better combine rate. This closes #3471. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7a629fc5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7a629fc5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7a629fc5 Branch: refs/heads/master Commit: 7a629fc59ff206ba51f22e1bf35fe50882e63538 Parents: 338c30a Author: Fabian Hueske <fhue...@apache.org> Authored: Fri Mar 3 23:17:36 2017 +0100 Committer: twalthr <twal...@apache.org> Committed: Wed Mar 8 14:14:06 2017 +0100 ---------------------------------------------------------------------- .../plan/nodes/dataset/DataSetDistinct.scala | 94 ++++++++++++++++++++ .../flink/table/plan/rules/FlinkRuleSets.scala | 1 + .../rules/dataSet/DataSetAggregateRule.scala | 9 +- .../rules/dataSet/DataSetDistinctRule.scala | 61 +++++++++++++ .../runtime/aggregate/DistinctReduce.scala | 26 ++++++ .../scala/batch/sql/DistinctAggregateTest.scala | 45 ++++------ .../batch/sql/QueryDecorrelationTest.scala | 10 +-- .../api/scala/batch/sql/SetOperatorsTest.scala | 5 +- .../scala/batch/table/FieldProjectionTest.scala | 10 +-- 9 files changed, 218 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala new file mode 100644 index 0000000..14116f1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetDistinct.scala @@ 
-0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.nodes.dataset + +import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet} +import org.apache.calcite.rel.`type`.RelDataType +import org.apache.calcite.rel.metadata.RelMetadataQuery +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint +import org.apache.flink.api.java.DataSet +import org.apache.flink.table.api.BatchTableEnvironment +import org.apache.flink.table.runtime.aggregate.DistinctReduce +import org.apache.flink.types.Row + +import scala.collection.JavaConverters._ + +/** + * DataSet RelNode for a Distinct (LogicalAggregate without aggregation functions). 
+ * + */ +class DataSetDistinct( + cluster: RelOptCluster, + traitSet: RelTraitSet, + input: RelNode, + rowRelDataType: RelDataType, + ruleDescription: String) + extends SingleRel(cluster, traitSet, input) + with DataSetRel { + + override def deriveRowType() = rowRelDataType + + override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = { + new DataSetDistinct( + cluster, + traitSet, + inputs.get(0), + rowRelDataType, + ruleDescription + ) + } + + override def computeSelfCost(planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = { + + val child = this.getInput + val rowCnt = metadata.getRowCount(child) + val rowSize = this.estimateRowSize(child.getRowType) + // less expensive than DataSetAggregate without aggregates + planner.getCostFactory.makeCost(rowCnt, 0, rowCnt * rowSize * 0.9) + } + + override def toString: String = { + s"Distinct(distinct: (${rowTypeToString(rowRelDataType)}))" + } + + override def explainTerms(pw: RelWriter): RelWriter = { + super.explainTerms(pw).item("distinct", rowTypeToString(rowRelDataType)) + } + + def rowTypeToString(rowType: RelDataType): String = { + rowType.getFieldList.asScala.map(_.getName).mkString(", ") + } + + override def translateToPlan(tableEnv: BatchTableEnvironment): DataSet[Row] = { + + val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv) + val groupKeys = (0 until rowRelDataType.getFieldCount).toArray // group on all fields + + inputDS + .groupBy(groupKeys: _*) + .reduce(new DistinctReduce) + .setCombineHint(CombineHint.HASH) // use hash-combiner + .name("distinct") + .returns(inputDS.getType) + } + +} + + http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala index 39e4353..3b20236 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala @@ -110,6 +110,7 @@ object FlinkRuleSets { DataSetWindowAggregateRule.INSTANCE, DataSetAggregateRule.INSTANCE, DataSetAggregateWithNullValuesRule.INSTANCE, + DataSetDistinctRule.INSTANCE, DataSetCalcRule.INSTANCE, DataSetJoinRule.INSTANCE, DataSetSingleRowJoinRule.INSTANCE, http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala index 9c0acdd..98d1c13 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala @@ -22,7 +22,6 @@ import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTrait import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule import org.apache.calcite.rel.logical.LogicalAggregate -import org.apache.flink.table.api.TableException import org.apache.flink.table.plan.nodes.dataset.{DataSetAggregate, DataSetConvention, DataSetUnion} import scala.collection.JavaConversions._ @@ -42,6 +41,14 @@ class DataSetAggregateRule return false } + // distinct is translated into dedicated operator + if (agg.getAggCallList.isEmpty && + agg.getGroupCount == agg.getRowType.getFieldCount && + 
agg.getRowType.equals(agg.getInput.getRowType) && + agg.getGroupSets.size() == 1) { + return false + } + // check if we have distinct aggregates val distinctAggs = agg.getAggCallList.exists(_.isDistinct) http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala new file mode 100644 index 0000000..644ff9b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetDistinctRule.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.rules.dataSet + +import org.apache.calcite.plan.{Convention, RelOptRule, RelOptRuleCall, RelTraitSet} +import org.apache.calcite.rel.RelNode +import org.apache.calcite.rel.convert.ConverterRule +import org.apache.calcite.rel.logical.LogicalAggregate +import org.apache.flink.table.plan.nodes.dataset.{DataSetConvention, DataSetDistinct} + +class DataSetDistinctRule + extends ConverterRule( + classOf[LogicalAggregate], + Convention.NONE, + DataSetConvention.INSTANCE, + "DataSetDistinctRule") + { + + override def matches(call: RelOptRuleCall): Boolean = { + val agg: LogicalAggregate = call.rel(0).asInstanceOf[LogicalAggregate] + + // only accept distinct + agg.getAggCallList.isEmpty && + agg.getGroupCount == agg.getRowType.getFieldCount && + agg.getRowType.equals(agg.getInput.getRowType) && + agg.getGroupSets.size() == 1 + } + + def convert(rel: RelNode): RelNode = { + val agg: LogicalAggregate = rel.asInstanceOf[LogicalAggregate] + val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) + val convInput: RelNode = RelOptRule.convert(agg.getInput, DataSetConvention.INSTANCE) + + new DataSetDistinct( + rel.getCluster, + traitSet, + convInput, + agg.getRowType, + description) + } + } + +object DataSetDistinctRule { + val INSTANCE: RelOptRule = new DataSetDistinctRule +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala new file mode 100644 index 0000000..f440573 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DistinctReduce.scala @@ -0,0 +1,26 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.aggregate + +import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.types.Row + +class DistinctReduce extends ReduceFunction[Row] { + override def reduce(value1: Row, value2: Row): Row = value1 +} http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala index 38e4ea8..54b4d24 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/DistinctAggregateTest.scala @@ -40,14 +40,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetValues", unaryNode( - "DataSetAggregate", + "DataSetDistinct", 
unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a") ), - term("groupBy", "a"), - term("select", "a") + term("distinct", "a") ), tuples(List(null)), term("values", "a") @@ -74,14 +73,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetValues", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a") ), - term("groupBy", "a"), - term("select", "a") + term("distinct", "a") ), tuples(List(null)), term("values", "a") @@ -174,14 +172,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetValues", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a") ), - term("groupBy", "a"), - term("select", "a") + term("distinct", "a") ), tuples(List(null)), term("values", "a") @@ -197,14 +194,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetValues", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "b") ), - term("groupBy", "b"), - term("select", "b") + term("distinct", "b") ), tuples(List(null)), term("values", "b") @@ -255,14 +251,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetValues", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a") ), - term("groupBy", "a"), - term("select", "a") + term("distinct", "a") ), tuples(List(null)), term("values", "a") @@ -282,14 +277,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetValues", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "b") ), - term("groupBy", "b"), - term("select", "b") + term("distinct", "b") ), tuples(List(null)), term("values", "b") @@ -384,14 +378,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetAggregate", unaryNode( - 
"DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a", "b") ), - term("groupBy", "a, b"), - term("select", "a, b") + term("distinct", "a, b") ), term("groupBy", "a"), term("select", "a, SUM(b) AS EXPR$2, COUNT(b) AS EXPR$3") @@ -430,14 +423,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetAggregate", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a", "b") ), - term("groupBy", "a, b"), - term("select", "a, b") + term("distinct", "a, b") ), term("groupBy", "a"), term("select", "a, SUM(b) AS EXPR$2") @@ -451,14 +443,13 @@ class DistinctAggregateTest extends TableTestBase { unaryNode( "DataSetAggregate", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a", "c") ), - term("groupBy", "a, c"), - term("select", "a, c") + term("distinct", "a, c") ), term("groupBy", "a"), term("select", "a, COUNT(c) AS EXPR$3") http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala index 516fcd2..3e44526 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/QueryDecorrelationTest.scala @@ -55,7 +55,7 @@ class QueryDecorrelationTest extends TableTestBase { term("select", "empno", "salary") ), unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", binaryNode( @@ -78,8 
+78,7 @@ class QueryDecorrelationTest extends TableTestBase { ), term("select", "empno") ), - term("groupBy", "empno"), - term("select", "empno") + term("distinct", "empno") ), term("where", "=(empno0, empno)"), term("join", "empno", "salary", "empno0"), @@ -145,7 +144,7 @@ class QueryDecorrelationTest extends TableTestBase { term("select", "salary", "deptno") ), unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", binaryNode( @@ -166,8 +165,7 @@ class QueryDecorrelationTest extends TableTestBase { ), term("select", "deptno0") ), - term("groupBy", "deptno0"), - term("select", "deptno0") + term("distinct", "deptno0") ), term("where", "=(deptno, deptno0)"), term("join", "salary", "deptno", "deptno0"), http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala index d70a32a..f902338 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/SetOperatorsTest.scala @@ -66,14 +66,13 @@ class SetOperatorsTest extends TableTestBase { term("select", "b_long") ), unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a_long") ), - term("groupBy", "a_long"), - term("select", "a_long") + term("distinct", "a_long") ), term("where", "=(a_long, b_long)"), term("join", "b_long", "a_long"), 
http://git-wip-us.apache.org/repos/asf/flink/blob/7a629fc5/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala index 0066ad2..a0412d5 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala @@ -117,14 +117,13 @@ class FieldProjectionTest extends TableTestBase { val expected = unaryNode( "DataSetCalc", unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a", "c") ), - term("groupBy", "a", "c"), - term("select", "a", "c") + term("distinct", "a", "c") ), term("select", "a") ) @@ -138,14 +137,13 @@ class FieldProjectionTest extends TableTestBase { val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c) val expected = unaryNode( - "DataSetAggregate", + "DataSetDistinct", unaryNode( "DataSetCalc", batchTableNode(0), term("select", "a", "c") ), - term("groupBy", "a", "c"), - term("select", "a", "c") + term("distinct", "a", "c") ) util.verifyTable(resultTable, expected)