Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1464#discussion_r149004253
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateQueryRules.scala
 ---
    @@ -0,0 +1,756 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.hive
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.{AnalysisException, 
CarbonDatasourceHadoopRelation, SparkSession}
    +import org.apache.spark.sql.catalyst.TableIdentifier
    +import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
    +import org.apache.spark.sql.catalyst.expressions.{Alias, 
AttributeReference, Cast, Divide,
    +Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.expressions.aggregate._
    +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LogicalPlan, SubqueryAlias}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.types._
    +
    +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
DataMapSchema}
    +import org.apache.carbondata.core.preagg.{AggregateTableSelector, 
QueryColumn, QueryPlan}
    +import org.apache.carbondata.spark.util.CarbonScalaUtil
    +
    +/**
    + * Class for applying Pre Aggregate rules
    + * Responsibility.
    + * 1. Check plan is valid plan for updating the parent table plan with 
child table
    + * 2. Updated the plan based on child schema
    + *
    + * Rules for Upadating the plan
    + * 1. Grouping expression rules
    + *    1.1 Change the parent attribute reference for of group expression
    + * to child attribute reference
    + *
    + * 2. Aggregate expression rules
    + *    2.1 Change the parent attribute reference for of group expression to
    + * child attribute reference
    + *    2.2 Change the count AggregateExpression to Sum as count
    + * is already calculated so in case of aggregate table
    + * we need to apply sum to get the count
    + *    2.2 In case of average aggregate function select 2 columns from 
aggregate table with
    + * aggregation
    + * sum and count. Then add divide(sum(column with sum), sum(column with 
count)).
    + * Note: During aggregate table creation for average table will be created 
with two columns
    + * one for sum(column) and count(column) to support rollup
    + *
    + * 3. Filter Expression rules.
    + *    3.1 Updated filter expression attributes with child table attributes
    + * 4. Update the Parent Logical relation with child Logical relation
    + *
    + * @param sparkSession
    + * spark session
    + */
    +case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) 
extends Rule[LogicalPlan] {
    +
    +  override def apply(plan: LogicalPlan): LogicalPlan = {
    +    var needAnalysis = true
    +    plan.transformExpressions {
    +      // first check if any preAgg scala function is applied it is present 
is in plan
    +      // then call is from create preaggregate table class so no need to 
transform the query plan
    +      case al@Alias(_, name) if name.equals("preAgg") =>
    +        needAnalysis = false
    +        al
    +      // in case of query if any unresolve alias is present then wait for 
plan to be resolved
    +      // return the same plan as we can tranform the plan only when 
everything is resolved
    +      case unresolveAlias@UnresolvedAlias(_, _) =>
    +        needAnalysis = false
    +        unresolveAlias
    +    }
    +    // if plan is not valid for transformation then return same plan
    +    if (!needAnalysis) {
    +      plan
    +    } else {
    +      // create buffer to collect all the column and its metadata 
information
    +      val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
    +      var isValidPlan = true
    +      val carbonTable = plan match {
    +        // matching the plan based on supported plan
    +        // if plan is matches with any case it will validate and get all
    +        // information required for transforming the plan
    +
    +        // When plan has grouping expression, aggregate expression
    +        // subquery
    +        case Aggregate(groupingExp,
    +        aggregateExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _))
    +          // only carbon query plan is supported checking whether logical 
relation is
    +          // is for carbon
    +          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only 
plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +
    +        // below case for handling filter query
    +        // When plan has grouping expression, aggregate expression
    +        // filter expression
    +        case Aggregate(groupingExp, aggregateExp,
    +        Filter(filterExp,
    +        SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
    +          // only carbon query plan is supported checking whether logical 
relation is
    +          // is for carbon
    +          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only 
plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +            // getting the columns from filter expression
    +            filterExp.transform {
    +              case attr: AttributeReference =>
    +                list += getQueryColumn(attr.name, carbonTable, tableName, 
isFilterColumn = true)
    +                attr
    +            }
    +          }
    +          carbonTable
    +
    +        // When plan has grouping expression, aggregate expression
    +        // logical relation
    +        case Aggregate(groupingExp, aggregateExp, logicalRelation: 
LogicalRelation)
    +          // only carbon query plan is supported checking whether logical 
relation is
    +          // is for carbon
    +          if 
logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]
    +             &&
    +             
logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
    +               .hasPreAggDataMap =>
    +          val (carbonTable, tableName) = 
getCarbonTableAndTableName(logicalRelation)
    +          // when table has child data map(pre aggregate table) then only 
plan will be transformed
    +          if (!carbonTable.hasPreAggDataMap) {
    +            isValidPlan = false
    +          }
    +          if (isValidPlan) {
    +            // if it is valid plan then extract the query columns
    +            isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
    +              aggregateExp,
    +              carbonTable,
    +              tableName,
    +              list)
    +          }
    +          carbonTable
    +        case _ =>
    +          isValidPlan = false
    +          null
    +      }
    +      // if plan is valid then update the plan with child attributes
    +      if (isValidPlan) {
    +        // getting all the projection columns
    +        val listProjectionColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && 
!queryColumn.isFilterColumn)
    +        // getting all the filter columns
    +        val listFilterColumn = list
    +          .filter(queryColumn => queryColumn.getAggFunction.isEmpty && 
queryColumn.isFilterColumn)
    +        // getting all the aggregation columns
    +        val listAggregationColumn = list.filter(queryColumn => 
!queryColumn.getAggFunction.isEmpty)
    +        // create a query plan object which will be used to select the 
list of pre aggregate tables
    +        // matches with this plan
    +        val queryPlan = new QueryPlan(listProjectionColumn.asJava,
    +          listAggregationColumn.asJava,
    +          listFilterColumn.asJava)
    +        // create aggregate table selector object
    +        val aggregateTableSelector = new AggregateTableSelector(queryPlan, 
carbonTable)
    +        // select the list of valid child tables
    +        val selectedDataMapSchemas = 
aggregateTableSelector.selectPreAggDataMapSchema()
    +        // if it doesnot match with any pre aggregate table return the 
same plan
    +        if (!selectedDataMapSchemas.isEmpty) {
    +          // sort the selected child schema based on size to select 
smallest pre aggregate table
    +          val (aggDataMapSchema, carbonRelation) =
    +            selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
    +              val catalog = sparkSession.sessionState.catalog
    +              val carbonRelation = catalog
    +                
.lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
    +                  .getTableName,
    +                  Some(selectedDataMapSchema.getRelationIdentifier
    +                    .getDatabaseName))).asInstanceOf[SubqueryAlias].child
    +                .asInstanceOf[LogicalRelation]
    +              (selectedDataMapSchema, carbonRelation)
    +            }.sortBy(f => 
f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
    +              .head
    --- End diff --
    
    Instead of using `}.sortBy(f => 
f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes).head`
    use `.minBy(f => 
f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)`


---

Reply via email to