Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1464#discussion_r149004253 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala --- @@ -0,0 +1,756 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Divide, +Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.types._ + +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} +import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * Class for applying Pre Aggregate rules + * Responsibility. + * 1. Check plan is valid plan for updating the parent table plan with child table + * 2. Updated the plan based on child schema + * + * Rules for Upadating the plan + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. Aggregate expression rules + * 2.1 Change the parent attribute reference for of group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.2 In case of average aggregate function select 2 columns from aggregate table with + * aggregation + * sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation for average table will be created with two columns + * one for sum(column) and count(column) to support rollup + * + * 3. 
Filter Expression rules. + * 3.1 Updated filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * + * @param sparkSession + * spark session + */ +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] { + + override def apply(plan: LogicalPlan): LogicalPlan = { + var needAnalysis = true + plan.transformExpressions { + // first check if any preAgg scala function is applied it is present is in plan + // then call is from create preaggregate table class so no need to transform the query plan + case al@Alias(_, name) if name.equals("preAgg") => + needAnalysis = false + al + // in case of query if any unresolve alias is present then wait for plan to be resolved + // return the same plan as we can tranform the plan only when everything is resolved + case unresolveAlias@UnresolvedAlias(_, _) => + needAnalysis = false + unresolveAlias + } + // if plan is not valid for transformation then return same plan + if (!needAnalysis) { + plan + } else { + // create buffer to collect all the column and its metadata information + val list = scala.collection.mutable.ListBuffer.empty[QueryColumn] + var isValidPlan = true + val carbonTable = plan match { + // matching the plan based on supported plan + // if plan is matches with any case it will validate and get all + // information required for transforming the plan + + // When plan has grouping expression, aggregate expression + // subquery + case Aggregate(groupingExp, + aggregateExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child 
data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + + // below case for handling filter query + // When plan has grouping expression, aggregate expression + // filter expression + case Aggregate(groupingExp, aggregateExp, + Filter(filterExp, + SubqueryAlias(_, logicalRelation: LogicalRelation, _))) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + // getting the columns from filter expression + filterExp.transform { + case attr: AttributeReference => + list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true) + attr + } + } + carbonTable + + // When plan has grouping expression, aggregate expression + // logical relation + case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] + && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable + .hasPreAggDataMap => + val (carbonTable, 
tableName) = getCarbonTableAndTableName(logicalRelation) + // when table has child data map(pre aggregate table) then only plan will be transformed + if (!carbonTable.hasPreAggDataMap) { + isValidPlan = false + } + if (isValidPlan) { + // if it is valid plan then extract the query columns + isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, + aggregateExp, + carbonTable, + tableName, + list) + } + carbonTable + case _ => + isValidPlan = false + null + } + // if plan is valid then update the plan with child attributes + if (isValidPlan) { + // getting all the projection columns + val listProjectionColumn = list + .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn) + // getting all the filter columns + val listFilterColumn = list + .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn) + // getting all the aggregation columns + val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty) + // create a query plan object which will be used to select the list of pre aggregate tables + // matches with this plan + val queryPlan = new QueryPlan(listProjectionColumn.asJava, + listAggregationColumn.asJava, + listFilterColumn.asJava) + // create aggregate table selector object + val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable) + // select the list of valid child tables + val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema() + // if it doesnot match with any pre aggregate table return the same plan + if (!selectedDataMapSchemas.isEmpty) { + // sort the selected child schema based on size to select smallest pre aggregate table + val (aggDataMapSchema, carbonRelation) = + selectedDataMapSchemas.asScala.map { selectedDataMapSchema => + val catalog = sparkSession.sessionState.catalog + val carbonRelation = catalog + .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier + .getTableName, + 
Some(selectedDataMapSchema.getRelationIdentifier + .getDatabaseName))).asInstanceOf[SubqueryAlias].child + .asInstanceOf[LogicalRelation] + (selectedDataMapSchema, carbonRelation) + }.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes) + .head --- End diff -- Instead of using `}.sortBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes).head`, use `.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)`: it selects the smallest pre-aggregate table in a single pass instead of sorting the whole list only to take its first element.
---