Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1728#discussion_r159459370
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
 ---
    @@ -98,238 +143,499 @@ case class 
CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             needAnalysis = false
             attr
         }
    +    if(needAnalysis) {
    +      needAnalysis = isValidPlan(plan)
    +    }
         // if plan is not valid for transformation then return same plan
         if (!needAnalysis) {
           plan
         } else {
    -      // create buffer to collect all the column and its metadata 
information
    -      val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    -      var isValidPlan = true
    -      val carbonTable = plan match {
    -        // matching the plan based on supported plan
    -        // if plan is matches with any case it will validate and get all
    -        // information required for transforming the plan
    +      val updatedPlan = transformPreAggQueryPlan(plan)
    +      val newPlan = updatePlan(updatedPlan)
    +      print(newPlan.toString())
    +      newPlan
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // subquery
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    -          // only carbon query plan is supported checking whether logical 
relation is
    -          // is for carbon
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    +  /**
    +   * Below method will be used to update the child plan
    +   * This will be used for updating expression like join condition,
    +   * order by, project list etc
    +   * @param plan
    +   * child plan
    +   * @return updated plan
    +   */
    +  def updatePlan(plan: LogicalPlan) : LogicalPlan = {
    +    val updatedPlan = plan transform {
    +      case Aggregate(grp, aggExp, child) =>
    +        Aggregate(updateExpression(grp), updateNamedExpression(aggExp), 
child)
    +      case Filter(filterExp, child) =>
    +        Filter(updateConditionExpression(Some(filterExp)).get, child)
    +      case Project(projectList, child) =>
    +        Project(updateNamedExpression(projectList), child)
    +      case Sort(sortOrders, global, child) =>
    +        Sort(updateSortExpression(sortOrders), global, child)
    +      case Join(left, right, joinType, condition) =>
    +        Join(left, right, joinType, updateConditionExpression(condition))
    +    }
    +    updatedPlan
    +  }
     
    -        // below case for handling filter query
    -        // When plan has grouping expression, aggregate expression
    -        // filter expression
    -        case Aggregate(groupingExp,
    -          aggregateExp,
    -          Filter(filterExp,
    -          CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          // only carbon query plan is supported checking whether logical 
relation is
    -          // is for carbon
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]   &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          // getting the columns from filter expression
    -          if(isValidPlan) {
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, 
carbonTable, tableName)
    +  /**
    +   * Below method will be used to update the sort expression
    +   * @param sortExp
    +   * sort order expression in query
    +   * @return updated sort expression
    +   */
    +  def updateSortExpression(sortExp : Seq[SortOrder]) : Seq[SortOrder] = {
    +     sortExp map { order =>
    +      order.child match {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +            SortOrder(newExpression, order.direction)
    +          } else {
    +            SortOrder(attr, order.direction)
               }
    -          carbonTable
    +      }
    +    }
    +  }
     
    -        // When plan has grouping expression, aggregate expression
    -        // logical relation
    -        case Aggregate(groupingExp, aggregateExp, logicalRelation: 
LogicalRelation)
    -          // only carbon query plan is supported checking whether logical 
relation is
    -          // is for carbon
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          // if it is valid plan then extract the query columns
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          carbonTable
    -        // case for handling aggregation, order by
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, 
carbonTable, tableName)
    -          }
    -          carbonTable
    -        // case for handling aggregation, order by and filter
    -        case Project(projectList,
    -          Sort(sortOrders,
    -            _,
    -            Aggregate(groupingExp,
    -              aggregateExp,
    -              Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: 
LogicalRelation)))))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    -            carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    +  /**
    +   * Below method will be used to update the expression like group by 
expression
    +   * @param expressions
    +   * sequence of expression like group by
    +   * @return updated expressions
    +   */
    +  def updateExpression(expressions : Seq[Expression]) : Seq[Expression] = {
    +    val newExp = expressions map { expression =>
    +      expression transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find { p => p._1.sameRef(attr) }
    +          if (childExp.isDefined) {
    +            val newExpression = AttributeReference(
    +              childExp.get._2.name,
    +              childExp.get._2.dataType,
    +              childExp.get._2.nullable,
    +              childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +            newExpression
    +          } else {
    +            attr
               }
    -          if (isValidPlan) {
    -            list ++
    -            extractQueryColumnForOrderBy(Some(projectList), sortOrders, 
carbonTable, tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, 
carbonTable, tableName)
    +      }
    +    }
    +    newExp
    +  }
    +
    +  /**
    +   * Below method will be used to updated the named expression like 
aggregate expression
    +   * @param namedExpression
    +   * any named expression like aggregate expression
    +   * @return updated named expression
    +   */
    +  def updateNamedExpression(namedExpression: Seq[NamedExpression]) : 
Seq[NamedExpression] = {
    +    namedExpression map {
    +      case attr: AttributeReference =>
    +        val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +        if(childExp.isDefined) {
    +          val newExp = AttributeReference(
    +            childExp.get._2.name,
    +            childExp.get._2.dataType,
    +            childExp.get._2.nullable,
    +            childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +          newExp
    +        } else {
    +          attr
    +        }
    +      case alias@Alias(exp, name) =>
    +        val newExp = exp.transform {
    +          case attr: AttributeReference =>
    +            val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +            if (childExp.isDefined) {
    +              val newExp = AttributeReference(
    +                childExp.get._2.name,
    +                childExp.get._2.dataType,
    +                childExp.get._2.nullable,
    +                childExp.get._2.metadata)(childExp.get._2.exprId, 
attr.qualifier, attr.isGenerated)
    +              newExp
    +            } else {
    +              attr
    +            }
    +        }
    +        Alias(newExp, name)(alias.exprId, alias.qualifier, 
Some(alias.metadata), alias.isGenerated)
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to updated condition expression
    +   * @param conditionExp
    +   * any condition expression join condition or filter condition
    +   * @return updated condition expression
    +   */
    +  def updateConditionExpression(conditionExp: Option[Expression]): 
Option[Expression] = {
    +    if (conditionExp.isDefined) {
    +      val filterExp = conditionExp.get
    +      Some(filterExp.transform {
    +        case attr: AttributeReference =>
    +          val childExp = updatedExpression.find(p => p._1.sameRef(attr))
    +          if(childExp.isDefined) {
    +            childExp.get._2
    +          } else {
    +            attr
               }
    -          carbonTable
    -        // case for handling aggregation with order by when only 
projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            CarbonSubqueryAlias(_, logicalRelation: LogicalRelation)))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +      })
    +    } else {
    +      conditionExp
    +    }
    +  }
    +
    +  /**
    +   * Below method will be used to validate and transform the main table 
plan to child table plan
    +   * rules for transforming is as below.
    +   * 1. Grouping expression rules
    +   *    1.1 Change the parent attribute reference for of group expression
    +   * to child attribute reference
    +   *
    +   * 2. Aggregate expression rules
    +   *    2.1 Change the parent attribute reference for of group expression 
to
    +   * child attribute reference
    +   *    2.2 Change the count AggregateExpression to Sum as count
    +   * is already calculated so in case of aggregate table
    +   * we need to apply sum to get the count
    +   *    2.2 In case of average aggregate function select 2 columns from 
aggregate table with
    +   * aggregation sum and count. Then add divide(sum(column with sum), 
sum(column with count)).
    +   * Note: During aggregate table creation for average table will be 
created with two columns
    +   * one for sum(column) and count(column) to support rollup
    +   * 3. Filter Expression rules.
    +   *    3.1 Updated filter expression attributes with child table 
attributes
    +   * 4. Update the Parent Logical relation with child Logical relation
    +   * 5. timeseries function
    +   *    5.1 validate parent table has timeseries datamap
    +   *    5.2 timeseries function is valid function or not
    +   *
    +   * @param logicalPlan
    +   * parent logical plan
    +   * @return transformed plan
    +   */
    +  def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = {
    +    val updatedPlan = logicalPlan.transform {
    +      // case for aggregation query
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      child@CarbonSubqueryAlias(_, logicalRelation: LogicalRelation))
    +        if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        val isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, None) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                None,
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              newChild)
    +          } else {
    +            agg
               }
    -          carbonTable
    -        // case for handling aggregation with order by and filter when 
