[CARBONDATA-2475] Support Modular Core for Materialized View DataMap for query matching and rewriting
Support Modular Core for Materialized View DataMap This closes #2302 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bf73e9fe Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bf73e9fe Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bf73e9fe Branch: refs/heads/spark-2.3 Commit: bf73e9fe77523e23be46e7597e2c990e855401e5 Parents: d14c403 Author: ravipesala <ravi.pes...@gmail.com> Authored: Sat May 12 22:49:19 2018 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sun May 13 17:08:19 2018 +0800 ---------------------------------------------------------------------- datamap/mv/core/pom.xml | 169 ++ .../carbondata/mv/datamap/MVAnalyzerRule.scala | 105 + .../mv/datamap/MVDataMapProvider.scala | 125 + .../apache/carbondata/mv/datamap/MVHelper.scala | 377 +++ .../apache/carbondata/mv/datamap/MVState.scala | 55 + .../mv/rewrite/DefaultMatchMaker.scala | 647 +++++ .../carbondata/mv/rewrite/MatchConditions.scala | 28 + .../carbondata/mv/rewrite/MatchMaker.scala | 47 + .../carbondata/mv/rewrite/Navigator.scala | 196 ++ .../carbondata/mv/rewrite/QueryRewrite.scala | 53 + .../mv/rewrite/SummaryDatasetCatalog.scala | 168 ++ .../apache/carbondata/mv/rewrite/Utils.scala | 358 +++ .../mv/rewrite/MVCreateTestCase.scala | 676 +++++ .../mv/rewrite/MVSampleTestCase.scala | 167 ++ .../carbondata/mv/rewrite/MVTPCDSTestCase.scala | 146 + .../carbondata/mv/rewrite/MVTpchTestCase.scala | 247 ++ .../SelectSelectExactChildrenSuite.scala | 76 + .../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala | 80 + .../mv/rewrite/matching/TestSQLBatch.scala | 214 ++ .../rewrite/matching/TestTPCDS_1_4_Batch.scala | 2496 ++++++++++++++++++ 20 files changed, 6430 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/pom.xml ---------------------------------------------------------------------- 
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml new file mode 100644 index 0000000..99a8e22 --- /dev/null +++ b/datamap/mv/core/pom.xml @@ -0,0 +1,169 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-parent</artifactId> + <version>1.4.0-SNAPSHOT</version> + <relativePath>../../../pom.xml</relativePath> + </parent> + + <artifactId>carbondata-mv-core</artifactId> + <name>Apache CarbonData :: Materialized View Core</name> + + <properties> + <dev.path>${basedir}/../../../dev</dev.path> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-mv-plan</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-spark2</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + 
<groupId>org.scalatest</groupId> + <artifactId>scalatest_${scala.binary.version}</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <testSourceDirectory>src/test/scala</testSourceDirectory> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.18</version> + <!-- Note config is repeated in scalatest config --> + <configuration> + <skip>false</skip> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + <testFailureIgnore>false</testFailureIgnore> + <failIfNoTests>false</failIfNoTests> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <version>2.17</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.scala-tools</groupId> + <artifactId>maven-scala-plugin</artifactId> + <version>2.15.2</version> + <executions> + <execution> + <id>compile</id> + <goals> + <goal>compile</goal> + </goals> + <phase>compile</phase> + </execution> + <execution> + <id>testCompile</id> + <goals> + <goal>testCompile</goal> + </goals> + <phase>test</phase> + </execution> + <execution> + <phase>process-resources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.4.1</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>com.ning.maven.plugins</groupId> + 
<artifactId>maven-duplicate-finder-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>1.0</version> + <!-- Note config is repeated in surefire config --> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <testFailureIgnore>false</testFailureIgnore> + <filereports>CarbonTestSuite.txt</filereports> + <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m + </argLine> + <stderr /> + <environmentVariables> + </environmentVariables> + <systemProperties> + <java.awt.headless>true</java.awt.headless> + </systemProperties> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>sdvtest</id> + <properties> + <maven.test.skip>true</maven.test.skip> + </properties> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala new file mode 100644 index 0000000..4e93f15 --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.mv.datamap + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF} +import org.apache.spark.sql.catalyst.plans.logical.{Command, DeserializeToObject, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider +import org.apache.carbondata.core.metadata.schema.table.DataMapSchema +import org.apache.carbondata.datamap.DataMapManager +import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog} + +/** + * Analyzer rule to rewrite the query for MV datamap + * + * @param sparkSession + */ +class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + // TODO Find way better way to get the provider. 
+ private val dataMapProvider = + DataMapManager.get().getDataMapProvider(null, + new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession) + + private val LOGGER = LogServiceFactory.getLogService(classOf[MVAnalyzerRule].getName) + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformAllExpressions { + // first check if any preAgg scala function is applied; if it is present in the plan + // then the call is from the create preaggregate table class, so no need to transform the query plan + // TODO Add different UDF name + case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") => + needAnalysis = false + al + // in case of query if any unresolved alias is present then wait for plan to be resolved + // return the same plan as we can transform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + case attr@UnresolvedAttribute(_) => + needAnalysis = false + attr + } + val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider, + DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog] + if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) { + val modularPlan = catalog.mVState.rewritePlan(plan).withSummaryData + if (modularPlan.find (_.rewritten).isDefined) { + val compactSQL = modularPlan.asCompactSQL + LOGGER.audit(s"\n$compactSQL\n") + val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed + analyzed + } else { + plan + } + } else { + plan + } + } + + def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = { + !plan.isInstanceOf[Command] && !isDataMapExists(plan, catalog.listAllSchema()) && + !plan.isInstanceOf[DeserializeToObject] + } + /** + * Check whether the datamap table is already present in the query. 
+ * + * @param plan + * @param mvs + * @return + */ + def isDataMapExists(plan: LogicalPlan, mvs: Array[SummaryDataset]): Boolean = { + val catalogs = plan collect { + case l: LogicalRelation => l.catalogTable + } + catalogs.isEmpty || catalogs.exists { c => + mvs.exists { mv => + val identifier = mv.dataMapSchema.getRelationIdentifier + identifier.getTableName.equals(c.get.identifier.table) && + identifier.getDatabaseName.equals(c.get.database) + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala new file mode 100644 index 0000000..2aba23e --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.mv.datamap + +import java.io.IOException + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias +import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand +import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand +import org.apache.spark.sql.execution.datasources.FindDataSourceTable +import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.util.SparkSQLUtil + +import org.apache.carbondata.common.annotations.InterfaceAudience +import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException +import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, DataMapStoreManager} +import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory} +import org.apache.carbondata.core.indexstore.Blocklet +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog} + +@InterfaceAudience.Internal +class MVDataMapProvider( + mainTable: CarbonTable, + sparkSession: SparkSession, + dataMapSchema: DataMapSchema) + extends DataMapProvider(mainTable, dataMapSchema) { + protected var dropTableCommand: CarbonDropTableCommand = null + + @throws[MalformedDataMapCommandException] + @throws[IOException] + override def initMeta(ctasSqlStatement: String): Unit = { + if (ctasSqlStatement == null) { + throw new MalformedDataMapCommandException( + "select statement is mandatory") + } + MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement, true) + DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema) + } + + override def initData(): Unit = { + } + + @throws[IOException] + override def cleanMeta(): Unit = { + dropTableCommand = new CarbonDropTableCommand(true, + new 
Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName), + dataMapSchema.getRelationIdentifier.getTableName, + true) + dropTableCommand.processMetadata(sparkSession) + DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema) + DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName) + } + + override def cleanData(): Unit = { + if (dropTableCommand != null) { + dropTableCommand.processData(sparkSession) + } + } + + @throws[IOException] + override def rebuild(): Unit = { + val ctasQuery = dataMapSchema.getCtasQuery + if (ctasQuery != null) { + val identifier = dataMapSchema.getRelationIdentifier + val logicalPlan = + new FindDataSourceTable(sparkSession).apply( + sparkSession.sessionState.catalog.lookupRelation( + TableIdentifier(identifier.getTableName, + Some(identifier.getDatabaseName)))) match { + case s: SubqueryAlias => s.child + case other => other + } + val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(ctasQuery) + val queryPlan = SparkSQLUtil.execute( + sparkSession.sql(updatedQuery).queryExecution.analyzed, + sparkSession).drop("preAgg") + val header = logicalPlan.output.map(_.name).mkString(",") + val loadCommand = CarbonLoadDataCommand( + databaseNameOp = Some(identifier.getDatabaseName), + tableName = identifier.getTableName, + factPathFromUser = null, + dimFilesPath = Seq(), + options = scala.collection.immutable.Map("fileheader" -> header), + isOverwriteTable = true, + inputSqlString = null, + dataFrame = Some(queryPlan), + updateModel = None, + tableInfoOp = None, + internalOptions = Map.empty, + partition = Map.empty) + + SparkSQLUtil.execute(loadCommand, sparkSession) + } + } + + @throws[IOException] + override def incrementalBuild( + segmentIds: Array[String]): Unit = { + throw new UnsupportedOperationException + } + + override def createDataMapCatalog : DataMapCatalog[SummaryDataset] = + new SummaryDatasetCatalog(sparkSession) + + override def getDataMapFactory: DataMapFactory[_ <: 
DataMap[_ <: Blocklet]] = { + throw new UnsupportedOperationException + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala new file mode 100644 index 0000000..0f9362f --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.carbondata.mv.datamap + +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, ScalaUDF} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} +import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor} +import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.parser.CarbonSpark2SqlParser + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, DataMapSchemaStorageProvider, RelationIdentifier} +import org.apache.carbondata.mv.plans.modular +import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select} +import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, QueryRewrite} +import org.apache.carbondata.spark.util.CommonUtil + +/** + * Utility for MV datamap operations. 
+ */ +object MVHelper { + + def createMVDataMap(sparkSession: SparkSession, + dataMapSchema: DataMapSchema, + queryString: String, + ifNotExistsSet: Boolean = false): Unit = { + val dmProperties = dataMapSchema.getProperties.asScala + val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString) + val logicalPlan = sparkSession.sql(updatedQuery).drop("preAgg").queryExecution.analyzed + val fields = logicalPlan.output.map { attr => + val name = updateColumnName(attr) + val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName + if (attr.dataType.typeName.startsWith("decimal")) { + val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString) + Field(column = name, + dataType = Some(attr.dataType.typeName), + name = Some(name), + children = None, + precision = precision, + scale = scale, + rawSchema = rawSchema) + } else { + Field(column = name, + dataType = Some(attr.dataType.typeName), + name = Some(name), + children = None, + rawSchema = rawSchema) + } + } + val tableProperties = mutable.Map[String, String]() + dmProperties.foreach(t => tableProperties.put(t._1, t._2)) + + val selectTables = getTables(logicalPlan) + + // TODO inherit the table properties like sort order, sort scope and block size from parent + // tables to mv datamap table + // TODO Use a proper DB + val tableIdentifier = + TableIdentifier(dataMapSchema.getDataMapName + "_table", + selectTables.head.identifier.database) + // prepare table model of the collected tokens + val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel( + ifNotExistPresent = ifNotExistsSet, + new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database), + tableIdentifier.table.toLowerCase, + fields, + Seq(), + tableProperties, + None, + isAlterFlow = false, + None) + + val tablePath = if (dmProperties.contains("path")) { + dmProperties("path") + } else { + CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession) + 
} + CarbonCreateTableCommand(TableNewProcessor(tableModel), + tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession) + + dataMapSchema.setCtasQuery(queryString) + dataMapSchema + .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get, + tableIdentifier.table, + "")) + + val parentIdents = selectTables.map { table => + new RelationIdentifier(table.database, table.identifier.table, "") + } + dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava)) + DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema) + } + + def updateColumnName(attr: Attribute): String = { + val name = attr.name.replace("(", "_").replace(")", "").replace(" ", "_").replace("=", "") + attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name) + } + + def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = { + logicalPlan.collect { + case l: LogicalRelation => l.catalogTable.get + } + } + + def dropDummFuc(plan: LogicalPlan): LogicalPlan = { + plan transform { + case p@Project(exps, child) => + Project(dropDummyExp(exps), child) + case Aggregate(grp, aggExp, child) => + Aggregate( + grp, + dropDummyExp(aggExp), + child) + } + } + + private def dropDummyExp(exps: Seq[NamedExpression]) = { + exps.map { + case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") => None + case attr: AttributeReference if attr.name.equalsIgnoreCase("preAgg") => None + case other => Some(other) + }.filter(_.isDefined).map(_.get) + } + + def getAttributeMap(subsumer: Seq[NamedExpression], + subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = { + if (subsumer.length == subsume.length) { + subsume.zip(subsumer).flatMap { case (left, right) => + var tuples = left collect { + case attr: AttributeReference => + (AttributeKey(attr), createAttrReference(right, attr.name)) + } + left match { + case a: Alias => + tuples = Seq((AttributeKey(a.child), createAttrReference(right, a.name))) ++ 
tuples + case _ => + } + Seq((AttributeKey(left), createAttrReference(right, left.name))) ++ tuples + }.toMap + } else { + throw new UnsupportedOperationException("Cannot create mapping with unequal sizes") + } + } + + def createAttrReference(ref: NamedExpression, name: String): Alias = { + Alias(ref, name)(exprId = ref.exprId, qualifier = None) + } + + case class AttributeKey(exp: Expression) { + + override def equals(other: Any): Boolean = other match { + case attrKey: AttributeKey => + exp.semanticEquals(attrKey.exp) + case _ => false + } + + override def hashCode: Int = exp.hashCode + + } + + /** + * Updates the expressions as per the subsumer output expressions. It is needed to update the + * expressions as per the datamap table relation + * + * @param expressions expressions which are needed to update + * @param aliasName table alias name + * @return Updated expressions + */ + def updateSubsumeAttrs( + expressions: Seq[Expression], + attrMap: Map[AttributeKey, NamedExpression], + aliasName: Option[String], + keepAlias: Boolean = false): Seq[Expression] = { + + def getAttribute(exp: Expression) = { + exp match { + case Alias(agg: AggregateExpression, name) => + agg.aggregateFunction.collect { + case attr: AttributeReference => + AttributeReference(attr.name, attr.dataType, attr.nullable, attr + .metadata)(attr.exprId, + aliasName, + attr.isGenerated) + }.head + case Alias(child, name) => + child + case other => other + } + } + + expressions.map { + case alias@Alias(agg: AggregateExpression, name) => + attrMap.get(AttributeKey(alias)).map { exp => + Alias(getAttribute(exp), name)(alias.exprId, + alias.qualifier, + alias.explicitMetadata, + alias.isGenerated) + }.getOrElse(alias) + + case attr: AttributeReference => + val uattr = attrMap.get(AttributeKey(attr)).map{a => + if (keepAlias) { + AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId, + attr.qualifier, + a.isGenerated) + } else { + a + } + }.getOrElse(attr) + uattr + case expression: 
Expression => + val uattr = attrMap.getOrElse(AttributeKey(expression), expression) + uattr + } + } + + def updateOutPutList( + subsumerOutputList: Seq[NamedExpression], + dataMapRltn: Select, + aliasMap: Map[AttributeKey, NamedExpression], + keepAlias: Boolean): Seq[NamedExpression] = { + var outputSel = + updateSubsumeAttrs( + subsumerOutputList, + aliasMap, + Some(dataMapRltn.aliasMap.values.head), + keepAlias).asInstanceOf[Seq[NamedExpression]] + outputSel.zip(subsumerOutputList).map{ case (l, r) => + l match { + case attr: AttributeReference => + Alias(attr, r.name)(r.exprId, None) + case a@Alias(attr: AttributeReference, name) => + Alias(attr, r.name)(r.exprId, None) + case other => other + } + } + + } + + def updateSelectPredicates( + predicates: Seq[Expression], + attrMap: Map[AttributeKey, NamedExpression], + keepAlias: Boolean): Seq[Expression] = { + predicates.map { exp => + exp transform { + case attr: AttributeReference => + val uattr = attrMap.get(AttributeKey(attr)).map{a => + if (keepAlias) { + AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId, + attr.qualifier, + a.isGenerated) + } else { + a + } + }.getOrElse(attr) + uattr + } + } + } + + /** + * Update the modular plan as per the datamap table relation inside it. + * + * @param subsumer plan to be updated + * @return Updated modular plan. 
+ */ + def updateDataMap(subsumer: ModularPlan, rewrite: QueryRewrite): ModularPlan = { + subsumer match { + case s: Select if s.dataMapTableRelation.isDefined => + val relation = s.dataMapTableRelation.get.asInstanceOf[Select] + val mappings = s.outputList zip relation.outputList + val oList = for ((o1, o2) <- mappings) yield { + if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2 + } + relation.copy(outputList = oList).setRewritten() + case g: GroupBy if g.dataMapTableRelation.isDefined => + val relation = g.dataMapTableRelation.get.asInstanceOf[Select] + val in = relation.asInstanceOf[Select].outputList + val mappings = g.outputList zip relation.outputList + val oList = for ((left, right) <- mappings) yield { + left match { + case Alias(agg@AggregateExpression(fun@Sum(child), _, _, _), name) => + val uFun = fun.copy(child = right) + Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId) + case Alias(agg@AggregateExpression(fun@Max(child), _, _, _), name) => + val uFun = fun.copy(child = right) + Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId) + case Alias(agg@AggregateExpression(fun@Min(child), _, _, _), name) => + val uFun = fun.copy(child = right) + Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId) + case Alias(agg@AggregateExpression(fun@Count(Seq(child)), _, _, _), name) => + val uFun = Sum(right) + Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId) + case _ => + if (left.name != right.name) Alias(right, left.name)(exprId = left.exprId) else right + } + } + val updatedPredicates = g.predicateList.map { f => + mappings.find{ case (k, y) => + k match { + case a: Alias if f.isInstanceOf[Alias] => + a.child.semanticEquals(f.children.head) + case a: Alias => a.child.semanticEquals(f) + case other => other.semanticEquals(f) + } + } match { + case Some(r) => r._2 + case _ => f + } + } + g.copy(outputList = oList, + inputList = in, + predicateList = 
updatedPredicates, + child = relation, + dataMapTableRelation = None).setRewritten() + + case select: Select => + select.children match { + case Seq(s: Select) if s.dataMapTableRelation.isDefined => + val relation = s.dataMapTableRelation.get.asInstanceOf[Select] + val child = updateDataMap(s, rewrite).asInstanceOf[Select] + val aliasMap = getAttributeMap(relation.outputList, s.outputList) + var outputSel = + updateOutPutList(select.outputList, relation, aliasMap, keepAlias = true) + val pred = updateSelectPredicates(select.predicateList, aliasMap, true) + select.copy(outputList = outputSel, + inputList = child.outputList, + predicateList = pred, + children = Seq(child)).setRewritten() + + case Seq(g: GroupBy) if g.dataMapTableRelation.isDefined => + val relation = g.dataMapTableRelation.get.asInstanceOf[Select] + val aliasMap = getAttributeMap(relation.outputList, g.outputList) + + val outputSel = + updateOutPutList(select.outputList, relation, aliasMap, keepAlias = false) + val child = updateDataMap(g, rewrite).asInstanceOf[Matchable] + // TODO Remove the unnecessary columns from selection. + // Only keep columns which are required by parent. 
+ val inputSel = child.outputList + select.copy( + outputList = outputSel, + inputList = inputSel, + children = Seq(child)).setRewritten() + + case _ => select + } + + case other => other + } + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala new file mode 100644 index 0000000..412d547 --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.datamap + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import org.apache.carbondata.mv.plans.modular.SimpleModularizer +import org.apache.carbondata.mv.plans.util.BirdcageOptimizer +import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, Navigator, QueryRewrite, SummaryDatasetCatalog} + +/** + * A class that holds all session-specific state. 
+ */ +private[mv] class MVState(summaryDatasetCatalog: SummaryDatasetCatalog) { + + // Note: These are all lazy vals because they depend on each other (e.g. conf) and we + // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs. + + /** + * Modular query plan modularizer + */ + lazy val modularizer = SimpleModularizer + + /** + * Logical query plan optimizer. + */ + lazy val optimizer = BirdcageOptimizer + + lazy val matcher = DefaultMatchMaker + + lazy val navigator: Navigator = new Navigator(summaryDatasetCatalog, this) + + /** + * Rewrite the logical query plan to MV plan if applicable. + * @param plan + * @return + */ + def rewritePlan(plan: LogicalPlan): QueryRewrite = new QueryRewrite(this, plan) + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala new file mode 100644 index 0000000..899c36c --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala @@ -0,0 +1,647 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.rewrite + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _} +import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner} + +import org.apache.carbondata.mv.datamap.MVHelper +import org.apache.carbondata.mv.plans.modular +import org.apache.carbondata.mv.plans.modular._ +import org.apache.carbondata.mv.plans.modular.Flags._ +import org.apache.carbondata.mv.plans.util.SQLBuilder + +abstract class DefaultMatchMaker extends MatchMaker[ModularPlan] + +abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] { + + /** Name for this pattern, automatically inferred based on class name. */ + val patternName: String = { + val className = getClass.getName + if (className endsWith "$") className.dropRight(1) else className + } + + def factorOutSubsumer( + compensation: ModularPlan, + subsumer: Matchable, + aliasMapMain: Map[Int, String]): ModularPlan = { + + // Create aliasMap with attribute to alias reference attribute + val aliasMap = AttributeMap( + subsumer.outputList.collect { + case a: Alias if a.child.isInstanceOf[Attribute] => + (a.child.asInstanceOf[Attribute], a.toAttribute) + }) + + // Check and replace all alias references with subsumer alias map references. 
+ val compensation1 = compensation.transform { + case plan if !plan.skip && plan != subsumer => + plan.transformExpressions { + case a: AttributeReference => + aliasMap + .get(a) + .map { ref => + AttributeReference( + ref.name, ref.dataType)( + exprId = ref.exprId, + qualifier = a.qualifier) + }.getOrElse(a) + } + } + + val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList) + if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) { + new UnsupportedOperationException( + s"duplicate name(s): ${ subsumer.output.map(_.toString + ", ") }") + } + if (aliasMapMain.size == 1) { + val subsumerName: Option[String] = aliasMapMain.get(0) + // Replace all compensation1 attributes with references of subsumer attribute set + val compensationFinal = compensation1.transformExpressions { + case ref: Attribute if subqueryAttributeSet.contains(ref) => + AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName) + case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) => + Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName) + } + compensationFinal + } else { + compensation1 + } + } +} + +object DefaultMatchMaker extends DefaultMatchMaker { + lazy val patterns = + SelectSelectNoChildDelta :: + GroupbyGroupbyNoChildDelta :: + GroupbyGroupbySelectOnlyChildDelta :: + GroupbyGroupbyGroupbyChildDelta :: + SelectSelectSelectChildDelta :: + SelectSelectGroupbyChildDelta :: Nil +} + +/** + * Convention: + * EmR: each subsumee's expression match some of subsumer's expression + * EdR: each subsumee's expression derive from some of subsumer's expression + * RmE: each subsumer's expression match some of subsumee's expression + * RdE: each subsumer's expression derive from some of subsumee's expression + */ + +object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper { + private def isDerivable( + exprE: Expression, + exprListR: Seq[Expression], + subsumee:
ModularPlan, + subsumer: ModularPlan, + compensation: Option[ModularPlan]): Boolean = { + if (subsumee.asInstanceOf[Select].predicateList.contains(exprE)) { + subsumer.asInstanceOf[Select].predicateList.exists(_.semanticEquals(exprE)) || + canEvaluate(exprE, subsumer) + } else if (subsumee.asInstanceOf[Select].outputList.contains(exprE)) { + exprE match { + case a@Alias(_, _) => + exprListR.exists(a1 => a1.isInstanceOf[Alias] && + a1.asInstanceOf[Alias].child.semanticEquals(a.child)) || + exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer)) + case exp => exprListR.exists(_.semanticEquals(exp) || canEvaluate(exp, subsumer)) + } + } else { + false + } + } + + def apply( + subsumer: ModularPlan, + subsumee: ModularPlan, + compensation: Option[ModularPlan], + rewrite: QueryRewrite): Seq[ModularPlan] = { + + (subsumer, subsumee, compensation) match { + case ( + sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _), + sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None + ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } && + sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } => + + // assume children (including harmonized relation) of subsumer and subsumee + // are 1-1 correspondence. 
+ // Change the following two conditions to more complicated ones if we want to + // consider things that combine extrajoin, rejoin, and harmonized relations + val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ == x) != 1 } + val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ == x) != 1 } + + val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) } + val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) } + val rejoinOutputList = rejoin.flatMap(_.output) + + val isPredicateRmE = sel_1a.predicateList.forall(expr => + sel_1q.predicateList.exists(_.semanticEquals(expr))) + val isPredicateEmdR = sel_1q.predicateList.forall(expr => + isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None)) + val isOutputEdR = sel_1q.outputList.forall(expr => + isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None)) + + if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE && + isPredicateEmdR && isOutputEdR) { + val mappings = sel_1a.children.zipWithIndex.map { + case (childr, fromIdx) if sel_1q.children.contains(childr) => + val toIndx = sel_1q.children.indexWhere(_ == childr) + (toIndx -> fromIdx) + + } + val e2r = mappings.toMap + val r2e = e2r.map(_.swap) + val r2eJoinsMatch = sel_1a.joinEdges.forall { x => + (r2e.get(x.left), r2e.get(x.right)) match { + case (Some(l), Some(r)) => + val mappedEdge = JoinEdge(l, r, x.joinType) + val joinTypeEquivalent = + if (sel_1q.joinEdges.contains(mappedEdge)) true + else { + x.joinType match { + case Inner | FullOuter => + sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType)) + case _ => false + } + } + if (joinTypeEquivalent) { + val sel_1a_join = sel_1a.extractJoinConditions( + sel_1a.children(x.left), + sel_1a.children(x.right)) + val sel_1q_join = sel_1q.extractJoinConditions( + sel_1q.children(mappedEdge.left), + sel_1q.children(mappedEdge.right)) + 
sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) && + sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_))) + } else false + case _ => false + } + } + + val isPredicateEmR = sel_1q.predicateList.forall(expr => + sel_1a.predicateList.exists(_.semanticEquals(expr))) + val isOutputEmR = sel_1q.outputList.forall(expr => + sel_1a.outputList.exists(_.semanticEquals(expr))) + val isOutputRmE = sel_1a.outputList.forall(expr => + sel_1q.outputList.exists(_.semanticEquals(expr))) + + if (r2eJoinsMatch) { + if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty) { + Seq(sel_1a) // no compensation needed + } else { + val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]() + val tAliasMap = new collection.mutable.HashMap[Int, String]() + + val updatedOutList: Seq[NamedExpression] = updateDuplicateColumns(sel_1a) + val usel_1a = sel_1a.copy(outputList = updatedOutList) + tChildren += usel_1a + tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName()) + + sel_1q.children.zipWithIndex.foreach { + case (childe, idx) => + if (e2r.get(idx).isEmpty) { + tChildren += childe + sel_1q.aliasMap.get(idx).map(x => tAliasMap += (tChildren.indexOf(childe) -> x)) + } + } + + val tJoinEdges = sel_1q.joinEdges.collect { + case JoinEdge(le, re, joinType) => + (e2r.get(le), e2r.get(re)) match { + case (Some(lr), None) => + JoinEdge( + 0, + tChildren.indexOf(sel_1q.children(re)), + joinType) + case (None, None) => + JoinEdge( + tChildren.indexOf(sel_1q.children(le)), + tChildren.indexOf(sel_1q.children(re)), + joinType) + case (None, Some(rr)) => + JoinEdge( + tChildren.indexOf(sel_1q.children(le)), + 0, + joinType) + case _ => + null.asInstanceOf[JoinEdge] + } + } + val tPredicateList = sel_1q.predicateList.filter { p => + !sel_1a.predicateList.exists(_.semanticEquals(p)) } + val wip = sel_1q.copy( + predicateList = tPredicateList, + children = tChildren, + joinEdges = tJoinEdges.filter(_ != null), + aliasMap = tAliasMap.toMap) + + val 
done = factorOutSubsumer(wip, usel_1a, wip.aliasMap) + Seq(done) + } + } else Nil + } else Nil + + case ( + sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _), + sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None) + if sel_3a.children.forall(_.isInstanceOf[GroupBy]) && + sel_3q.children.forall(_.isInstanceOf[GroupBy]) => + val isPredicateRmE = sel_3a.predicateList.isEmpty || + sel_3a.predicateList.forall(expr => + sel_3q.predicateList.exists(_.semanticEquals(expr))) + val isPredicateEmdR = sel_3q.predicateList.isEmpty || + sel_3q.predicateList.forall(expr => + sel_3a.predicateList.exists(_.semanticEquals(expr)) || + isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None)) + val isOutputEdR = sel_3q.outputList.forall(expr => + isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None)) + val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1 + + if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) { + val isPredicateEmR = sel_3q.predicateList.isEmpty || + sel_3q.predicateList.forall(expr => + sel_3a.predicateList.exists(_.semanticEquals(expr))) + val isOutputRmE = sel_3a.outputList.forall(expr => + isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None)) + val isOutputEmR = sel_3q.outputList.forall(expr => + isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None)) + + if (isPredicateEmR && isOutputEmR && isOutputRmE) { + Seq(sel_3a) + } else if (isPredicateEmR && isOutputEmR) { + // no compensation needed + val sel_3q_exp = sel_3q.transformExpressions({ + case a: Alias => sel_3a.outputList + .find { a1 => + a1.isInstanceOf[Alias] && + a1.asInstanceOf[Alias].child.semanticEquals(a.child) + }.map(_.toAttribute).get + }) + val wip = sel_3q_exp.copy( + children = Seq(sel_3a), + aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap) + val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap) + Seq(done) + } else { + Nil + } + } else Nil + + case _ => Nil + } + } + + private def updateDuplicateColumns(sel_1a: 
Select) = { + val duplicateNameCols = sel_1a.outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2) + .toList + val updatedOutList = sel_1a.outputList.map { col => + if (duplicateNameCols.contains(col)) { + Alias(col, col.qualifiedName)(exprId = col.exprId) + } else { + col + } + } + updatedOutList + } +} + +object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern { + def apply( + subsumer: ModularPlan, + subsumee: ModularPlan, + compensation: Option[ModularPlan], + rewrite: QueryRewrite): Seq[ModularPlan] = { + (subsumer, subsumee, compensation) match { + case ( + gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _), + gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _), + None) => + val isGroupingEmR = gb_2q.predicateList.forall(expr => + gb_2a.predicateList.exists(_.semanticEquals(expr))) + val isGroupingRmE = gb_2a.predicateList.forall(expr => + gb_2q.predicateList.exists(_.semanticEquals(expr))) + if (isGroupingEmR && isGroupingRmE) { + val isOutputEmR = gb_2q.outputList.forall { + case a @ Alias(_, _) => + gb_2a.outputList.exists{a1 => + a1.isInstanceOf[Alias] && a1.asInstanceOf[Alias].child.semanticEquals(a.child) + } + case exp => gb_2a.outputList.exists(_.semanticEquals(exp)) + } + + if (isOutputEmR) { + // Mappings of output of two plans by checking semantic equals. 
+ val mappings = gb_2a.outputList.zipWithIndex.map { case(exp, index) => + (exp, gb_2q.outputList.find { + case a: Alias if exp.isInstanceOf[Alias] => + a.child.semanticEquals(exp.children.head) + case a: Alias => a.child.semanticEquals(exp) + case other => other.semanticEquals(exp) + }.getOrElse(gb_2a.outputList(index))) + } + + val oList = mappings.map{case (out1, out2) => + if (out1.name != out2.name) out1 match { + case alias: Alias => Alias(alias.child, out2.name)(exprId = alias.exprId) + case _ => Alias(out1, out2.name)(exprId = out2.exprId) + } else out1 + } + + Seq(gb_2a.copy(outputList = oList)) + } else { + Nil + } + } else { + val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias => + (a.toAttribute, a)}) + if (isGroupingEmR) { + Utils.tryMatch( + gb_2a, gb_2q, aliasMap).flatMap { + case g: GroupBy => + Some(g.copy(child = g.child.withNewChildren( + g.child.children.map { + case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a; + case other => other + }))); + case _ => None}.map(Seq(_)).getOrElse(Nil) + } else { + Nil + } + } + + case _ => Nil + } + } +} + +object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper { + private def isDerivable( + exprE: Expression, + exprListR: Seq[Expression], + subsumee: ModularPlan, + subsumer: ModularPlan, + compensation: Option[ModularPlan]) = { + if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) { + if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR)) true + else false + } else if (compensation.getOrElse(throw new RuntimeException("compensation cannot be None")) + .asInstanceOf[Select].predicateList.contains(exprE)) { + if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE))) true + else false + } else { + false + } + } + + def apply( + subsumer: ModularPlan, + subsumee: ModularPlan, + compensation: Option[ModularPlan], + rewrite: QueryRewrite): Seq[ModularPlan] = { + val aggInputEinR = 
subsumee.expressions + .collect { case agg: aggregate.AggregateExpression => AttributeSet(Seq(agg)) + .subsetOf(subsumer.outputSet) + }.forall(identity) + val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } } + .exists(_.contains(modular.GroupBy)) + + (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match { + case ( + gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _), + gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _), + Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _, _)), + true, + true) + if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) => + + val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output) + val isGroupingEdR = gb_2q.predicateList.forall(expr => + isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation)) + val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains) + val canPullup = sel_1c1.predicateList.forall(expr => + isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation)) + val isAggEmR = gb_2q.outputList.collect { + case agg: aggregate.AggregateExpression => + gb_2a.outputList.exists(_.semanticEquals(agg)) + }.forall(identity) + + if (isGroupingEdR && ((!needRegrouping && isAggEmR) || needRegrouping) && canPullup) { + // pull up + val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList + val sel_2c1 = sel_1c1.copy( + outputList = pullupOutputList, + inputList = pullupOutputList, + children = sel_1c1.children.map { + case s: Select => gb_2a + case other => other }) + + if (rejoinOutputList.isEmpty) { + val aliasMap = AttributeMap(gb_2a.outputList.collect { + case a: Alias => (a.toAttribute, a) }) + Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap { + case g: GroupBy => Some(g.copy(child = sel_2c1)); + case _ => None + }.map { wip => + factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap) + }.map(Seq(_)) + .getOrElse(Nil) + } + // TODO: implement regrouping with 1:N rejoin 
(rejoin tables being the "1" side) + // via catalog service + else if (!needRegrouping && isAggEmR) { + Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap)) + } else Nil + } else Nil + + case _ => Nil + } + } +} + +object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern { + def apply( + subsumer: ModularPlan, + subsumee: ModularPlan, + compensation: Option[ModularPlan], + rewrite: QueryRewrite): Seq[ModularPlan] = { + val groupbys = compensation.map { _.collect { case g: GroupBy => g } }.getOrElse(Nil).toSet + + (subsumer, subsumee, groupbys.nonEmpty) match { + case ( + modular.Select(_, _, _, _, _, _, _, _, _, _), + modular.Select(_, _, _, _, _, _, _, _, _, _), + true) => + // TODO: implement me + Nil + + case _ => Nil + } + } +} + + +object SelectSelectSelectChildDelta extends DefaultMatchPattern { + def apply( + subsumer: ModularPlan, + subsumee: ModularPlan, + compensation: Option[ModularPlan], + rewrite: QueryRewrite): Seq[ModularPlan] = { + val compensationSelectOnly = + !compensation + .map { _.collect { case n => n.getClass } } + .exists(_.contains(modular.GroupBy)) + + (subsumer, subsumee, compensationSelectOnly) match { + case ( + modular.Select(_, _, _, _, _, _, _, _, _, _), + modular.Select(_, _, _, _, _, _, _, _, _, _), + true) => + // TODO: implement me + Nil + case _ => Nil + } + } +} + +object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper { + private def isDerivable( + exprE: Expression, + exprListR: Seq[Expression], + subsumee: ModularPlan, + subsumer: ModularPlan, + compensation: Option[ModularPlan]) = { + Utils.isDerivable( + exprE: Expression, + exprListR: Seq[Expression], + subsumee: ModularPlan, + subsumer: ModularPlan, + compensation: Option[ModularPlan]) + } + + def apply( + subsumer: ModularPlan, + subsumee: ModularPlan, + compensation: Option[ModularPlan], + rewrite: QueryRewrite): Seq[ModularPlan] = { + (subsumer, subsumee, compensation, subsumer.children, subsumee.children) 
match { + case ( + sel_3a@modular.Select( + _, _, Nil, _, _, + Seq(gb_2a@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), + sel_3q@modular.Select( + _, _, _, _, _, + Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _), + Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)), + rchild :: Nil, + echild :: Nil) => + val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl } + val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl } + + val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains) + val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains) + val rejoinOutputList = rejoin.flatMap(_.output) + + val isPredicateRmE = sel_3a.predicateList.forall(expr => + sel_3q.predicateList.exists(_.semanticEquals(expr)) || + gb_2c.predicateList.exists(_.semanticEquals(expr))) + val isPredicateEmdR = sel_3q.predicateList + .forall(expr => + sel_3a.predicateList.exists(_.semanticEquals(expr)) || + isDerivable( + expr, + sel_3a.outputList ++ rejoinOutputList, + sel_3q, + sel_3a, + compensation)) + val isOutputEdR = sel_3q.outputList + .forall(expr => + isDerivable( + expr, + sel_3a.outputList ++ rejoinOutputList, + sel_3q, + sel_3a, + compensation)) + + val canSELPullup = gb_2c.child.isInstanceOf[Select] && + gb_2c.child.asInstanceOf[Select].predicateList + .forall(expr => + isDerivable( + expr, + sel_3a.outputList ++ rejoinOutputList, + sel_3q, + sel_3a, + compensation)) + val canGBPullup = gb_2c.predicateList + .forall(expr => + isDerivable( + expr, + sel_3a.outputList ++ rejoinOutputList, + sel_3q, + sel_3a, + compensation)) + + if (extrajoin.isEmpty && isPredicateRmE && + isPredicateEmdR && + isOutputEdR && + canSELPullup && + canGBPullup) { + gb_2c.child match { + case s: Select => + val sel_3c1 = s.withNewChildren( + s.children.map { + case gb: GroupBy => sel_3a.setSkip() + case other => other }) + val gb_3c2 = gb_2c.copy(child = sel_3c1) + + val aliasMap_exp = AttributeMap( + gb_2c.outputList.collect { + case a: Alias => 
(a.toAttribute, a) }) + val sel_3q_exp = sel_3q.transformExpressions({ + case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr) + }) + // Mappings of output of two plans by checking semantic equals. + val mappings = sel_3q_exp.outputList.zipWithIndex.map { case(exp, index) => + (exp, gb_2c.outputList.find { + case a: Alias if exp.isInstanceOf[Alias] => + a.child.semanticEquals(exp.children.head) + case a: Alias => a.child.semanticEquals(exp) + case other => other.semanticEquals(exp) + }.getOrElse(gb_2c.outputList(index))) + } + + val oList = for ((o1, o2) <- mappings) yield { + if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2 + } + + val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2)) + val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap)) + sel_3c3.map(Seq(_)).getOrElse(Nil) + + case _ => Nil + } + } else { + Nil + } + + case _ => Nil + } + } +} + + http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala new file mode 100644 index 0000000..2a4da27 --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.rewrite + +// TODO: implement this to modularize DefaultMatchingFunctions +object MatchConditions { +} + +class MatchConditions(flags: Long) { + def hasFlag(flag: Long): Boolean = { + throw new UnsupportedOperationException + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala new file mode 100644 index 0000000..2c5d8f4 --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.rewrite + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.trees.TreeNode + +abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging { + + def apply( + subsumer: MatchingPlan, + subsumee: MatchingPlan, + compensation: Option[MatchingPlan], + rewrite: QueryRewrite): Seq[MatchingPlan] + +} + +abstract class MatchMaker[MatchingPlan <: TreeNode[MatchingPlan]] { + + /** Define a sequence of rules, to be overridden by the implementation. */ + protected val patterns: Seq[MatchPattern[MatchingPlan]] + + def execute( + subsumer: MatchingPlan, + subsumee: MatchingPlan, + compensation: Option[MatchingPlan], + rewrite: QueryRewrite): Iterator[MatchingPlan] = { + val iter = patterns.view.flatMap(_ (subsumer, subsumee, compensation, rewrite)).toIterator + iter + + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala new file mode 100644 index 0000000..545920e --- /dev/null +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.mv.rewrite + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet} + +import org.apache.carbondata.mv.datamap.{MVHelper, MVState} +import org.apache.carbondata.mv.expressions.modular._ +import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select} +import org.apache.carbondata.mv.plans.modular + +private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVState) { + + def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = { + val replaced = plan.transformAllExpressions { + case s: ModularSubquery => + if (s.children.isEmpty) { + ScalarModularSubquery( + rewriteWithSummaryDatasetsCore(s.plan, rewrite), s.children, s.exprId) + } + else throw new UnsupportedOperationException(s"Rewrite expression $s isn't supported") + case o => o + } + rewriteWithSummaryDatasetsCore(replaced, rewrite) + } + + def rewriteWithSummaryDatasetsCore(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = { + val rewrittenPlan = plan transformDown { + case currentFragment => + if (currentFragment.rewritten || !currentFragment.isSPJGH) currentFragment + else { + val compensation = + (for { dataset <- catalog.lookupFeasibleSummaryDatasets(currentFragment).toStream + subsumer <- session.modularizer.modularize( + session.optimizer.execute(dataset.plan)).map(_.harmonized) + subsumee <- unifySubsumee(currentFragment) + comp <- subsume( + unifySubsumer2( + unifySubsumer1( + subsumer, + subsumee, + dataset.relation), + subsumee), + subsumee, rewrite) + } 
yield comp).headOption + compensation.map(_.setRewritten).getOrElse(currentFragment) + } + } + // In case it is rewritten plan and the datamap table is not updated then update the datamap + // table in plan. + if (rewrittenPlan.find(_.rewritten).isDefined) { + val updatedDataMapTablePlan = rewrittenPlan transform { + case s: Select => + MVHelper.updateDataMap(s, rewrite) + case g: GroupBy => + MVHelper.updateDataMap(g, rewrite) + } + // TODO Find a better way to set the rewritten flag, it may fail in some conditions. + val mapping = + rewrittenPlan.collect {case m: ModularPlan => m } zip + updatedDataMapTablePlan.collect {case m: ModularPlan => m} + mapping.foreach(f => if (f._1.rewritten) f._2.setRewritten()) + + updatedDataMapTablePlan + + } else { + rewrittenPlan + } + } + + def subsume( + subsumer: ModularPlan, + subsumee: ModularPlan, + rewrite: QueryRewrite): Option[ModularPlan] = { + if (subsumer.getClass == subsumee.getClass) { + (subsumer.children, subsumee.children) match { + case (Nil, Nil) => None + case (r, e) if r.forall(_.isInstanceOf[modular.LeafNode]) && + e.forall(_.isInstanceOf[modular.LeafNode]) => + val iter = session.matcher.execute(subsumer, subsumee, None, rewrite) + if (iter.hasNext) Some(iter.next) + else None + + case (rchild :: Nil, echild :: Nil) => + val compensation = subsume(rchild, echild, rewrite) + val oiter = compensation.map { + case comp if comp.eq(rchild) => + session.matcher.execute(subsumer, subsumee, None, rewrite) + case _ => + session.matcher.execute(subsumer, subsumee, compensation, rewrite) + } + oiter.flatMap { case iter if iter.hasNext => Some(iter.next) + case _ => None } + + case _ => None + } + } else None + } + + private def updateDatamap(rchild: ModularPlan, subsume: ModularPlan) = { + val update = rchild match { + case s: Select if s.dataMapTableRelation.isDefined => + true + case g: GroupBy if g.dataMapTableRelation.isDefined => + true + case _ => false + } + + if (update) { + subsume match { + case s: Select 
/**
 * Aligns the subsumer's leaf relations with the subsumee's equal leaf
 * relations so that later matching compares identical node instances and
 * attribute ids.
 *
 * For every pair of equal leaves, the subsumer's leaf is replaced by the
 * subsumee's, and every attribute that belonged to the replaced leaf is
 * rewritten (keeping its original qualifier) throughout the subsumer plan.
 */
