Github user ravipesala commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1728#discussion_r159443569 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + SortOrder(newExpression, order.direction) + } else { + SortOrder(attr, order.direction) } - carbonTable + } + } + } - // When plan has grouping expression, aggregate expression - // logical relation - case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable - // case for handling aggregation, order by - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - } - carbonTable - // case for handling aggregation, order by and filter - case Project(projectList, - Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) + /** + * Below method will be used to update the expression like group by expression + * @param expressions + * sequence of expression like group by + * @return updated expressions + */ + def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = { + val newExp = expressions map { expression => + expression transform { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExpression + } else { + attr } - if (isValidPlan) { - list ++ - extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName) - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + } + } + newExp + } + + /** + * Below method will be used to updated the named expression like aggregate expression + * @param namedExpression + * any named expression like aggregate expression + * @return updated named expression + */ + def updateNamedExpression(namedExpression: Seq[NamedExpression]) : Seq[NamedExpression] = { + namedExpression map { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + case alias@Alias(exp, name) => + val 
newExp = exp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if (childExp.isDefined) { + val newExp = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) + newExp + } else { + attr + } + } + Alias(newExp, name)(alias.exprId, alias.qualifier, Some(alias.metadata), alias.isGenerated) + } + } + + /** + * Below method will be used to updated condition expression + * @param conditionExp + * any condition expression join condition or filter condition + * @return updated condition expression + */ + def updateConditionExpression(conditionExp: Option[Expression]): Option[Expression] = { + if (conditionExp.isDefined) { + val filterExp = conditionExp.get + Some(filterExp.transform { + case attr: AttributeReference => + val childExp = updatedExpression.find(p => p._1.sameRef(attr)) + if(childExp.isDefined) { + childExp.get._2 + } else { + attr } - carbonTable - // case for handling aggregation with order by when only projection column exits - case Sort(sortOrders, - _, - Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. - metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, + }) + } else { + conditionExp + } + } + + /** + * Below method will be used to validate and transform the main table plan to child table plan + * rules for transforming is as below. + * 1. Grouping expression rules + * 1.1 Change the parent attribute reference for of group expression + * to child attribute reference + * + * 2. 
Aggregate expression rules + * 2.1 Change the parent attribute reference of the group expression to + * child attribute reference + * 2.2 Change the count AggregateExpression to Sum as count + * is already calculated so in case of aggregate table + * we need to apply sum to get the count + * 2.3 In case of average aggregate function select 2 columns from aggregate table with + * aggregation sum and count. Then add divide(sum(column with sum), sum(column with count)). + * Note: During aggregate table creation, for average the table will be created with two columns, + * one for sum(column) and one for count(column), to support rollup + * 3. Filter Expression rules. + * 3.1 Update filter expression attributes with child table attributes + * 4. Update the Parent Logical relation with child Logical relation + * 5. timeseries function + * 5.1 validate parent table has timeseries datamap + * 5.2 timeseries function is valid function or not + * + * @param logicalPlan + * parent logical plan + * @return transformed plan + */ + def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + val updatedPlan = logicalPlan.transform { + // case for aggregation query + case agg@Aggregate(grExp, + aggExp, + child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) --- End diff -- Indentation is wrong
---