Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159609322
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
    @@ -1218,112 +1180,125 @@ case class 
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
        * parent column name
        * @param carbonTable
        * parent carbon table
    -   * @param tableName
    -   * parent table name
    -   * @param aggFunction
    -   * aggregate function applied
    -   * @param dataType
    -   * data type of the column
    -   * @param isChangedDataType
    -   * is cast is applied on column
        * @param isFilterColumn
        * is filter is applied on column
        * @return query column
        */
       def getQueryColumn(columnName: String,
           carbonTable: CarbonTable,
    -      tableName: String,
    -      aggFunction: String = "",
    -      dataType: String = "",
    -      isChangedDataType: Boolean = false,
           isFilterColumn: Boolean = false,
           timeseriesFunction: String = ""): QueryColumn = {
    -    val columnSchema = carbonTable.getColumnByName(tableName, 
columnName.toLowerCase)
    +    val columnSchema = 
carbonTable.getColumnByName(carbonTable.getTableName, columnName.toLowerCase)
         if(null == columnSchema) {
           null
         } else {
    -      if (isChangedDataType) {
             new QueryColumn(columnSchema.getColumnSchema,
    -          columnSchema.getDataType.getName,
    -          aggFunction.toLowerCase,
               isFilterColumn,
               timeseriesFunction.toLowerCase)
    -      } else {
    -        new QueryColumn(columnSchema.getColumnSchema,
    -          CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
    -          aggFunction.toLowerCase,
    -          isFilterColumn,
    -          timeseriesFunction.toLowerCase)
    -      }
         }
       }
     }
     
    -object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
    -
    +/**
    + * Data loading rule class to validate and update the data loading query 
plan
    + * Validation rule:
    + * 1. update the avg aggregate expression with two columns sum and count
    + * 2. Remove duplicate sum and count expression if already there in plan
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateDataLoadingRules(sparkSession: SparkSession)
    +  extends Rule[LogicalPlan] {
    +  lazy val parser = new CarbonSpark2SqlParser
       override def apply(plan: LogicalPlan): LogicalPlan = {
    -    val validExpressionsMap = 
scala.collection.mutable.LinkedHashMap.empty[String, NamedExpression]
    +    val validExpressionsMap = 
scala.collection.mutable.HashSet.empty[AggExpToColumnMappingModel]
    +    val namedExpressionList = 
scala.collection.mutable.ListBuffer.empty[NamedExpression]
         plan transform {
    -      case aggregate@Aggregate(_, aExp, _) if 
validateAggregateExpressions(aExp) =>
    +      case aggregate@Aggregate(_,
    +      aExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if validateAggregateExpressions(aExp) &&
    +        
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
    +        val carbonTable = 
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
    +          .carbonTable
             aExp.foreach {
    -          case alias: Alias =>
    -            validExpressionsMap ++= 
validateAggregateFunctionAndGetAlias(alias)
    -          case _: UnresolvedAlias =>
    -          case namedExpr: NamedExpression => 
validExpressionsMap.put(namedExpr.name, namedExpr)
    +          case attr: AttributeReference =>
    +              namedExpressionList += attr
    +            case alias@Alias(_: AttributeReference, _) =>
    +              namedExpressionList += alias
    +            case alias@Alias(aggExp: AggregateExpression, name) =>
    +              // get the updated expression for avg convert it to two 
expression
    +              // sum and count
    +              val expressions = 
PreAggregateUtil.getUpdateAggregateExpressions(aggExp)
    +              // if size is more than one then it was for average
    +              if(expressions.size > 1) {
    +                // get the logical plan for sum expression
    +                val logicalPlan_sum = 
PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.head,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // get the logical plan fro count expression
    +                val logicalPlan_count = 
PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.last,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // check with same expression already sum is present then 
do not add to
    +                // named expression list otherwise update the list and add 
it to set
    +                if 
(!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_sum))) {
    +                  namedExpressionList +=
    +                  Alias(expressions.head, name + " _ 
sum")(NamedExpression.newExprId,
    +                    alias.qualifier,
    +                    Some(alias.metadata),
    +                    alias.isGenerated)
    +                  validExpressionsMap += 
AggExpToColumnMappingModel(logicalPlan_sum)
    +                }
    +                // check with same expression already count is present 
then do not add to
    +                // named expression list otherwise update the list and add 
it to set
    +                if 
(!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan_count))) {
    +                  namedExpressionList +=
    +                  Alias(expressions.last, name + " _ 
count")(NamedExpression.newExprId,
    +                    alias.qualifier,
    +                    Some(alias.metadata),
    +                    alias.isGenerated)
    +                  validExpressionsMap += 
AggExpToColumnMappingModel(logicalPlan_count)
    +                }
    +              } else {
    +                // get the logical plan for expression
    +                val logicalPlan = 
PreAggregateUtil.getLogicalPlanFromAggExp(
    +                  expressions.head,
    +                  carbonTable.getTableName,
    +                  carbonTable.getDatabaseName,
    +                  logicalRelation,
    +                  sparkSession,
    +                  parser)
    +                // check with same expression already  present then do not 
add to
    +                // named expression list otherwise update the list and add 
it to set
    +                if 
(!validExpressionsMap.contains(AggExpToColumnMappingModel(logicalPlan))) {
    +                  namedExpressionList+=alias
    +                  validExpressionsMap += 
AggExpToColumnMappingModel(logicalPlan)
    +                }
    +              }
    +            case alias@Alias(_: Expression, _) =>
    +              namedExpressionList += alias
             }
    -        aggregate.copy(aggregateExpressions = 
validExpressionsMap.values.toSeq)
    +        aggregate.copy(aggregateExpressions = namedExpressionList.toSeq)
           case plan: LogicalPlan => plan
         }
       }
     
    -    /**
    -     * This method will split the avg column into sum and count and will 
return a sequence of tuple
    -     * of unique name, alias
    -     *
    -     */
    -    private def validateAggregateFunctionAndGetAlias(alias: Alias): 
