http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala new file mode 100644 index 0000000..074d369 --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.rewrite + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, PredicateHelper} +import org.apache.spark.sql.catalyst.expressions.aggregate._ + +import org.apache.carbondata.mv.plans.modular +import org.apache.carbondata.mv.plans.modular.ModularPlan + +/** + * Utility functions used by mqo matcher to convert our plan to new aggregation code path + */ +private[rewrite] object Utils extends PredicateHelper { + + // use for match qb_2a, qb_2q and sel_3a, sel_3q + private def doMatch( + operator_a: modular.Matchable, + operator_q: modular.Matchable, + alias_m: AttributeMap[Alias]): Option[modular.Matchable] = { + var matchable = true + val matched = operator_q.transformExpressions { + case cnt_q@AggregateExpression(Count(exprs_q), _, false, _) => + operator_a.outputList.find { + case alias: Alias if alias_m.contains(alias.toAttribute) && + alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] && + alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Count] => + // case for groupby + val cnt_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children + if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) { + false + } else { + exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode())) + .forall(p => p._1.semanticEquals(p._2)) + } + + case attr: Attribute if alias_m.contains(attr) && + alias_m(attr).child.isInstanceOf[AggregateExpression] && + alias_m(attr).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Count] => + val cnt_a = alias_m(attr).child.asInstanceOf[AggregateExpression] + val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children + if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) { + false + } else { + exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode())) + .forall(p => p._1.semanticEquals(p._2)) + } + + case _ => false + }.map { cnt => AggregateExpression( + Sum(cnt.toAttribute), + cnt_q.mode, + isDistinct = false, + cnt_q.resultId) + }.getOrElse { matchable = false; cnt_q } + + case sum_q@AggregateExpression(Sum(expr_q), _, false, _) => + operator_a.outputList.find { + case alias: Alias if alias_m.contains(alias.toAttribute) && + alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] && + alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Sum] => + val sum_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child + if (sum_a.isDistinct != sum_q.isDistinct) { + false + } else { + expr_a.semanticEquals(expr_q) + } + + case attr: Attribute if alias_m.contains(attr) && + alias_m(attr).child.isInstanceOf[AggregateExpression] && + alias_m(attr).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Sum] => + val sum_a = alias_m(attr).child.asInstanceOf[AggregateExpression] + val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child + if (sum_a.isDistinct != sum_q.isDistinct) { + false + } else { + expr_a.semanticEquals(expr_q) + } + + case _ => false + }.map { sum => AggregateExpression( + Sum(sum.toAttribute), + sum_q.mode, + isDistinct = false, + sum_q.resultId) + }.getOrElse { matchable = false; sum_q } + + case max_q@AggregateExpression(Max(expr_q), _, false, _) => + operator_a.outputList.find { + case alias: Alias if alias_m.contains(alias.toAttribute) && + alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] && + alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Max] => + val max_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child + if (max_a.isDistinct != max_q.isDistinct) { + false + } else { + expr_a.semanticEquals(expr_q) + } + + case attr: Attribute if alias_m.contains(attr) && + alias_m(attr).child.isInstanceOf[AggregateExpression] && + alias_m(attr).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Max] => + val max_a = alias_m(attr).child.asInstanceOf[AggregateExpression] + val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child + if (max_a.isDistinct != max_q.isDistinct) { + false + } else { + expr_a.semanticEquals(expr_q) + } + + case _ => false + }.map { max => AggregateExpression( + Max(max.toAttribute), + max_q.mode, + isDistinct = false, + max_q.resultId) + }.getOrElse { matchable = false; max_q } + + case min_q@AggregateExpression(Min(expr_q), _, false, _) => + operator_a.outputList.find { + case alias: Alias if alias_m.contains(alias.toAttribute) && + alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] && + alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Min] => { + val min_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression] + val expr_a = min_a.aggregateFunction.asInstanceOf[Max].child + if (min_a.isDistinct != min_q.isDistinct) { + false + } else { + expr_a.semanticEquals(expr_q) + } + } + case attr: Attribute if alias_m.contains(attr) && + alias_m(attr).child.isInstanceOf[AggregateExpression] && + alias_m(attr).child.asInstanceOf[AggregateExpression] + .aggregateFunction.isInstanceOf[Min] => { + val min_a = alias_m(attr).child.asInstanceOf[AggregateExpression] + val expr_a = min_a.aggregateFunction.asInstanceOf[Max].child + if (min_a.isDistinct != min_q.isDistinct) { + false + } else { + expr_a.semanticEquals(expr_q) + } + } + case _ => false + }.map { min => AggregateExpression( + Min(min.toAttribute), + min_q.mode, + isDistinct = false, + min_q.resultId) + }.getOrElse { matchable = false; min_q } + + case other: AggregateExpression => + matchable = false + other + + case expr: Expression if !expr.isInstanceOf[AggregateFunction] => + operator_a.outputList.find { + case alias: Alias if alias_m.contains(alias.toAttribute) && + alias_m(alias.toAttribute).child.semanticEquals(expr) && + !alias_m(alias.toAttribute).child + .isInstanceOf[AggregateExpression] => true + case attr: Attribute if alias_m.contains(attr) && + alias_m(attr).child.semanticEquals(expr) && + !alias_m(attr).child.isInstanceOf[AggregateExpression] => true + case _ => false + }.map(_.toAttribute) + .getOrElse { expr } + } + + if (matchable) { + Some(matched) + } else { + None + } + } + + def tryMatch(a: modular.Matchable, + q: modular.Matchable, + m: AttributeMap[Alias]): Option[modular.Matchable] = { + if (a.getClass == q.getClass) { + doMatch(a, q, m) + } else { + None + } + } + + /** + * (Subsumee) expression translation: + * + * The translation begins by creating a copy of the whole expression (step 1). Then each input + * column is translated in turn. + * To translate an input column, we first find the child block that produces the input column + * and replace the input column with the + * associated output column expression (step 2). The next step is to replace the translated + * expression with its equivalent output + * expression at the top of the child compensation (step 3). Then, We recursively translate + * each new input column(except input + * columns produced by rejoin children) until we reach the bottom of the child compensation + * (step 4). Finally, we find an + * equivalent output expression in subsumer (step 5). + * + * So given a subsumee expr, the translation follows the following path: + * + * top of subsumee --> child of subsumee --> top of compensation --> bottom of compensation --> + * top of subsumer + * + * To simplify this we assume in subsumer outputList of top select 1-1 corresponds to the + * outputList of groupby + * note that subsumer outputList is list of attributes and that of groupby is list of aliases + * + */ + private def doTopSelectTranslation(exprE: Expression, + exprListR: Seq[Expression], + subsumee: ModularPlan, + subsumer: ModularPlan, + compensation: Option[ModularPlan]): Option[Expression] = { + (subsumer, subsumee, compensation) match { + // top selects whose children do not match exactly + // for simplicity, we assume outputList of subsumer is 1-1 corresponding to that of its + // immediately groupby child + case ( + sel_3a@modular.Select( + _, _, _, _, _, + Seq(gb_2a@modular.GroupBy( + _, _, _, _, sel_2a@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), + _, _, _, _), + sel_3q@modular.Select( + _, _, _, _, _, Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), + Some(gb_2c@modular.GroupBy( + _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)) + ) => + if (sel_3q.predicateList.contains(exprE)) { + val expr1E = exprE.transform { + case attr: Attribute => + gb_2c.outputList.lift( + gb_2q.outputList.indexWhere { + case alias: Alias if alias.toAttribute.semanticEquals(attr) => true; + case other => false + }).getOrElse { attr } + } + if (expr1E.eq(exprE)) { + None + } else { + Some(expr1E) + } + } + else if (sel_3q.outputList.contains(exprE)) { + exprE match { + case attr: Attribute => // this subexpression must in subsumee select output list + gb_2c.outputList.lift( + gb_2q.outputList.indexWhere { + case a if a.toAttribute.semanticEquals(attr) => true; + case other => false + }) + + case alias: Alias => + gb_2c.outputList.lift( + gb_2q.outputList.indexWhere { + case a if a.toAttribute.semanticEquals(alias.toAttribute) => true; + case other => false + }) + + case _ => None + } + } else if (sel_2c.predicateList.contains(exprE)) { + if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) || + canEvaluate(exprE, subsumer)) { + Some(exprE) + } else { + None + } + } else if (gb_2c.predicateList.contains(exprE)) { + if (gb_2a.outputList.exists { + case a: Alias if a.child.semanticEquals(exprE) => true; + case _ => false + } || canEvaluate(exprE, subsumer)) { + Some(exprE) + } else { + None + } + } else if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) || + canEvaluate(exprE, subsumer)) { + Some(exprE) + } else { + None + } + + case _ => None // TODO: implement this + } + } + + private def isSemanticEquivalent(translatedExpr: Expression, subsumer: ModularPlan) = { + subsumer match { + // if subsumer has where clause, even if expr can be translated into new expr based on + // subsumer, the two may not be semantic equivalent + // TODO: refine this + case modular.Select( + _, _, predicateList, _, _, + Seq(modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _) + if predicateList.nonEmpty => false + case _ => true + } + } + + /** + * derivable = translatable + semantic equivalent + * + * The translation method described above is also the first step in deriving a subsumee + * expression Eexp from the subsumer's output columns. After translating + * Eexp to E'exp, deriavability can be established by making sure that the subsumer + * computes at its output certain necessary subexpressions of E'exp (or even the entire + * E'exp). The problem that arises, however, is to determine the parts of E'exp that + * can/should be computed by the subsumer. + * + * In general, translation causes an expression to expand by replacing individual input + * columns with equivalent subexpressions. Derivation is the reverse operation, where + * pieces of the translated expression are collapsed as they are computed along the + * derivation path. + */ + + def isDerivable(exprE: Expression, + exprListR: Seq[Expression], + subsumee: ModularPlan, + subsumer: ModularPlan, + compensation: Option[ModularPlan]): Boolean = { + val exprE1 = doTopSelectTranslation(exprE, exprListR, subsumee, subsumer, compensation) + exprE1 match { + case Some(e) => isSemanticEquivalent(e, subsumer) + case _ => false + } + } + +}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala new file mode 100644 index 0000000..184fdc1 --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala @@ -0,0 +1,676 @@ +package org.apache.carbondata.mv.rewrite + +import java.io.File + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class MVCreateTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + drop() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + val projectPath = new File(this.getClass.getResource("/").getPath + "../../../../../") + .getCanonicalPath.replaceAll("\\\\", "/") + val integrationPath = s"$projectPath/integration" + val resourcesPath = s"$integrationPath/spark-common-test/src/test/resources" + sql( + """ + | CREATE TABLE fact_table1 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql( + """ + | CREATE TABLE fact_table2 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql( + """ + | CREATE TABLE fact_table3 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table3 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table3 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql( + """ + | CREATE TABLE fact_table4 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table4 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table4 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql( + """ + | CREATE TABLE fact_table5 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table5 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql( + """ + | CREATE TABLE fact_table6 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + } + + test("test create datamap with simple and same projection") { + sql("drop datamap if exists datamap1") + sql("create datamap datamap1 using 'mv' as select empname, designation from fact_table1") + sql(s"rebuild datamap datamap1") + val df = sql("select empname,designation from fact_table1") + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap1")) + checkAnswer(df, sql("select empname,designation from fact_table2")) + sql(s"drop datamap datamap1") + } + + test("test create datamap with simple and sub projection") { + sql("drop datamap if exists datamap2") + sql("create datamap datamap2 using 'mv' as select empname, designation from fact_table1") + sql(s"rebuild datamap datamap2") + val df = sql("select empname from fact_table1") + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap2")) + checkAnswer(df, sql("select empname from fact_table2")) + sql(s"drop datamap datamap2") + } + + test("test create datamap with simple and same projection with projection filter") { + sql("drop datamap if exists datamap3") + sql("create datamap datamap3 using 'mv' as select empname, designation from fact_table1") + sql(s"rebuild datamap datamap3") + val frame = sql("select empname, designation from fact_table1 where empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap3")) + + checkAnswer(frame, sql("select empname, designation from fact_table2 where empname='shivani'")) + sql(s"drop datamap datamap3") + } + + test("test create datamap with simple and sub projection with non projection filter") { + sql("create datamap datamap4 using 'mv' as select empname, designation from fact_table1") + sql(s"rebuild datamap datamap4") + val frame = sql("select designation from fact_table1 where empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap4")) + checkAnswer(frame, sql("select designation from fact_table2 where empname='shivani'")) + sql(s"drop datamap datamap4") + } + + test("test create datamap with simple and sub projection with datamap filter") { + sql("create datamap datamap5 using 'mv' as select empname, designation from fact_table1 where empname='shivani'") + sql(s"rebuild datamap datamap5") + val frame = sql("select designation from fact_table1 where empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap5")) + checkAnswer(frame, sql("select designation from fact_table2 where empname='shivani'")) + sql(s"drop datamap datamap5") + } + + test("test create datamap with simple and same projection with datamap filter ") { + sql("create datamap datamap6 using 'mv' as select empname, designation from fact_table1 where empname='shivani'") + sql(s"rebuild datamap datamap6") + val frame = sql("select empname,designation from fact_table1 where empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap6")) + checkAnswer(frame, sql("select empname,designation from fact_table2 where empname='shivani'")) + sql(s"drop datamap datamap6") + } + + test("test create datamap with simple and same projection with datamap filter and extra query column filter") { + sql("create datamap datamap7 using 'mv' as select empname, designation from fact_table1 where empname='shivani'") + sql(s"rebuild datamap datamap7") + val frame = sql( + "select empname,designation from fact_table1 where empname='shivani' and designation='SA'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap7")) + checkAnswer(frame, sql("select empname,designation from fact_table2 where empname='shivani' and designation='SA'")) + sql(s"drop datamap datamap7") + } + + test("test create datamap with simple and same projection with datamap filter and different column filter") { + sql("create datamap datamap8 using 'mv' as select empname, designation from fact_table1 where empname='shivani'") + sql(s"rebuild datamap datamap8") + val frame = sql("select empname,designation from fact_table1 where designation='SA'") + val analyzed = frame.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap8")) + checkAnswer(frame, sql("select empname,designation from fact_table2 where designation='SA'")) + sql(s"drop datamap datamap8") + } + + test("test create datamap with simple and same projection with datamap filter on non projection column and extra column filter") { + sql("create datamap datamap9 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'") + sql(s"rebuild datamap datamap9") + val frame = sql("select empname,designation from fact_table1 where deptname='cloud'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap9")) + checkAnswer(frame, sql("select empname,designation from fact_table2 where deptname='cloud'")) + sql(s"drop datamap datamap9") + } + + test("test create datamap with simple and same projection with datamap filter on non projection column and no column filter") { + sql("create datamap datamap10 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'") + sql(s"rebuild datamap datamap10") + val frame = sql("select empname,designation from fact_table1") + val analyzed = frame.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap10")) + checkAnswer(frame, sql("select empname,designation from fact_table2")) + sql(s"drop datamap datamap10") + } + + test("test create datamap with simple and same projection with datamap filter on non projection column and different column filter") { + sql("create datamap datamap11 using 'mv' as select empname, designation from fact_table1 where deptname='cloud'") + sql(s"rebuild datamap datamap11") + val frame = sql("select empname,designation from fact_table1 where designation='SA'") + val analyzed = frame.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap11")) + checkAnswer(frame, sql("select empname,designation from fact_table2 where designation='SA'")) + sql(s"drop datamap datamap11") + } + + test("test create datamap with simple and same group by query") { + sql("drop datamap if exists datamap12") + sql("create datamap datamap12 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname") + sql(s"rebuild datamap datamap12") + val frame = sql("select empname, sum(utilization) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap12")) + checkAnswer(frame, sql("select empname, sum(utilization) from fact_table2 group by empname")) + sql(s"drop datamap datamap12") + } + + test("test create datamap with simple and sub group by query") { + sql("drop datamap if exists datamap13") + sql("create datamap datamap13 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname") + sql(s"rebuild datamap datamap13") + val frame = sql("select sum(utilization) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap13")) + checkAnswer(frame, sql("select sum(utilization) from fact_table2 group by empname")) + sql(s"drop datamap datamap13") + } + + test("test create datamap with simple and sub group by query with filter on query") { + sql("drop datamap if exists datamap14") + sql("create datamap datamap14 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname") + sql(s"rebuild datamap datamap14") + val frame = sql( + "select empname,sum(utilization) from fact_table1 group by empname having empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap14")) + checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 where empname='shivani' group by empname")) + sql(s"drop datamap datamap14") + } + + test("test create datamap with simple and sub group and sub projection by query with filter on query") { + sql("drop datamap if exists datamap32") + sql("create datamap datamap32 using 'mv' as select empname, sum(utilization) from fact_table1 group by empname") + sql(s"rebuild datamap datamap32") + val frame = sql( + "select empname, sum(utilization) from fact_table1 group by empname having empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap32")) + checkAnswer(frame, sql( "select empname, sum(utilization) from fact_table2 group by empname having empname='shivani'")) + sql(s"drop datamap datamap32") + } + + test("test create datamap with simple and sub group by query with filter on datamap") { + sql("create datamap datamap15 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname") + sql(s"rebuild datamap datamap15") + val frame = sql( + "select empname,sum(utilization) from fact_table1 where empname='shivani' group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap15")) + checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 where empname='shivani' group by empname")) + sql(s"drop datamap datamap15") + } + + test("test create datamap with simple and sub group by query with filter on datamap and no filter on query") { + sql("create datamap datamap16 using 'mv' as select empname, sum(utilization) from fact_table1 where empname='shivani' group by empname") + sql(s"rebuild datamap datamap16") + val frame = sql("select empname,sum(utilization) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap16")) + checkAnswer(frame, sql("select empname,sum(utilization) from fact_table2 group by empname")) + sql(s"drop datamap datamap16") + } + + test("test create datamap with simple and same group by with expression") { + sql("create datamap datamap17 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname") + sql(s"rebuild datamap datamap17") + val frame = sql( + "select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group" + + " by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap17")) + checkAnswer(frame, sql("select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group" + + " by empname")) + sql(s"drop datamap datamap17") + } + + test("test create datamap with simple and sub group by with expression") { + sql("drop datamap if exists datamap18") + sql("create datamap datamap18 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname") + sql(s"rebuild datamap datamap18") + val frame = sql( + "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap18")) + checkAnswer(frame, sql("select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group by empname")) + sql(s"drop datamap datamap18") + } + + test("test create datamap with simple and sub count group by with expression") { + sql("drop datamap if exists datamap19") + sql("create datamap datamap19 using 'mv' as select empname, count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname") + sql(s"rebuild datamap datamap19") + val frame = sql( + "select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap19")) + checkAnswer(frame, sql("select count(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 group by empname")) + sql(s"drop datamap datamap19") + } + + test("test create datamap with simple and sub group by with expression and filter on query") { + sql("drop datamap if exists datamap20") + sql("create datamap datamap20 using 'mv' as select empname, sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 group by empname") + sql(s"rebuild datamap datamap20") + val frame = sql( + "select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table1 where " + + "empname='shivani' group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap20")) + checkAnswer(frame, sql("select sum(CASE WHEN utilization=27 THEN deptno ELSE 0 END) from fact_table2 where " + + "empname='shivani' group by empname")) + sql(s"drop datamap datamap20") + } + + test("test create datamap with simple join") { + sql("drop datamap if exists datamap21") + sql("create datamap datamap21 using 'mv' as select t1.empname as c1, t2.designation, t2.empname as c2 from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") + sql(s"rebuild datamap datamap21") + val frame = sql( + "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap21")) + checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = t2.empname")) + sql(s"drop datamap datamap21") + } + + test("test create datamap with simple join and filter on query") { + sql("drop datamap if exists datamap22") + sql("create datamap datamap22 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") + sql(s"rebuild datamap datamap22") + val frame = sql( + "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " + + "t2.empname and t1.empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap22")) + checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = " + + "t2.empname and t1.empname='shivani'")) + sql(s"drop datamap datamap22") + } + + + test("test create datamap with simple join and filter on query and datamap") { + sql("drop datamap if exists datamap23") + sql("create datamap datamap23 using 'mv' as select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname and t1.empname='shivani'") + sql(s"rebuild datamap datamap23") + val frame = sql( + "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = " + + "t2.empname and t1.empname='shivani'") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap23")) + checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = " + + "t2.empname and t1.empname='shivani'")) + sql(s"drop datamap datamap23") + } + + test("test create datamap with simple join and filter on datamap and no filter on query") { + sql("drop datamap if exists datamap24") + sql("create datamap datamap24 using 'mv' as select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname and t1.empname='shivani'") + sql(s"rebuild datamap datamap24") + val frame = sql( + "select t1.empname, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") + val analyzed = frame.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap24")) + checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = t2.empname")) + sql(s"drop datamap datamap24") + } + + test("test create datamap with multiple join") { + sql("drop datamap if exists datamap25") + sql("create datamap datamap25 using 'mv' as select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2,fact_table3 t3 where t1.empname = t2.empname and t1.empname=t3.empname") + sql(s"rebuild datamap datamap25") + val frame = sql( + "select t1.empname as c1, t2.designation from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") + val analyzed = frame.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap25")) + checkAnswer(frame, sql("select t1.empname, t2.designation from fact_table4 t1,fact_table5 t2 where t1.empname = t2.empname")) + sql(s"drop datamap datamap25") + } + + ignore("test create datamap with simple join on datamap and multi join on query") { + sql("create datamap datamap26 using 'mv' as select t1.empname, t2.designation, t2.empname from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname") + sql(s"rebuild datamap datamap26") + val frame = sql( + "select t1.empname, t2.designation, t2.empname from fact_table1 t1,fact_table2 t2,fact_table3 " + + "t3 where t1.empname = t2.empname and t1.empname=t3.empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap26")) + checkAnswer(frame, sql("select t1.empname, t2.designation, t2.empname from fact_table4 t1,fact_table5 t2,fact_table6 " + + "t3 where t1.empname = t2.empname and t1.empname=t3.empname")) + sql(s"drop datamap datamap26") + } + + test("test create datamap with join with group by") { + sql("create datamap datamap27 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap27") + val frame = sql( + "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + + "where t1.empname = t2.empname group by t1.empname, t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap27")) + checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 " + + "where t1.empname = t2.empname group by t1.empname, t2.designation")) + sql(s"drop datamap datamap27") + } + + test("test create datamap with join with group by and sub projection") { + sql("drop datamap if exists datamap28") + sql("create datamap datamap28 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap28") + val frame = sql( + "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " + + "t1.empname = t2.empname group by t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap28")) + checkAnswer(frame, sql("select t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 where " + + "t1.empname = t2.empname group by t2.designation")) + sql(s"drop datamap datamap28") + } + + test("test create datamap with join with group by and sub projection with filter") { + sql("drop datamap if exists datamap29") + sql("create datamap datamap29 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap29") + val frame = sql( + "select t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where " + + "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap29")) + checkAnswer(frame, sql("select t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 where " + + "t1.empname = t2.empname and t1.empname='shivani' group by t2.designation")) + sql(s"drop datamap datamap29") + } + + test("test create datamap with join with group by with filter") { + sql("drop datamap if exists datamap30") + sql("create datamap datamap30 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 where t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap30") + val frame = sql( + "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + + "where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap30")) + checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4 t1,fact_table5 t2 " + + "where t1.empname = t2.empname and t2.designation='SA' group by t1.empname, t2.designation")) + sql(s"drop datamap datamap30") + } + + test("test create datamap with expression on projection") { + sql(s"drop datamap if exists datamap31") + sql("create datamap datamap31 using 'mv' as select empname, designation, utilization, projectcode from fact_table1 ") + sql(s"rebuild datamap datamap31") + val frame = sql( + "select empname, designation, utilization+projectcode from fact_table1") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap31")) + checkAnswer(frame, sql("select empname, designation, utilization+projectcode from fact_table2")) + sql(s"drop datamap datamap31") + } + + test("test create datamap with simple and sub group by query and count agg") { + sql(s"drop datamap if exists datamap32") + sql("create datamap datamap32 using 'mv' as select empname, count(utilization) from fact_table1 group by empname") + sql(s"rebuild datamap datamap32") + val frame = sql("select empname,count(utilization) from fact_table1 where empname='shivani' group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap32")) + checkAnswer(frame, sql("select empname,count(utilization) from fact_table2 where empname='shivani' group by empname")) + sql(s"drop datamap datamap32") + } + + ignore("test create datamap with simple and sub group by query and avg agg") { + sql(s"drop datamap if exists datamap33") + sql("create datamap datamap33 using 'mv' as select empname, avg(utilization) from fact_table1 group by empname") + sql(s"rebuild datamap datamap33") + val frame = sql("select empname,avg(utilization) from fact_table1 where empname='shivani' group by empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap33")) + checkAnswer(frame, sql("select empname,avg(utilization) from fact_table2 where empname='shivani' group by empname")) + sql(s"drop datamap datamap33") + } + + test("test create datamap with left join with group by") { + sql("drop datamap if exists datamap34") + sql("create datamap datamap34 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap34") + val frame = sql( + "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + + "on t1.empname = t2.empname group by t1.empname, t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap34")) + checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2 " + + "on t1.empname = t2.empname group by t1.empname, t2.designation")) + sql(s"drop datamap datamap34") + } + + test("test create datamap with simple and group by query with filter on datamap but not on projection") { + sql("create datamap datamap35 using 'mv' as select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation") + sql(s"rebuild datamap datamap35") + val frame = sql( + "select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap35")) + checkAnswer(frame, sql("select designation, sum(utilization) from fact_table2 where empname='shivani' group by designation")) + sql(s"drop datamap datamap35") + } + + test("test create datamap with simple and sub group by query with filter on datamap but not on projection") { + sql("create datamap datamap36 using 'mv' as select designation, sum(utilization) from fact_table1 where empname='shivani' group by designation") + sql(s"rebuild datamap datamap36") + val frame = sql( + "select sum(utilization) from fact_table1 where empname='shivani' group by designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap36")) + checkAnswer(frame, sql("select sum(utilization) from fact_table2 where empname='shivani' group by designation")) + sql(s"drop datamap datamap36") + } + + test("test create datamap with agg push join with sub group by ") { + sql("drop datamap if exists datamap37") + sql("create datamap datamap37 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation") + sql(s"rebuild datamap datamap37") + val frame = sql( + "select t1.empname, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + + "where t1.empname = t2.empname group by t1.empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap37")) + checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from fact_table3 t1,fact_table4 t2 " + + "where t1.empname = t2.empname group by t1.empname, t1.designation")) + sql(s"drop datamap datamap37") + } + + test("test create datamap with agg push join with group by ") { + sql("drop datamap if exists datamap38") + sql("create datamap datamap38 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation") + sql(s"rebuild datamap datamap38") + val frame = sql( + "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + + "where t1.empname = t2.empname group by t1.empname,t1.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap38")) + checkAnswer(frame, sql("select t1.empname,t1.designation, sum(t1.utilization) from fact_table3 t1,fact_table4 t2 " + + "where t1.empname = t2.empname group by t1.empname, t1.designation")) + sql(s"drop datamap datamap38") + } + + test("test create datamap with agg push join with group by with filter") { + sql("drop datamap if exists datamap39") + sql("create datamap datamap39 using 'mv' as select empname, designation, sum(utilization) from fact_table1 group by empname, designation ") + sql(s"rebuild datamap datamap39") + val frame = sql( + "select t1.empname, t1.designation, sum(t1.utilization) from fact_table1 t1,fact_table2 t2 " + + "where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap39")) + checkAnswer(frame, sql("select t1.empname,t1.designation, sum(t1.utilization) from fact_table3 t1,fact_table4 t2 " + + "where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname, t1.designation")) + sql(s"drop datamap datamap39") + } + + test("test create datamap with more agg push join with group by with filter") { + sql("drop datamap if exists datamap40") + sql("create datamap datamap40 using 'mv' as select empname, designation, sum(utilization), count(utilization) from fact_table1 group by empname, designation ") + sql(s"rebuild datamap datamap40") + val frame = sql( + "select t1.empname, t1.designation, sum(t1.utilization),count(t1.utilization) from fact_table1 t1,fact_table2 t2 " + + "where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap40")) + checkAnswer(frame, sql("select t1.empname, t1.designation, sum(t1.utilization),count(t1.utilization) from fact_table3 t1,fact_table4 t2 " + + "where t1.empname = t2.empname and t1.empname='shivani' group by t1.empname,t1.designation")) + sql(s"drop datamap datamap40") + } + + test("test create datamap with left join with group by with filter") { + sql("drop datamap if exists datamap41") + sql("create datamap datamap41 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap41") + val frame = sql( + "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + + "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap41")) + checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2 " + + "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation")) + sql(s"drop datamap datamap41") + } + + test("test create datamap with left join with sub group by") { + sql("drop datamap if exists datamap42") + sql("create datamap datamap42 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap42") + val frame = sql( + "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + + "on t1.empname = t2.empname group by t1.empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap42")) + checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2 " + + "on t1.empname = t2.empname group by t1.empname")) + sql(s"drop datamap datamap42") + } + + test("test create datamap with left join with sub group by with filter") { + sql("drop datamap if exists datamap43") + sql("create datamap datamap43 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap43") + val frame = sql( + "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + + "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap43")) + checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2 " + + "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname")) + sql(s"drop datamap datamap43") + } + + test("test create datamap with left join with sub group by with filter on mv") { + sql("drop datamap if exists datamap44") + sql("create datamap datamap44 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap44") + val frame = sql( + "select t1.empname, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + + "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap44")) + checkAnswer(frame, sql("select t1.empname, sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2 " + + "on t1.empname = t2.empname where t1.empname='shivani' group by t1.empname")) + sql(s"drop datamap datamap44") + } + + test("test create datamap with left join on query and equi join on mv with group by with filter") { + sql("drop datamap if exists datamap45") + + sql("create datamap datamap45 using 'mv' as select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 join fact_table2 t2 on t1.empname = t2.empname group by t1.empname, t2.designation") + sql(s"rebuild datamap datamap45") + // During spark optimizer it converts the left outer join queries with equi join if any filter present on right side table + val frame = sql( + "select t1.empname, t2.designation, sum(t1.utilization) from fact_table1 t1 left join fact_table2 t2 " + + "on t1.empname = t2.empname where t2.designation='SA' group by t1.empname, t2.designation") + val analyzed = frame.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap45")) + checkAnswer(frame, sql("select t1.empname, t2.designation, sum(t1.utilization) from fact_table4 t1 left join fact_table5 t2 " + + "on t1.empname = t2.empname where t2.designation='SA' group by t1.empname, t2.designation")) + sql(s"drop datamap datamap45") + } + + + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { + val tables = logicalPlan collect { + case l: LogicalRelation => l.catalogTable.get + } + tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table")) + } + + + def drop(): Unit = { + sql("drop table IF EXISTS fact_table1") + sql("drop table IF EXISTS fact_table2") + sql("drop table IF EXISTS fact_table3") + sql("drop table IF EXISTS fact_table4") + sql("drop table IF EXISTS fact_table5") + sql("drop table IF EXISTS fact_table6") + } + + override def afterAll { + drop() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala new file mode 100644 index 0000000..f8eb11f --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVSampleTestCase.scala @@ -0,0 +1,167 @@ +package org.apache.carbondata.mv.rewrite + +import java.io.File + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._ + +class MVSampleTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + drop() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + val projectPath = new File(this.getClass.getResource("/").getPath + "../../../../../") + .getCanonicalPath.replaceAll("\\\\", "/") + val integrationPath = s"$projectPath/integration" + val resourcesPath = s"$integrationPath/spark-common-test/src/test/resources" + sql("drop database if exists sample cascade") + sql("create database sample") + sql("use sample") + + createTables.map(sql) + + } + + def createTables: Seq[String] = { + Seq[String]( + s""" + |CREATE TABLE Fact ( + | `tid` int, + | `fpgid` int, + | `flid` int, + | `date` timestamp, + | `faid` int, + | `price` double, + | `qty` int, + | `disc` string + |) + |STORED BY 'org.apache.carbondata.format' + """.stripMargin.trim, + s""" + |CREATE TABLE Dim ( + | `lid` int, + | `city` string, + | `state` string, + | `country` string + |) + |STORED BY 'org.apache.carbondata.format' + """.stripMargin.trim, + s""" + |CREATE TABLE Item ( + | `i_item_id` int, + | `i_item_sk` int + |) + |STORED BY 'org.apache.carbondata.format' + """.stripMargin.trim + ) + } + + + test("test create datamap with sampleTestCases case_1") { + sql(s"drop datamap if exists datamap_sm1") + sql(s"create datamap datamap_sm1 using 'mv' as ${sampleTestCases(0)._2}") + sql(s"rebuild datamap datamap_sm1") + val df = sql(sampleTestCases(0)._3) + val analyzed = df.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap_sm1")) + sql(s"drop datamap datamap_sm1") + } + + test("test create datamap with sampleTestCases case_3") { + sql(s"drop datamap if exists datamap_sm2") + sql(s"create datamap datamap_sm2 using 'mv' as ${sampleTestCases(2)._2}") + sql(s"rebuild datamap datamap_sm2") + val df = sql(sampleTestCases(2)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_sm2")) + sql(s"drop datamap datamap_sm2") + } + + test("test create datamap with sampleTestCases case_4") { + sql(s"drop datamap if exists datamap_sm3") + sql(s"create datamap datamap_sm3 using 'mv' as ${sampleTestCases(3)._2}") + sql(s"rebuild datamap datamap_sm3") + val df = sql(sampleTestCases(3)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_sm3")) + sql(s"drop datamap datamap_sm3") + } + + test("test create datamap with sampleTestCases case_5") { + sql(s"drop datamap if exists datamap_sm4") + sql(s"create datamap datamap_sm4 using 'mv' as ${sampleTestCases(4)._2}") + sql(s"rebuild datamap datamap_sm4") + val df = sql(sampleTestCases(4)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_sm4")) + sql(s"drop datamap datamap_sm4") + } + + test("test create datamap with sampleTestCases case_6") { + sql(s"drop datamap if exists datamap_sm5") + sql(s"create datamap datamap_sm5 using 'mv' as ${sampleTestCases(5)._2}") + sql(s"rebuild datamap datamap_sm5") + val df = sql(sampleTestCases(5)._3) + val analyzed = df.queryExecution.analyzed + assert(!verifyMVDataMap(analyzed, "datamap_sm5")) + sql(s"drop datamap datamap_sm5") + } + + test("test create datamap with sampleTestCases case_7") { + sql(s"drop datamap if exists datamap_sm6") + sql(s"create datamap datamap_sm6 using 'mv' as ${sampleTestCases(6)._2}") + sql(s"rebuild datamap datamap_sm6") + val df = sql(sampleTestCases(6)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_sm6")) + sql(s"drop datamap datamap_sm6") + } + + test("test create datamap with sampleTestCases case_8") { + sql(s"drop datamap if exists datamap_sm7") + sql(s"create datamap datamap_sm7 using 'mv' as ${sampleTestCases(7)._2}") + sql(s"rebuild datamap datamap_sm7") + val df = sql(sampleTestCases(7)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_sm7")) + sql(s"drop datamap datamap_sm7") + } + + test("test create datamap with sampleTestCases case_9") { + sql(s"drop datamap if exists datamap_sm8") + sql(s"create datamap datamap_sm8 using 'mv' as ${sampleTestCases(8)._2}") + sql(s"rebuild datamap datamap_sm8") + val df = sql(sampleTestCases(8)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_sm8")) + sql(s"drop datamap datamap_sm8") + } + + + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { + val tables = logicalPlan collect { + case l: LogicalRelation => l.catalogTable.get + } + tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table")) + } + + + def drop(): Unit = { + sql("use default") + sql("drop database if exists sample cascade") + } + + override def afterAll { + drop() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala new file mode 100644 index 0000000..473b338 --- /dev/null +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVTPCDSTestCase.scala @@ -0,0 +1,146 @@ +package org.apache.carbondata.mv.rewrite + +import java.io.File + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.mv.rewrite.matching.TestTPCDS_1_4_Batch._ +import org.apache.carbondata.mv.testutil.Tpcds_1_4_Tables.tpcds1_4Tables + +class MVTPCDSTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + drop() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + val projectPath = new File(this.getClass.getResource("/").getPath + "../../../../../") + .getCanonicalPath.replaceAll("\\\\", "/") + val integrationPath = s"$projectPath/integration" + val resourcesPath = s"$integrationPath/spark-common-test/src/test/resources" + sql("drop database if exists tpcds cascade") + sql("create database tpcds") + sql("use tpcds") + + tpcds1_4Tables.foreach { create_table => + sql(create_table) + } + + } + + ignore("test create datamap with tpcds_1_4_testCases case_1") { + sql(s"drop datamap if exists datamap_tpcds1") + sql(s"create datamap datamap_tpcds1 using 'mv' as ${tpcds_1_4_testCases(0)._2}") + sql(s"rebuild datamap datamap_tpcds1") + val df = sql(tpcds_1_4_testCases(0)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds1")) + sql(s"drop datamap datamap_tpcds1") + } + + ignore("test create datamap with tpcds_1_4_testCases case_3") { + sql(s"drop datamap if exists datamap_tpcds3") + sql(s"create datamap datamap_tpcds3 using 'mv' as ${tpcds_1_4_testCases(2)._2}") + sql(s"rebuild datamap datamap_tpcds3") + val df = sql(tpcds_1_4_testCases(2)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds3")) + sql(s"drop datamap datamap_tpcds3") + } + + test("test create datamap with tpcds_1_4_testCases case_4") { + sql(s"drop datamap if exists datamap_tpcds4") + sql(s"create datamap datamap_tpcds4 using 'mv' as ${tpcds_1_4_testCases(3)._2}") + sql(s"rebuild datamap datamap_tpcds4") + val df = sql(tpcds_1_4_testCases(3)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds4")) + sql(s"drop datamap datamap_tpcds4") + } + + ignore("test create datamap with tpcds_1_4_testCases case_5") { + sql(s"drop datamap if exists datamap_tpcds5") + sql(s"create datamap datamap_tpcds5 using 'mv' as ${tpcds_1_4_testCases(4)._2}") + sql(s"rebuild datamap datamap_tpcds5") + val df = sql(tpcds_1_4_testCases(4)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds5")) + sql(s"drop datamap datamap_tpcds5") + } + + ignore("test create datamap with tpcds_1_4_testCases case_6") { + sql(s"drop datamap if exists datamap_tpcds6") + sql(s"create datamap datamap_tpcds6 using 'mv' as ${tpcds_1_4_testCases(5)._2}") + sql(s"rebuild datamap datamap_tpcds6") + val df = sql(tpcds_1_4_testCases(5)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds6")) + sql(s"drop datamap datamap_tpcds6") + } + + ignore("test create datamap with tpcds_1_4_testCases case_8") { + sql(s"drop datamap if exists datamap_tpcds8") + sql(s"create datamap datamap_tpcds8 using 'mv' as ${tpcds_1_4_testCases(7)._2}") + sql(s"rebuild datamap datamap_tpcds8") + val df = sql(tpcds_1_4_testCases(7)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds8")) + sql(s"drop datamap datamap_tpcds8") + } + + ignore("test create datamap with tpcds_1_4_testCases case_11") { + sql(s"drop datamap if exists datamap_tpcds11") + sql(s"create datamap datamap_tpcds11 using 'mv' as ${tpcds_1_4_testCases(10)._2}") + sql(s"rebuild datamap datamap_tpcds11") + val df = sql(tpcds_1_4_testCases(10)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds11")) + sql(s"drop datamap datamap_tpcds11") + } + + ignore("test create datamap with tpcds_1_4_testCases case_15") { + sql(s"drop datamap if exists datamap_tpcds15") + sql(s"create datamap datamap_tpcds15 using 'mv' as ${tpcds_1_4_testCases(14)._2}") + sql(s"rebuild datamap datamap_tpcds15") + val df = sql(tpcds_1_4_testCases(14)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds15")) + sql(s"drop datamap datamap_tpcds15") + } + + ignore("test create datamap with tpcds_1_4_testCases case_16") { + sql(s"drop datamap if exists datamap_tpcds16") + sql(s"create datamap datamap_tpcds16 using 'mv' as ${tpcds_1_4_testCases(15)._2}") + sql(s"rebuild datamap datamap_tpcds16") + val df = sql(tpcds_1_4_testCases(15)._3) + val analyzed = df.queryExecution.analyzed + assert(verifyMVDataMap(analyzed, "datamap_tpcds16")) + sql(s"drop datamap datamap_tpcds16") + } + + + + def verifyMVDataMap(logicalPlan: LogicalPlan, dataMapName: String): Boolean = { + val tables = logicalPlan collect { + case l: LogicalRelation => l.catalogTable.get + } + tables.exists(_.identifier.table.equalsIgnoreCase(dataMapName+"_table")) + } + + + def drop(): Unit = { + sql("use default") + sql("drop database if exists tpcds cascade") + } + + override def afterAll { + drop() + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT) + } +}