Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159052141
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
    @@ -330,6 +207,264 @@ case class CarbonPreAggregateQueryRules(sparkSession: 
SparkSession) extends Rule
         }
       }
     
    +  /**
    +   * Below method will be used to validate the logical plan
    +   * and get all the details from to select proper aggregate table
    +   * @param logicalPlan
    +   *                    actual query logical plan
    +   * @param list
    +   *             list of projection column present in plan
    +   * @param qAggExprs
    +   *                  list of aggregate expression
    +   * @return if plan is valid for tranformation, parent table, parent 
logical relaion
    +   */
    +  def validatePlanAndGetFields(logicalPlan: LogicalPlan,
    +      list: scala.collection.mutable.HashSet[QueryColumn],
    +      qAggExprs: scala.collection.mutable.HashSet[AggregateExpression]): 
(Boolean,
    +    CarbonTable, LogicalRelation) = {
    +    var isValidPlan = false
    +    var pTable: CarbonTable = null
    +    var qLRelation: LogicalRelation = null
    +    logicalPlan.transform {
    +      // to handle filter expression
    +      case filter@Filter(filterExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical 
relation is
    +        // is for carbon
    +        if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        // getting the columns from filter expression
    +        if (!CarbonReflectionUtils.hasPredicateSubquery(filterExp)) {
    +          isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, 
pTable)
    +        }
    +        filter
    +        // to handle aggregate expression
    +      case agg@Aggregate(groupingExp,
    +      aggregateExp,
    +      CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        // only carbon query plan is supported checking whether logical 
relation is
    +        // is for carbon
    +        if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        qLRelation = logicalRelation
    +        pTable = getCarbonTableAndTableName(logicalRelation)
    +        isValidPlan = extractQueryColumnsFromAggExpression(
    +          groupingExp,
    +          aggregateExp,
    +          pTable,
    +          list,
    +          qAggExprs)
    +        agg
    +        // to handle aggregate expression with filter
    +      case agg@Aggregate(grExp, aggExp, filter@Filter(_, _)) =>
    +        val out = validatePlanAndGetFields(filter, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnsFromAggExpression(grExp, 
aggExp, pTable, list, qAggExprs)
    +        }
    +        agg
    +        // to handle projection with order by
    +      case proj@Project(projectList, sort@Sort(_, _, _)) =>
    +        val out = validatePlanAndGetFields(sort, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), 
Seq.empty, pTable)
    +        }
    +        proj
    +        // to handle only projection
    +      case proj@Project(projectList, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++ extractQueryColumnForOrderBy(Some(projectList), 
Seq.empty, pTable)
    +        }
    +        proj
    +      // case for handling aggregation with order by when only projection 
column exits
    +      case sort@Sort(sortOrders, _, agg@Aggregate(_, _, _)) =>
    +        val out = validatePlanAndGetFields(agg, list, qAggExprs)
    +        pTable = out._2
    +        qLRelation = out._3
    +        isValidPlan = out._1
    +        if(isValidPlan) {
    +          list ++
    +          extractQueryColumnForOrderBy(None, sortOrders, pTable)
    +        }
    +        sort
    +    }
    +    (isValidPlan, pTable, qLRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to validate aggregate expression with the 
data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val schemaAggLogicalPlan = 
getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => schemaAggLogicalPlan.exists(m => p.sameResult(m._1))
    +      }
    +    }
    +    val selectedDataMapSchema = selectedDataMap.collect {
    +      case dataMap if validateDataMap(dataMap, queryAggExpLogicalPlans) =>
    +        dataMap
    +    }
    +    selectedDataMapSchema
    +  }
    +
    +  /**
    +   * Below method will be used to update the logical plan of expression
    +   * with parent table logical relation
    +   * @param logicalPlan
    +   * @param logicalRelation
    +   * @return
    +   */
    +  def updateLogicalRelation(logicalPlan: LogicalPlan,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    logicalPlan transform {
    +      case l: LogicalRelation =>
    +        l.copy(relation = logicalRelation.relation)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to to get the logical plan for each 
aggregate expression in
    +   * child data map and its column schema mapping if mapping is already 
present
    +   * then it will use the same otherwise it will generate and stored in 
aggregation data map
    +   * @param selectedDataMap
    +   *                        child data map
    +   * @param carbonTable
    +   *                    parent table
    +   * @param logicalRelation
    +   *                        logical relation of actual plan
    +   * @return map of logical plan for each aggregate expression in child 
query and its column mapping
    +   */
    +  def getLogicalPlanForAggregateExpression(selectedDataMap: DataMapSchema, 
carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): Map[LogicalPlan, ColumnSchema] = {
    +    val aggDataMapSchema = 
selectedDataMap.asInstanceOf[AggregationDataMapSchema]
    +    // if column mapping is not present
    +    if (null == aggDataMapSchema.getAggregateExpressionToColumnMapping) {
    +      // add preAGG UDF to avoid all the PreAggregate rule
    +      val childDataMapQueryString = new CarbonSpark2SqlParser()
    +        
.addPreAggFunction(aggDataMapSchema.getProperties.get("CHILD_SELECT QUERY"))
    +      // get the logical plan
    +      val aggPlan = sparkSession.sql(childDataMapQueryString).logicalPlan
    +      // getting all aggregate expression from query
    +      val dataMapAggExp = getAggregateExpFromChildDataMap(aggPlan)
    +      // in case of average child table will have two columns which will 
be stored in sequence
    +      // so for average expression we need to get two columns for mapping
    +      var counter = 0
    +      // sorting the columns based on schema ordinal so search will give 
proper result
    +      val sortedColumnList = 
aggDataMapSchema.getChildSchema.getListOfColumns.asScala
    +        .sortBy(_.getSchemaOrdinal)
    +      val logicalPlanToColumnMapping = dataMapAggExp.map { aggExp =>
    +        // for each aggregate expression get logical plan
    +        val expLogicalPlan = getLogicalPlanFromAggExp(aggExp,
    +          carbonTable.getTableName,
    +          carbonTable.getDatabaseName, logicalRelation)
    +        // check if aggregate expression is of type avg
    +        // get the columns
    +        var columnSchema = aggDataMapSchema
    +          .getAggColumnBasedOnIndex(counter, sortedColumnList.asJava)
    +        // increment the counter so when for next expression above code 
will be
    +        // executed it will search from that schema ordinal
    +        counter = columnSchema.getSchemaOrdinal + 1
    +        (expLogicalPlan, columnSchema)
    +      }.toMap
    +      // store the mapping in data map
    +      
aggDataMapSchema.setAggregateExpressionToColumnMapping(logicalPlanToColumnMapping.asJava)
    +      // return the mapping
    +      logicalPlanToColumnMapping
    +    } else {
    +      // if already present in data map then return the same
    +      aggDataMapSchema.getAggregateExpressionToColumnMapping
    +        .asInstanceOf[java.util.Map[LogicalPlan, 
ColumnSchema]].asScala.toMap
    +    }
    +  }
    +
    +
    +  /**
    +   * Below method will be used to get the logical plan from aggregate 
expression
    +   * @param aggExp
    +   *               aggregate expression
    +   * @param tableName
    +   *                  parent table name
    +   * @param databaseName
    +   *                     database name
    +   * @param logicalRelation
    +   *                        logical relation
    +   * @return logical plan
    +   */
    +  def getLogicalPlanFromAggExp(aggExp: AggregateExpression,
    +      tableName: String,
    +      databaseName: String,
    +      logicalRelation: LogicalRelation): LogicalPlan = {
    +    // adding the preAGG UDF, so pre aggregate data loading rule and query 
rule will not
    +    // be applied
    +    val query = new CarbonSpark2SqlParser()
    +      .addPreAggFunction(s"Select ${ aggExp.sql } from 
$databaseName.$tableName")
    +    // updating the logical relation of logical plan to so when two 
logical plan
    +    // will be compared it will not consider relation
    +    updateLogicalRelation(sparkSession.sql(query).logicalPlan, 
logicalRelation)
    +  }
    +
    +  /**
    +   * Below method will be used to get aggregate expression
    +   * @param logicalPlan
    +   *               logical plan
    +   * @return list of aggregate expression
    +   */
    +  def getAggregateExpFromChildDataMap(logicalPlan: LogicalPlan): 
Seq[AggregateExpression] = {
    +    val list = scala.collection.mutable.HashSet.empty[AggregateExpression]
    --- End diff --
    
    Should use List or LinkedHasSet as the order of insertion is required for 
mapping


---

Reply via email to