def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
  val subsumerLeaves = subsumer.collect { case leaf: modular.LeafNode => leaf }
  val subsumeeLeaves = subsumee.collect { case leaf: modular.LeafNode => leaf }
  // Pair every subsumer leaf with each subsumee leaf it is equal to.
  val matchedLeaves = subsumerLeaves.flatMap { subsumerLeaf =>
    subsumeeLeaves.filter(_ == subsumerLeaf).map(subsumerLeaf -> _)
  }

  matchedLeaves.foldLeft(subsumer) { (currentPlan, pair) =>
    // Swap the subsumer leaf (pair._1) for the equal subsumee leaf (pair._2);
    // `pair._1` is a stable-identifier pattern, matching by equality.
    val replacedPlan = currentPlan.transform { case pair._1 => pair._2 }
    val replacedAttributes = AttributeSet(pair._1.output)
    val attributeRewrites = AttributeMap(pair._1.output.zip(pair._2.output))
    // Rewrite every reference to the replaced leaf's attributes, preserving
    // the qualifier the attribute carried in the subsumer plan.
    replacedPlan.transformUp {
      case node => node.transformExpressions {
        case attr: Attribute if replacedAttributes contains attr =>
          attributeRewrites(attr).withQualifier(attr.qualifier)
      }
    }
  }
}
class QueryRewrite private (
    state: MVState,
    logical: LogicalPlan,
    nextSubqueryId: AtomicLong) {
  self =>

  // Public constructor: starts the subsumer numbering at 0 for a fresh rewrite.
  def this(state: MVState, logical: LogicalPlan) =
    this(state, logical, new AtomicLong(0))

  /** Returns a fresh, unique subsumer alias of the form "gen_subsumer_<n>". */
  def newSubsumerName(): String = s"gen_subsumer_${nextSubqueryId.getAndIncrement()}"

  /** `logical` after running the MV optimizer batches over it. */
  lazy val optimizedPlan: LogicalPlan =
    state.optimizer.execute(logical)

  /**
   * Harmonized form of the first modular plan produced for the optimized plan.
   * NOTE(review): assumes the modularizer always yields at least one plan —
   * `next()` throws on an empty iterator; confirm against the modularizer
   * contract.
   */
  lazy val modularPlan: ModularPlan =
    state.modularizer.modularize(optimizedPlan).next().harmonized

  /** The modular plan rewritten to use matching summary datasets, if any. */
  lazy val withSummaryData: ModularPlan =
    state.navigator.rewriteWithSummaryDatasets(modularPlan, self)

  /** Compact SQL rendering of the rewritten plan. */
  lazy val toCompactSQL: String = withSummaryData.asCompactSQL

  /** One-line SQL rendering of the rewritten plan. */
  lazy val toOneLineSQL: String = withSummaryData.asOneLineSQL
}
/**
 * Holds one registered summary dataset (materialized view) of the catalog.
 *
 * @param signature     signature of the MV's modular plan, used to quickly
 *                      screen candidates during query matching
 * @param plan          logical plan the MV was registered with
 * @param dataMapSchema datamap schema this dataset was created from
 * @param relation      modular plan (a Select over the MV table) used as the
 *                      replacement during query rewrite — see registerSchema
 */
