This is an automated email from the ASF dual-hosted git repository. ravipesala pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 0482983 [CARBONDATA-3291] Fix that MV datamap doesn't take affect when the same table join 0482983 is described below commit 04829839452d7f56954219b56a6e515239effe61 Author: qiuchenjian <807169...@qq.com> AuthorDate: Wed Feb 13 20:32:42 2019 +0800 [CARBONDATA-3291] Fix that MV datamap doesn't take affect when the same table join [Problem] MV datamap doesn't take effect when the same table is joined; for the error scenario, see the test case. This closes #3125 --- .../carbondata/mv/rewrite/DefaultMatchMaker.scala | 15 +++- .../apache/carbondata/mv/rewrite/Navigator.scala | 51 +++++++++--- .../mv/rewrite/MVMultiJoinTestCase.scala | 94 ++++++++++++++++++++++ .../mv/plans/modular/ModularRelation.scala | 15 ++++ 4 files changed, 160 insertions(+), 15 deletions(-) diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala index cc5cc7b..59d72f8 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala @@ -162,8 +162,14 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper // are 1-1 correspondence. 
// Change the following two conditions to more complicated ones if we want to // consider things that combine extrajoin, rejoin, and harmonized relations - val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ == x) != 1 } - val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ == x) != 1 } + val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count{ + case relation: ModularRelation => relation.fineEquals(x) + case other => other == x + } != 1 } + val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count{ + case relation: ModularRelation => relation.fineEquals(x) + case other => other == x + } != 1 } val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) } val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) } @@ -180,7 +186,10 @@ object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper isPredicateEmdR && isOutputEdR) { val mappings = sel_1a.children.zipWithIndex.map { case (childr, fromIdx) if sel_1q.children.contains(childr) => - val toIndx = sel_1q.children.indexWhere(_ == childr) + val toIndx = sel_1q.children.indexWhere{ + case relation: ModularRelation => relation.fineEquals(childr) + case other => other == childr + } (toIndx -> fromIdx) } diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala index 76df4c2..905cd17 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala @@ -17,11 +17,11 @@ package org.apache.carbondata.mv.rewrite -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap} import org.apache.carbondata.mv.expressions.modular._ -import 
org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select} import org.apache.carbondata.mv.plans.modular +import org.apache.carbondata.mv.plans.modular._ import org.apache.carbondata.mv.session.MVSession private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) { @@ -146,21 +146,27 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) val rtables = subsumer.collect { case n: modular.LeafNode => n } val etables = subsumee.collect { case n: modular.LeafNode => n } val pairs = for { - rtable <- rtables - etable <- etables - if rtable == etable - } yield (rtable, etable) + i <- rtables.indices + j <- etables.indices + if rtables(i) == etables(j) && reTablesJoinMatched( + rtables(i), etables(j), subsumer, subsumee, i, j + ) + } yield (rtables(i), etables(j)) pairs.foldLeft(subsumer) { case (curSubsumer, pair) => val mappedOperator = - if (pair._1.isInstanceOf[modular.HarmonizedRelation] && - pair._1.asInstanceOf[modular.HarmonizedRelation].hasTag) { - pair._2.asInstanceOf[modular.HarmonizedRelation].addTag - } else { - pair._2 + pair._1 match { + case relation: HarmonizedRelation if relation.hasTag => + pair._2.asInstanceOf[HarmonizedRelation].addTag + case _ => + pair._2 + } + val nxtSubsumer = curSubsumer.transform { + case node: ModularRelation if node.fineEquals(pair._1) => mappedOperator + case pair._1 if !pair._1.isInstanceOf[ModularRelation] => mappedOperator } - val nxtSubsumer = curSubsumer.transform { case pair._1 => mappedOperator } + // val attributeSet = AttributeSet(pair._1.output) // reverse first due to possible tag for left join val rewrites = AttributeMap(pair._1.output.zip(mappedOperator.output)) @@ -171,4 +177,25 @@ private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVSession) } } } + + // match the join table of subsumer and subsumee + // when the table names are the same + def reTablesJoinMatched(rtable: modular.LeafNode, etable: modular.LeafNode, + subsumer: 
ModularPlan, subsumee: ModularPlan, + rIndex: Int, eIndex: Int): Boolean = { + (rtable, etable) match { + case _: (ModularRelation, ModularRelation) => + val rtableParent = subsumer.find(p => p.children.contains(rtable)).get + val etableParent = subsumee.find(p => p.children.contains(etable)).get + (rtableParent, etableParent) match { + case (e: Select, r: Select) => + val intersetJoinEdges = r.joinEdges intersect e.joinEdges + if (intersetJoinEdges.nonEmpty) { + return intersetJoinEdges.exists(j => j.left == rIndex && j.left == eIndex || + j.right == rIndex && j.right == eIndex) + } + } + } + true + } } diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala new file mode 100644 index 0000000..bfd621d --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVMultiJoinTestCase.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.mv.rewrite + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +class MVMultiJoinTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll(){ + drop + sql("create table dim_table(name string,age int,height int) using carbondata") + sql("create table sdr_table(name varchar(20), score int) using carbondata") + sql("create table areas(aid int, title string, pid int) using carbondata") + } + + override def afterAll(){ + drop + } + + test("test mv self join") { + sql("insert into areas select 130000, 'hebei', null") + sql("insert into areas select 130100, 'shijiazhuang', 130000") + sql("insert into areas select 130400, 'handan', 130000") + sql("insert into areas select 410000, 'henan', null") + sql("insert into areas select 410300, 'luoyang', 410000") + + val mvSQL = + s"""select p.title,c.title + |from areas as p + |inner join areas as c on c.pid=p.aid + |where p.title = 'hebei' + """.stripMargin + sql("create datamap table_mv using 'mv' as " + mvSQL) + sql("rebuild datamap table_mv") + val frame = sql(mvSQL) + assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv")) + checkAnswer(frame, Seq(Row("hebei","shijiazhuang"), Row("hebei","handan"))) + } + + test("test mv two join tables are same") { + sql("drop datamap if exists table_mv") + + sql("insert into dim_table select 'tom',20,170") + sql("insert into dim_table select 'lily',30,160") + sql("insert into sdr_table select 'tom',70") + sql("insert into sdr_table select 'tom',50") + sql("insert into sdr_table select 'lily',80") + + val mvSQL = + s"""select sdr.name,sum(sdr.score),dim.age,dim_other.height from sdr_table sdr + | left join dim_table dim on sdr.name = dim.name + | left join dim_table dim_other on sdr.name = dim_other.name + | group by 
sdr.name,dim.age,dim_other.height + """.stripMargin + sql("create datamap table_mv using 'mv' as " + mvSQL) + sql("rebuild datamap table_mv") + val frame = sql(mvSQL) + assert(verifyMVDataMap(frame.queryExecution.analyzed, "table_mv")) + checkAnswer(frame, Seq(Row("lily",80,30,160),Row("tom",120,20,170))) + } + + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { + val tables = logicalPlan collect { + case l: LogicalRelation => l.catalogTable.get + } + tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table")) + } + + def drop: Unit ={ + sql("drop table if exists areas") + sql("drop table if exists dim_table") + sql("drop table if exists sdr_table") + sql("drop datamap if exists table_mv") + } + +} diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala index 7e1eb05..491d394 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala @@ -61,6 +61,21 @@ case class ModularRelation(databaseName: String, override def adjacencyList: Map[Int, Seq[(Int, JoinType)]] = Map.empty + def fineEquals(that: Any): Boolean = { + that match { + case that: ModularRelation => + if ((databaseName != null && tableName != null && databaseName == that.databaseName && + tableName == that.tableName && output.toString == that.output.toString) || + (databaseName == null && tableName == null && that.databaseName == null && + that.tableName == null && output.toString == that.output.toString)) { + true + } else { + false + } + case _ => false + } + } + override def equals(that: Any): Boolean = { that match { case that: ModularRelation =>