Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1728#discussion_r159052141 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + /** + * Below method will be used to validate the logical plan + * and get all the details from to select proper aggregate table + * @param logicalPlan + * actual query logical plan + * @param list + * list of projection column present in plan + * @param qAggExprs + * list of aggregate expression + * @return if plan is valid for tranformation, parent table, parent logical relaion + */ + def validatePlanAndGetFields(logicalPlan: LogicalPlan, + list: scala.collection.mutable.HashSet[QueryColumn], + qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): (Boolean, + CarbonTable, LogicalRelation) = { + var isValidPlan = false + var pTable: CarbonTable = null + var qLRelation: LogicalRelation = null + logicalPlan.transform { + // to handle filter expression + case filter@Filter(filterExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
+ metaData.hasAggregateDataMapSchema => + qLRelation = logicalRelation + pTable = getCarbonTableAndTableName(logicalRelation) + // getting the columns from filter expression + if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) { + isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, pTable) + } + filter + // to handle aggregate expression + case agg@Aggregate(groupingExp, + aggregateExp, + CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) + // only carbon query plan is supported checking whether logical relation is + // is for carbon + if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && + logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. + metaData.hasAggregateDataMapSchema => + qLRelation = logicalRelation + pTable = getCarbonTableAndTableName(logicalRelation) + isValidPlan = extractQueryColumnsFromAggExpression( + groupingExp, + aggregateExp, + pTable, + list, + qAggExprs) + agg + // to handle aggregate expression with filter + case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) => + val out = validatePlanAndGetFields(filter, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if (isValidPlan) { + isValidPlan = extractQueryColumnsFromAggExpression(grExp, aggExp, pTable, list, qAggExprs) + } + agg + // to handle projection with order by + case proj@Project(projectList, sort@Sort(_, _, _)) => + val out = validatePlanAndGetFields(sort, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable) + } + proj + // to handle only projection + case proj@Project(projectList, agg@Aggregate(_, _, _)) => + val out = validatePlanAndGetFields(agg, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ extractQueryColumnForOrderBy(Some(projectList), Seq.empty, pTable) + } + proj + // case for 
handling aggregation with order by when only projection column exits + case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) => + val out = validatePlanAndGetFields(agg, list, qAggExprs) + pTable = out._2 + qLRelation = out._3 + isValidPlan = out._1 + if(isValidPlan) { + list ++ + extractQueryColumnForOrderBy(None, sortOrders, pTable) + } + sort + } + (isValidPlan, pTable, qLRelation) + } + + /** + * Below method will be used to validate aggregate expression with the data map + * and will return the selected valid data maps + * @param selectedDataMap + * list of data maps + * @param carbonTable + * parent carbon table + * @param logicalRelation + * parent logical relation + * @param queryAggExpLogicalPlans + * query agg expression logical plan + * @return valid data map + */ + def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema], + carbonTable: CarbonTable, + logicalRelation: LogicalRelation, + queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = { + def validateDataMap(dataMap: DataMapSchema, + aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = { + val schemaAggLogicalPlan = getLogicalPlanForAggregateExpression(dataMap, + carbonTable, + logicalRelation) + aggExpLogicalPlans.forall{ + p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1)) + } + } + val selectedDataMapSchema = selectedDataMap.collect { + case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) => + dataMap + } + selectedDataMapSchema + } + + /** + * Below method will be used to update the logical plan of expression + * with parent table logical relation + * @param logicalPlan + * @param logicalRelation + * @return + */ + def updateLogicalRelation(logicalPlan: LogicalPlan, + logicalRelation: LogicalRelation): LogicalPlan = { + logicalPlan transform { + case l: LogicalRelation => + l.copy(relation = logicalRelation.relation) + } + } + + /** + * Below method will be used to to get the logical plan for each aggregate expression in + * child data map and its column 
schema mapping if mapping is already present + * then it will use the same otherwise it will generate and stored in aggregation data map + * @param selectedDataMap + * child data map + * @param carbonTable + * parent table + * @param logicalRelation + * logical relation of actual plan + * @return map of logical plan for each aggregate expression in child query and its column mapping + */ + def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, carbonTable: CarbonTable, + logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = { + val aggDataMapSchema = selectedDataMap.asInstanceOf[AggregationDataMapSchema] + // if column mapping is not present + if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) { + // add preAGG UDF to avoid all the PreAggregate rule + val childDataMapQueryString = new CarbonSpark2SqlParser() + .addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY")) + // get the logical plan + val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan + // getting all aggregate expression from query + val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan) + // in case of average child table will have two columns which will be stored in sequence + // so for average expression we need to get two columns for mapping + var counter = 0 + // sorting the columns based on schema ordinal so search will give proper result + val sortedColumnList = aggDataMapSchema.getChildSchema.getListOfColumns.asScala + .sortBy(_.getSchemaOrdinal) + val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp => + // for each aggregate expression get logical plan + val expLogicalPlan = getLogicalPlanFromAggExp(aggExp, + carbonTable.getTableName, + carbonTable.getDatabaseName, logicalRelation) + // check if aggregate expression is of type avg + // get the columns + var columnSchema = aggDataMapSchema + .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava) + // increment the counter so when for next 
expression above code will be + // executed it will search from that schema ordinal + counter = columnSchema.getSchemaOrdinal + 1 + (expLogicalPlan, columnSchema) + }.toMap + // store the mapping in data map + aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava) + // return the mapping + logicalPlanToColumnMapping + } else { + // if already present in data map then return the same + aggDataMapSchema.getAggregateExpressionToColumnMapping + .asInstanceOf[java.util.Map[LogicalPlan, ColumnSchema]].asScala.toMap + } + } + + + /** + * Below method will be used to get the logical plan from aggregate expression + * @param aggExp + * aggregate expression + * @param tableName + * parent table name + * @param databaseName + * database name + * @param logicalRelation + * logical relation + * @return logical plan + */ + def getLogicalPlanFromAggExp(aggExp: AggregateExpression, + tableName: String, + databaseName: String, + logicalRelation: LogicalRelation): LogicalPlan = { + // adding the preAGG UDF, so pre aggregate data loading rule and query rule will not + // be applied + val query = new CarbonSpark2SqlParser() + .addPreAggFunction(s"Select ${ aggExp.sql } from $databaseName.$tableName") + // updating the logical relation of logical plan to so when two logical plan + // will be compared it will not consider relation + updateLogicalRelation(sparkSession.sql(query).logicalPlan, logicalRelation) + } + + /** + * Below method will be used to get aggregate expression + * @param logicalPlan + * logical plan + * @return list of aggregate expression + */ + def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): Seq[AggregateExpression] = { + val list = scala.collection.mutable.HashSet.empty[AggregateExpression] --- End diff -- Should use a List or LinkedHashSet here, as the order of insertion is required for the mapping.
---