private[mv] case class SummaryDataset(signature: Option[Signature],
    plan: LogicalPlan,
    dataMapSchema: DataMapSchema,
    relation: ModularPlan)
/** Runs `f` while holding the catalog's write lock, releasing it afterwards. */
private def writeLock[A](f: => A): A = {
  val writer = registerLock.writeLock()
  writer.lock()
  try {
    f
  } finally {
    writer.unlock()
  }
}
/**
 * Returns the registered summary datasets that could feasibly answer the
 * given modular plan, most specific first.
 *
 * A dataset is a candidate when its datamap is currently enabled, its
 * group-by-ness agrees with the query's, and it only touches tables the
 * query also touches.
 */
private[mv] def lookupFeasibleSummaryDatasets(plan: ModularPlan): Seq[SummaryDataset] = {
  readLock {
    val querySignature = plan.signature
    val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails

    // Only datamaps currently enabled may be used for the query.
    def isEnabled(candidate: SummaryDataset): Boolean =
      statusDetails.exists(
        _.getDataMapName.equalsIgnoreCase(candidate.dataMapSchema.getDataMapName))

    // Feasible when both signatures exist, both are (or both are not)
    // group-by plans, and the dataset's tables are a subset of the query's.
    def isFeasible(candidate: SummaryDataset): Boolean =
      (candidate.signature, querySignature) match {
        case (Some(datasetSig), Some(querySig)) =>
          datasetSig.groupby == querySig.groupby &&
            datasetSig.datasets.subsetOf(querySig.datasets)
        case _ => false
      }

    // heuristics: more tables involved in a summary data set => higher query
    // reduction factor, so try those first.
    summaryDatasets
      .filter(isEnabled)
      .filter(isFeasible)
      .sortWith(_.signature.get.datasets.size > _.signature.get.datasets.size)
  }
}