only projection column exits
    -        case Sort(sortOrders,
    -          _,
    -          Aggregate(groupingExp,
    -            aggregateExp,
    -            Filter(filterExp, CarbonSubqueryAlias(_, logicalRelation: 
LogicalRelation))))
    -          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    -             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    -               metaData.hasAggregateDataMapSchema =>
    -          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    -          isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    -            aggregateExp,
    +        } else {
    +          agg
    +        }
    +      // case of handling aggregation query with filter
    +      case agg@Aggregate(grExp,
    +      aggExp,
    +      Filter(expression, child@CarbonSubqueryAlias(_, logicalRelation: 
LogicalRelation)))
    +        if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
    +           
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.
    +             metaData.hasAggregateDataMapSchema =>
    +        val carbonTable = getCarbonTable(logicalRelation)
    +        val list = scala.collection.mutable.HashSet.empty[QueryColumn]
    +        val aggregateExpressions = 
scala.collection.mutable.HashSet.empty[AggregateExpression]
    +        var isValidPlan = extractQueryColumnsFromAggExpression(
    +          grExp,
    +          aggExp,
    +          carbonTable,
    +          list,
    +          aggregateExpressions)
    +        // getting the columns from filter expression
    +        isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(expression)
    +        if (isValidPlan) {
    +          isValidPlan = extractQueryColumnFromFilterExp(expression, list, 
carbonTable)
    +        }
    +        if(isValidPlan) {
    +          val (aggDataMapSchema, childPlan) = 
getChildDataMapForTransformation(list,
    +            aggregateExpressions,
                 carbonTable,
    -            tableName,
    -            list)
    -          if(isValidPlan) {
    -            isValidPlan = 
!CarbonReflectionUtils.hasPredicateSubquery(filterExp)
    -          }
    -          if(isValidPlan) {
    -            list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
    -              carbonTable = carbonTable,
    -              tableName = tableName)
    -            isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, 
carbonTable, tableName)
    -          }
    -          carbonTable
    -        case _ =>
    -          isValidPlan = false
    -          null
    -      }
    -      if (isValidPlan && null != carbonTable) {
    -        isValidPlan = isSpecificSegmentPresent(carbonTable)
    -      }
    -      // if plan is valid then update the plan with child attributes
    -      if (isValidPlan) {
    -        // getting all the projection columns
    -        val listProjectionColumn = list
    -          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && 
!queryColumn.isFilterColumn)
    -          .toList
    -        // getting all the filter columns
    -        val listFilterColumn = list
    -          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && 