Seq[(String,
    -      NamedExpression)] = {
    -      alias match {
    -        case udf@Alias(_: ScalaUDF, name) =>
    -          Seq((name, udf))
    -        case alias@Alias(attrExpression: AggregateExpression, _) =>
    -          attrExpression.aggregateFunction match {
    -            case Sum(attr: AttributeReference) =>
    -              (attr.name + "_sum", alias) :: Nil
    -            case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
    -              (attr.name + "_sum", alias) :: Nil
    -            case Count(Seq(attr: AttributeReference)) =>
    -              (attr.name + "_count", alias) :: Nil
    -            case Count(Seq(MatchCastExpression(attr: AttributeReference, 
_))) =>
    -              (attr.name + "_count", alias) :: Nil
    -            case Average(attr: AttributeReference) =>
    -              Seq((attr.name + "_sum", Alias(attrExpression.
    -                copy(aggregateFunction = Sum(attr),
    -                  resultId = NamedExpression.newExprId), attr.name + 
"_sum")()),
    -                (attr.name, Alias(attrExpression.
    -                  copy(aggregateFunction = Count(attr),
    -                    resultId = NamedExpression.newExprId), attr.name + 
"_count")()))
    -            case Average(cast@MatchCastExpression(attr: 
AttributeReference, _)) =>
    -              Seq((attr.name + "_sum", Alias(attrExpression.
    -                copy(aggregateFunction = Sum(cast),
    -                  resultId = NamedExpression.newExprId),
    -                attr.name + "_sum")()),
    -                (attr.name, Alias(attrExpression.
    -                  copy(aggregateFunction = Count(cast), resultId =
    -                    NamedExpression.newExprId), attr.name + "_count")()))
    -            case _ => Seq(("", alias))
    -          }
    -
    -      }
    -    }
    -
       /**
        * Called by PreAggregateLoadingRules to validate if plan is valid for 
applying rules or not.
        * If the plan has PreAggLoad i.e Loading UDF and does not have PreAgg 
i.e Query UDF then it is
        * valid.
    -   *
        * @param namedExpression
    -   * @return
    +   * named expressions
    --- End diff --
    
    move it up. comment should like
    `@param namedExpression named expressions`


---

Reply via email to