GitHub user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1542#discussion_r153995074

    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---
    @@ -822,33 +823,59 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
        * @return
        */
       private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val validExpressionsMap = scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
         logicalPlan transform {
           case aggregate@Aggregate(_, aExp, _) =>
    -        val newExpressions = aExp.flatMap {
    -          case alias@Alias(attrExpression: AggregateExpression, _) =>
    -            attrExpression.aggregateFunction match {
    -              case Average(attr: AttributeReference) =>
    -                Seq(Alias(attrExpression
    -                  .copy(aggregateFunction = Sum(attr),
    -                    resultId = NamedExpression.newExprId), attr.name + "_sum")(),
    -                  Alias(attrExpression
    -                    .copy(aggregateFunction = Count(attr),
    -                      resultId = NamedExpression.newExprId), attr.name + "_count")())
    -              case Average(cast@MatchCast(attr: AttributeReference, _)) =>
    -                Seq(Alias(attrExpression
    -                  .copy(aggregateFunction = Sum(cast),
    -                    resultId = NamedExpression.newExprId),
    -                  attr.name + "_sum")(),
    -                  Alias(attrExpression
    -                    .copy(aggregateFunction = Count(cast),
    -                      resultId = NamedExpression.newExprId), attr.name + "_count")())
    -              case _ => Seq(alias)
    -            }
    -          case namedExpr: NamedExpression => Seq(namedExpr)
    +        aExp.foreach {
    +          case alias: Alias =>
    +            validExpressionsMap ++= validateAggregateFunctionAndGetAlias(alias)
    +          case namedExpr: NamedExpression => validExpressionsMap.put(namedExpr.name, namedExpr)
             }
    -        aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
    +        aggregate
    +          .copy(aggregateExpressions = validExpressionsMap.values.toSeq)
    --- End diff --

    Move this to the previous line.
---