queryColumn.isFilterColumn)
    -          .toList
    -        // getting all the aggregation columns
    -        val listAggregationColumn = list.filter(queryColumn => 
!queryColumn.getAggFunction.isEmpty)
    -          .toList
    -        // create a query plan object which will be used to select the 
list of pre aggregate tables
    -        // matches with this plan
    -        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    -          listAggregationColumn.asJava,
    -          listFilterColumn.asJava)
    -        // create aggregate table selector object
    -        val aggregateTableSelector = new AggregateTableSelector(queryPlan, 
carbonTable)
    -        // select the list of valid child tables
    -        val selectedDataMapSchemas = 
aggregateTableSelector.selectPreAggDataMapSchema()
    -        // if it does not match with any pre aggregate table return the 
same plan
    -        if (!selectedDataMapSchemas.isEmpty) {
    -          // filter the selected child schema based on size to select the 
pre-aggregate tables
    -          // that are nonEmpty
    -          val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    -          val relationBuffer = selectedDataMapSchemas.asScala.map { 
selectedDataMapSchema =>
    -            val identifier = TableIdentifier(
    -              selectedDataMapSchema.getRelationIdentifier.getTableName,
    -              
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
    -            val carbonRelation =
    -              
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
    -            val relation = 
sparkSession.sessionState.catalog.lookupRelation(identifier)
    -            (selectedDataMapSchema, carbonRelation, relation)
    -          }.filter(_._2.sizeInBytes != 0L)
    -          if (relationBuffer.isEmpty) {
    -            // If the size of relation Buffer is 0 then it means that none 
of the pre-aggregate
    -            // tables have date yet.
    -            // In this case we would return the original plan so that the 
query hits the parent
    -            // table.
    -            plan
    +            logicalRelation)
    +          if(null != aggDataMapSchema && null!= childPlan) {
    +            val attributes = 
childPlan.output.asInstanceOf[Seq[AttributeReference]]
    +            val (updatedGroupExp, updatedAggExp, newChild, 
updatedFilterExpression) =
    +              getUpdatedExpressions(grExp,
    +                aggExp,
    +                child,
    +                Some(expression),
    +                aggDataMapSchema,
    +                attributes,
    +                childPlan,
    +                carbonTable,
    +                logicalRelation)
    +            Aggregate(updatedGroupExp,
    +              updatedAggExp,
    +              Filter(updatedFilterExpression.get,
    +                newChild))
               } else {
    -            // If the relationBuffer is nonEmpty then find the table with 
the minimum size.
    -            val (aggDataMapSchema, _, relation) = 
relationBuffer.minBy(_._2.sizeInBytes)
    -            val newRelation = new 
FindDataSourceTable(sparkSession).apply(relation)
    -            // transform the query plan based on selected child schema
    -            transformPreAggQueryPlan(plan, aggDataMapSchema, newRelation)
    +            agg
               }
             } else {
    -          plan
    +          agg
             }
    +    }
    +    updatedPlan
    +  }
    +
    +  /**
    +   * Below method will be used to validate query plan and get the proper 
aggregation data map schema
    +   * and child relation plan object if plan is valid for transformation
    +   * @param queryColumns
    +   * list of query columns from projection and filter
    +   * @param aggregateExpressions
    +   * list of aggregate expression (aggregate function)
    +   * @param carbonTable
    +   * parent carbon table
    +   * @param logicalRelation
    +   * parent logical relation
    +   * @return if plan is valid then aggregation data map schema and its 
relation plan
    +   */
    +  def getChildDataMapForTransformation(queryColumns: 
scala.collection.mutable.HashSet[QueryColumn],
    +      aggregateExpressions: 
scala.collection.mutable.HashSet[AggregateExpression],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation): (AggregationDataMapSchema, 
LogicalPlan) = {
    +    // getting all the projection columns
    +    val listProjectionColumn = queryColumns
    +      .filter(queryColumn => !queryColumn.isFilterColumn)
    +      .toList
    +    // getting all the filter columns
    +    val listFilterColumn = queryColumns
    +      .filter(queryColumn => queryColumn.isFilterColumn)
    +      .toList
    +    val isProjectionColumnPresent = (listProjectionColumn.size + 
listFilterColumn.size) > 0
    +    // create a query plan object which will be used to select the list of 
pre aggregate tables
    +    // matches with this plan
    +    val queryPlan = new QueryPlan(listProjectionColumn.asJava, 
listFilterColumn.asJava)
    +    // create aggregate table selector object
    +    val aggregateTableSelector = new AggregateTableSelector(queryPlan, 
carbonTable)
    +    // select the list of valid child tables
    +    val selectedDataMapSchemas = 
aggregateTableSelector.selectPreAggDataMapSchema()
    +    // query has only aggregate expression then selected data map will be 
empty
    +    // the validate all the child data map otherwise validate selected 
data map
    +    var selectedAggMaps = if (isProjectionColumnPresent) {
    +      selectedDataMapSchemas
    +    } else {
    +      carbonTable.getTableInfo.getDataMapSchemaList
    +    }
    +    val aggExpLogicalPlans = aggregateExpressions.map { queryAggExp =>
    +      PreAggregateUtil.getLogicalPlanFromAggExp(queryAggExp,
    +        carbonTable.getTableName,
    +        carbonTable.getDatabaseName,
    +        logicalRelation,
    +        sparkSession,
    +        parser)
    +    }.toSeq
    +    // if query does not have any aggregate function no need to validate 
the same
    +    if (aggregateExpressions.size > 0 && selectedAggMaps.size > 0) {
    +      selectedAggMaps = 
validateAggregateExpression(selectedAggMaps.asScala.toSeq,
    +        carbonTable,
    +        logicalRelation,
    +        aggExpLogicalPlans).asJava
    +    }
    +    // if it does not match with any pre aggregate table return the same 
plan
    +    if (!selectedAggMaps.isEmpty) {
    +      // filter the selected child schema based on size to select the 
pre-aggregate tables
    +      // that are nonEmpty
    +      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    +      val relationBuffer = selectedAggMaps.asScala.map { 
selectedDataMapSchema =>
    +        val identifier = TableIdentifier(
    +          selectedDataMapSchema.getRelationIdentifier.getTableName,
    +          
Some(selectedDataMapSchema.getRelationIdentifier.getDatabaseName))
    +        val carbonRelation =
    +          
catalog.lookupRelation(identifier)(sparkSession).asInstanceOf[CarbonRelation]
    +        val relation = 
sparkSession.sessionState.catalog.lookupRelation(identifier)
    +        (selectedDataMapSchema, carbonRelation, relation)
    +      }.filter(_._2.sizeInBytes != 0L)
    +      if (relationBuffer.isEmpty) {
    +        // If the size of relation Buffer is 0 then it means that none of 
the pre-aggregate
    +        // tables have date yet.
    +        // In this case we would return the original plan so that the 
query hits the parent
    +        // table.
    +        (null, null)
           } else {
    -        plan
    +        // If the relationBuffer is nonEmpty then find the table with the 
minimum size.
    +        val (aggDataMapSchema, _, relation) = 
relationBuffer.minBy(_._2.sizeInBytes)
    +        val newRelation = new 
FindDataSourceTable(sparkSession).apply(relation)
    +        (aggDataMapSchema.asInstanceOf[AggregationDataMapSchema], 
newRelation)
           }
    +    } else {
    +      (null, null)
         }
       }
     
    +  /**
    +   * Below method will be used to validate aggregate expression with the 
data map
    +   * and will return the selected valid data maps
    +   * @param selectedDataMap
    +   *                        list of data maps
    +   * @param carbonTable
    +   *                    parent carbon table
    +   * @param logicalRelation
    +   *                        parent logical relation
    +   * @param queryAggExpLogicalPlans
    +   *                                query agg expression logical plan
    +   * @return valid data map
    +   */
    +  def validateAggregateExpression(selectedDataMap: Seq[DataMapSchema],
    +      carbonTable: CarbonTable,
    +      logicalRelation: LogicalRelation,
    +      queryAggExpLogicalPlans: Seq[LogicalPlan]): Seq[DataMapSchema] = {
    +    def validateDataMap(dataMap: DataMapSchema,
    +        aggExpLogicalPlans: Seq[LogicalPlan]): Boolean = {
    +      val mappingModel = getLogicalPlanForAggregateExpression(dataMap,
    +        carbonTable,
    +        logicalRelation)
    +      aggExpLogicalPlans.forall{
    +        p => mappingModel.exists(m => p.sameResult(m.logicalPlan))
    --- End diff --
    
    move p => to up


---

Reply via email to