Github user kumarvishal09 commented on a diff in the pull request: https://github.com/apache/carbondata/pull/1728#discussion_r159457280 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala --- @@ -98,238 +143,499 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule needAnalysis = false attr } + if(needAnalysis) { + needAnalysis = isValidPlan(plan) + } // if plan is not valid for transformation then return same plan if (!needAnalysis) { plan } else { - // create buffer to collect all the column and its metadata information - val list = scala.collection.mutable.HashSet.empty[QueryColumn] - var isValidPlan = true - val carbonTable = plan match { - // matching the plan based on supported plan - // if plan is matches with any case it will validate and get all - // information required for transforming the plan + val updatedPlan = transformPreAggQueryPlan(plan) + val newPlan = updatePlan(updatedPlan) + print(newPlan.toString()) + newPlan + } + } - // When plan has grouping expression, aggregate expression - // subquery - case Aggregate(groupingExp, - aggregateExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - carbonTable + /** + * Below method will be used to update the child plan + * This will be used for updating expression like join condition, + * order by, project list etc + * @param plan + * child plan + * @return updated plan + */ + def updatePlan(plan: LogicalPlan) : LogicalPlan = { + val updatedPlan = plan transform { + case Aggregate(grp, aggExp, child) => + Aggregate(updateExpression(grp), updateNamedExpression(aggExp), child) + case Filter(filterExp, child) => + Filter(updateConditionExpression(Some(filterExp)).get, child) + case Project(projectList, child) => + Project(updateNamedExpression(projectList), child) + case Sort(sortOrders, global, child) => + Sort(updateSortExpression(sortOrders), global, child) + case Join(left, right, joinType, condition) => + Join(left, right, joinType, updateConditionExpression(condition)) + } + updatedPlan + } - // below case for handling filter query - // When plan has grouping expression, aggregate expression - // filter expression - case Aggregate(groupingExp, - aggregateExp, - Filter(filterExp, - CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))) - // only carbon query plan is supported checking whether logical relation is - // is for carbon - if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && - logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation. 
- metaData.hasAggregateDataMapSchema => - val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation) - // if it is valid plan then extract the query columns - isValidPlan = extractQueryColumnsFromAggExpression(groupingExp, - aggregateExp, - carbonTable, - tableName, - list) - if(isValidPlan) { - isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp) - } - // getting the columns from filter expression - if(isValidPlan) { - isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName) + /** + * Below method will be used to update the sort expression + * @param sortExp + * sort order expression in query + * @return updated sort expression + */ + def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = { + sortExp map { order => + order.child match { + case attr: AttributeReference => + val childExp = updatedExpression.find { p => p._1.sameRef(attr) } + if (childExp.isDefined) { + val newExpression = AttributeReference( + childExp.get._2.name, + childExp.get._2.dataType, + childExp.get._2.nullable, + childExp.get._2.metadata)(childExp.get._2.exprId, attr.qualifier, attr.isGenerated) --- End diff -- childExp.qualifier will not have the table alias name; in case of a join, we need the table alias name.
---