Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159458812
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
    @@ -98,238 +143,499 @@ case class 
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata 
information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical 
relation is
    -          // is for carbon
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), 
child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical 
relation is
    -          // is for carbon
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, 
carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: 
LogicalRelation)
    -          // only carbon query plan is supported checking whether logical 
relation is
    -          // is for carbon
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, 
carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: 
LogicalRelation)))))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by 
expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, 
carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, 
carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like 
aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : 
Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, 
Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): 
Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only 
projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table 
plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression 
to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from 
aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), 
sum(column with count)).
    +   * Note: During aggregate table creation for average table will be 
created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table 
attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        val isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, None) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                None,
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              newChild)
    +          } else {
    +            agg
               }
    -          carbonTable
    -        // case for handling aggregation with order by and filter when 
only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: 
LogicalRelation))))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +        } else {
    +          agg
    +        }
    +      // case of handling aggregation query with filter
    +      case agg@Aggregate(grExp,
    --- End diff --
    
    How about with Sort ?


---

Reply via email to