http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
index 56deb8a..829c487 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
@@ -15,38 +15,27 @@
  * limitations under the License.
  */
 
-
 package org.apache.spark.sql.hive
 
-import scala.math.BigInt.int2bigInt
+import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
-import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, 
PhysicalOperation, QueryPlanner}
-import org.apache.spark.sql.catalyst.plans.Inner
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, Limit, 
LogicalPlan, Sort}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, QueryPlanner}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter => LogicalFilter, 
LogicalPlan}
 import org.apache.spark.sql.execution.{DescribeCommand => 
RunnableDescribeCommand, ExecutedCommand, Filter, Project, SparkPlan}
-import org.apache.spark.sql.execution.aggregate.Utils
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.{DescribeCommand => 
LogicalDescribeCommand, LogicalRelation}
-import org.apache.spark.sql.execution.joins.{BroadCastFilterPushJoin, 
BuildLeft, BuildRight}
 import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, 
DropTable, HiveNativeCommand}
+import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, 
CarbonDecoderRelation}
 
 import org.carbondata.common.logging.LogServiceFactory
+import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 
-object CarbonHiveSyntax {
-
-  @transient
-  protected val sqlParser = new CarbonSqlParser
-
-  def parse(sqlText: String): LogicalPlan = {
-    sqlParser.parse(sqlText)
-  }
-}
 
 class CarbonStrategies(sqlContext: SQLContext) extends QueryPlanner[SparkPlan] 
{
 
@@ -55,342 +44,179 @@ class CarbonStrategies(sqlContext: SQLContext) extends 
QueryPlanner[SparkPlan] {
   val LOGGER = LogServiceFactory.getLogService("CarbonStrategies")
 
   def getStrategies: Seq[Strategy] = {
-    val total = sqlContext.planner.strategies :+ CarbonTableScans :+ 
DDLStrategies
+    val total = sqlContext.planner.strategies :+ CarbonTableScan
     total
   }
 
   /**
-   * Carbon strategies for Carbon cube scanning
+   * Carbon strategies for performing late materizlization (decoding 
dictionary key
+   * as late as possbile)
    */
-  private[sql] object CarbonTableScans extends Strategy {
-
-    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) =>
-        carbonScan(projectList,
-          predicates,
-          carbonRelation.carbonRelation,
-          None,
-          None,
-          None,
-          isGroupByPresent = false,
-          detailQuery = true) :: Nil
-
-      case Limit(IntegerLiteral(limit),
-      Sort(order, _,
-      p@CarbonAggregation(groupingExpressions,
-      aggregateExpressions,
-      PhysicalOperation(
-      projectList,
-      predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))))) =>
-        val aggPlan = handleAggregation(plan, p, projectList, predicates, 
carbonRelation,
-          groupingExpressions, aggregateExpressions)
-        org.apache.spark.sql.execution.TakeOrderedAndProject(limit,
-          order,
-          None,
-          aggPlan.head) :: Nil
-
-      case Limit(IntegerLiteral(limit), p@CarbonAggregation(
-      groupingExpressions,
-      aggregateExpressions,
-      PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))) =>
-        val aggPlan = handleAggregation(plan, p, projectList, predicates, 
carbonRelation,
-          groupingExpressions, aggregateExpressions)
-        org.apache.spark.sql.execution.Limit(limit, aggPlan.head) :: Nil
-
-      case CarbonAggregation(
-      groupingExpressions,
-      aggregateExpressions,
-      PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
-        handleAggregation(plan, plan, projectList, predicates, carbonRelation,
-          groupingExpressions, aggregateExpressions)
-
-      case Limit(IntegerLiteral(limit),
-      PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
-        val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = 
extractPlan(plan)
-        val s = carbonScan(projectList, predicates, 
carbonRelation.carbonRelation, groupExprs,
-          substitutesortExprs, limitExpr, isGroupByPresent = false, 
detailQuery = true)
-        org.apache.spark.sql.execution.Limit(limit, s) :: Nil
-
-      case Limit(IntegerLiteral(limit),
-      Sort(order, _,
-      PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))) =>
-        val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = 
extractPlan(plan)
-        val s = carbonScan(projectList, predicates, 
carbonRelation.carbonRelation, groupExprs,
-          substitutesortExprs, limitExpr, isGroupByPresent = false, 
detailQuery = true)
-        org.apache.spark.sql.execution.TakeOrderedAndProject(limit,
-          order,
-          None,
-          s) :: Nil
-
-      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition,
-      PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)), right)
-        if canPushDownJoin(right, condition) =>
-        LOGGER.info(s"pushing down for ExtractEquiJoinKeys:right")
-        val carbon = carbonScan(projectList,
-          predicates,
-          carbonRelation.carbonRelation,
-          None,
-          None,
-          None,
-          isGroupByPresent = false,
-          detailQuery = true)
-        val pushedDownJoin = BroadCastFilterPushJoin(
-          leftKeys: Seq[Expression],
-          rightKeys: Seq[Expression],
-          BuildRight,
-          carbon,
-          planLater(right),
-          condition)
-
-        condition.map(Filter(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: 
Nil
-
-      case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left,
-      PhysicalOperation(projectList, predicates,
-      l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)))
-        if canPushDownJoin(left, condition) =>
-        LOGGER.info(s"pushing down for ExtractEquiJoinKeys:left")
-        val carbon = carbonScan(projectList,
-          predicates,
-          carbonRelation.carbonRelation,
-          None,
-          None,
-          None,
-          isGroupByPresent = false,
-          detailQuery = true)
-
-        val pushedDownJoin = BroadCastFilterPushJoin(
-          leftKeys: Seq[Expression],
-          rightKeys: Seq[Expression],
-          BuildLeft,
-          planLater(left),
-          carbon,
-          condition)
-        condition.map(Filter(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: 
Nil
-
-      case _ => Nil
-    }
-
-    def handleAggregation(plan: LogicalPlan,
-        aggPlan: LogicalPlan,
-        projectList: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        carbonRelation: CarbonDatasourceRelation,
-        groupingExpressions: Seq[Expression],
-        namedGroupingAttributes: Seq[NamedExpression]):
-    Seq[SparkPlan] = {
-      val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = 
extractPlan(plan)
-
-      val s =
-        try {
-          carbonScan(projectList, predicates, carbonRelation.carbonRelation,
-            Some(namedGroupingAttributes), substitutesortExprs,
-            limitExpr, groupingExpressions.nonEmpty)
-        } catch {
-          case e: Exception => null
-        }
-
-      if (s != null) {
-        aggregatePlan(groupingExpressions, namedGroupingAttributes, s)
-
-      } else {
-        (aggPlan, true) match {
-          case CarbonAggregation(
-          groupingExpressions,
-          namedGroupingAttributes,
-          PhysicalOperation(projectList, predicates,
-          l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _))) =>
-            val (_, _, _, _, groupExprs, substitutesortExprs, limitExpr) = 
extractPlan(plan)
-
-
-            val s = carbonScan(projectList, predicates, 
carbonRelation.carbonRelation,
-              Some(namedGroupingAttributes), substitutesortExprs, limitExpr,
-              groupingExpressions.nonEmpty, detailQuery = true)
-
-            aggregatePlan(groupingExpressions, namedGroupingAttributes, s)
-        }
+  private[sql] object CarbonTableScan extends Strategy {
+
+    def apply(plan: LogicalPlan): Seq[SparkPlan] = {
+      plan match {
+        case PhysicalOperation(projectList, predicates,
+        l@LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) =>
+          if (isStarQuery(plan)) {
+            carbonRawScanForStarQuery(projectList, predicates, carbonRelation, 
l)(sqlContext) :: Nil
+          } else {
+            carbonRawScan(projectList,
+              predicates,
+              carbonRelation,
+              l,
+              None,
+              detailQuery = true,
+              useBinaryAggregation = false)(sqlContext)._1 :: Nil
+          }
+        case CarbonDictionaryCatalystDecoder(relations, profile, aliasMap, _, 
child) =>
+          CarbonDictionaryDecoder(relations,
+            profile,
+            aliasMap,
+            planLater(child))(sqlContext) :: Nil
+        case _ =>
+          Nil
       }
     }
 
-    // TODO: It is duplicated code from spark. Need to find a way
-    private def aggregatePlan(groupingExpressions: Seq[Expression],
-        resultExpressions: Seq[NamedExpression],
-        child: SparkPlan) = {
-      // A single aggregate expression might appear multiple times in 
resultExpressions.
-      // In order to avoid evaluating an individual aggregate function 
multiple times, we'll
-      // build a set of the distinct aggregate expressions and build a 
function which can
-      // be used to re-write expressions so that they reference the single 
copy of the
-      // aggregate function which actually gets computed.
-      val aggregateExpressions = resultExpressions.flatMap { expr =>
-        expr.collect {
-          case agg: AggregateExpression => agg
-        }
-      }.distinct
-      // For those distinct aggregate expressions, we create a map from the
-      // aggregate function to the corresponding attribute of the function.
-      val aggregateFunctionToAttribute = aggregateExpressions.map { agg =>
-        val aggregateFunction = agg.aggregateFunction
-        val attribute = Alias(aggregateFunction, 
aggregateFunction.toString)().toAttribute
-        (aggregateFunction, agg.isDistinct) -> attribute
-      }.toMap
-
-      val (functionsWithDistinct, functionsWithoutDistinct) =
-        aggregateExpressions.partition(_.isDistinct)
-      if 
(functionsWithDistinct.map(_.aggregateFunction.children).distinct.length > 1) {
-        // This is a sanity check. We should not reach here when we have 
multiple distinct
-        // column sets. Our MultipleDistinctRewriter should take care this 
case.
-        sys.error("You hit a query analyzer bug. Please report your query to " 
+
-                  "Spark user mailing list.")
-      }
-
-      val namedGroupingExpressions = groupingExpressions.map {
-        case ne: NamedExpression => ne -> ne
-        // If the expression is not a NamedExpressions, we add an alias.
-        // So, when we generate the result of the operator, the Aggregate 
Operator
-        // can directly get the Seq of attributes representing the grouping 
expressions.
-        case other =>
-          val withAlias = Alias(other, other.toString)()
-          other -> withAlias
-      }
-      val groupExpressionMap = namedGroupingExpressions.toMap
-
-      // The original `resultExpressions` are a set of expressions which may 
reference
-      // aggregate expressions, grouping column values, and constants. When 
aggregate operator
-      // emits output rows, we will use `resultExpressions` to generate an 
output projection
-      // which takes the grouping columns and final aggregate result buffer as 
input.
-      // Thus, we must re-write the result expressions so that their 
attributes match up with
-      // the attributes of the final result projection's input row:
-      val rewrittenResultExpressions = resultExpressions.map { expr =>
-        expr.transformDown {
-          case AggregateExpression(aggregateFunction, _, isDistinct) =>
-            // The final aggregation buffer's attributes will be 
`finalAggregationAttributes`,
-            // so replace each aggregate expression by its corresponding 
attribute in the set:
-            aggregateFunctionToAttribute(aggregateFunction, isDistinct)
-          case expression =>
-            // Since we're using `namedGroupingAttributes` to extract the 
grouping key
-            // columns, we need to replace grouping key expressions with their 
corresponding
-            // attributes. We do not rely on the equality check at here since 
attributes may
-            // differ cosmetically. Instead, we use semanticEquals.
-            groupExpressionMap.collectFirst {
-              case (expr, ne) if expr semanticEquals expression => 
ne.toAttribute
-            }.getOrElse(expression)
-        }.asInstanceOf[NamedExpression]
+    /**
+     * Create carbon scan
+     */
+    private def carbonRawScan(projectList: Seq[NamedExpression],
+      predicates: Seq[Expression],
+      relation: CarbonDatasourceRelation,
+      logicalRelation: LogicalRelation,
+      groupExprs: Option[Seq[Expression]],
+      detailQuery: Boolean,
+      useBinaryAggregation: Boolean)(sc: SQLContext): (SparkPlan, Boolean) = {
+
+      val tableName: String =
+        
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
+      // Check out any expressions are there in project list. if they are 
present then we need to
+      // decode them as well.
+      val projectSet = AttributeSet(projectList.flatMap(_.references))
+      val scan = CarbonScan(projectSet.toSeq,
+        relation.carbonRelation,
+        predicates,
+        groupExprs,
+        useBinaryAggregation)(sqlContext)
+      val dimAggrsPresence: Boolean = 
scan.buildCarbonPlan.getDimAggregatorInfos.size() > 0
+      projectList.map {
+        case attr: AttributeReference =>
+        case Alias(attr: AttributeReference, _) =>
+        case others =>
+          others.references
+              .map(f => 
scan.attributesNeedToDecode.add(f.asInstanceOf[AttributeReference]))
       }
-
-      val aggregateOperator =
-        if 
(aggregateExpressions.map(_.aggregateFunction).exists(!_.supportsPartial)) {
-          if (functionsWithDistinct.nonEmpty) {
-            sys.error("Distinct columns cannot exist in Aggregate operator 
containing " +
-                      "aggregate functions which don't support partial 
aggregation.")
+      if (!detailQuery) {
+        if (scan.attributesNeedToDecode.size > 0) {
+          val decoder = getCarbonDecoder(logicalRelation,
+            sc,
+            tableName,
+            scan.attributesNeedToDecode.asScala.toSeq,
+            scan)
+          if (scan.unprocessedExprs.nonEmpty) {
+            val filterCondToAdd = 
scan.unprocessedExprs.reduceLeftOption(expressions.And)
+            (Project(projectList, filterCondToAdd.map(Filter(_, 
decoder)).getOrElse(decoder)), true)
           } else {
-            Utils.planAggregateWithoutPartial(
-              namedGroupingExpressions.map(_._2),
-              aggregateExpressions,
-              aggregateFunctionToAttribute,
-              rewrittenResultExpressions,
-              child)
+            (Project(projectList, decoder), true)
           }
-        } else if (functionsWithDistinct.isEmpty) {
-          Utils.planAggregateWithoutDistinct(
-            namedGroupingExpressions.map(_._2),
-            aggregateExpressions,
-            aggregateFunctionToAttribute,
-            rewrittenResultExpressions,
-            child)
         } else {
-          Utils.planAggregateWithOneDistinct(
-            namedGroupingExpressions.map(_._2),
-            functionsWithDistinct,
-            functionsWithoutDistinct,
-            aggregateFunctionToAttribute,
-            rewrittenResultExpressions,
-            child)
+          (scan, dimAggrsPresence)
+        }
+      } else {
+        if (scan.attributesNeedToDecode.size() > 0) {
+          val decoder = getCarbonDecoder(logicalRelation,
+            sc,
+            tableName,
+            scan.attributesNeedToDecode.asScala.toSeq,
+            scan)
+          if (scan.unprocessedExprs.nonEmpty) {
+            val filterCondToAdd = 
scan.unprocessedExprs.reduceLeftOption(expressions.And)
+            (Project(projectList, filterCondToAdd.map(Filter(_, 
decoder)).getOrElse(decoder)), true)
+          } else {
+            (Project(projectList, decoder), true)
+          }
+        } else {
+          (Project(projectList, scan), dimAggrsPresence)
         }
-
-      aggregateOperator
-    }
-
-    private def canPushDownJoin(otherRDDPlan: LogicalPlan,
-        joinCondition: Option[Expression]): Boolean = {
-      val pushdowmJoinEnabled = sqlContext.sparkContext.conf
-        .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = 
true)
-
-      if (!pushdowmJoinEnabled) {
-        return false
-      }
-
-      otherRDDPlan match {
-        case BroadcastHint(p) => true
-        case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-                  p.statistics.sizeInBytes <= 
sqlContext.conf.autoBroadcastJoinThreshold =>
-          LOGGER.info("canPushDownJoin statistics:" + p.statistics.sizeInBytes)
-          true
-        case _ => false
       }
     }
 
     /**
-     * Create carbon scan
+     * Create carbon scan for star query
      */
-    private def carbonScan(projectList: Seq[NamedExpression],
-        predicates: Seq[Expression],
-        relation: CarbonRelation,
-        groupExprs: Option[Seq[Expression]],
-        substitutesortExprs: Option[Seq[SortOrder]],
-        limitExpr: Option[Expression],
-        isGroupByPresent: Boolean,
-        detailQuery: Boolean = false) = {
+    private def carbonRawScanForStarQuery(projectList: Seq[NamedExpression],
+      predicates: Seq[Expression],
+      relation: CarbonDatasourceRelation,
+      logicalRelation: LogicalRelation)(sc: SQLContext): SparkPlan = {
+
+      val tableName: String =
+        
relation.carbonRelation.metaData.carbonTable.getFactTableName.toLowerCase
+      // Check out any expressions are there in project list. if they are 
present then we need to
+      // decode them as well.
+      val projectExprsNeedToDecode = new java.util.HashSet[Attribute]()
+      val scan = CarbonScan(projectList.map(_.toAttribute),
+        relation.carbonRelation,
+        predicates,
+        None,
+        useBinaryAggregator = false)(sqlContext)
+      projectExprsNeedToDecode.addAll(scan.attributesNeedToDecode)
+      if (projectExprsNeedToDecode.size() > 0) {
+        val decoder = getCarbonDecoder(logicalRelation,
+          sc,
+          tableName,
+          projectExprsNeedToDecode.asScala.toSeq,
+          scan)
+        if (scan.unprocessedExprs.nonEmpty) {
+          val filterCondToAdd = 
scan.unprocessedExprs.reduceLeftOption(expressions.And)
+          filterCondToAdd.map(Filter(_, decoder)).getOrElse(decoder)
+        } else {
+          decoder
+        }
+      } else {
+        scan
+      }
+    }
 
-      if (!detailQuery) {
-        val projectSet = AttributeSet(projectList.flatMap(_.references))
-        CarbonTableScan(
-          projectSet.toSeq,
-          relation,
-          predicates,
-          groupExprs,
-          substitutesortExprs,
-          limitExpr,
-          isGroupByPresent,
-          detailQuery)(sqlContext)
+    def getCarbonDecoder(logicalRelation: LogicalRelation,
+      sc: SQLContext,
+      tableName: String,
+      projectExprsNeedToDecode: Seq[Attribute],
+      scan: CarbonScan): CarbonDictionaryDecoder = {
+      val relation = CarbonDecoderRelation(logicalRelation.attributeMap,
+        logicalRelation.relation.asInstanceOf[CarbonDatasourceRelation])
+      val attrs = projectExprsNeedToDecode.map { attr =>
+        val newAttr = AttributeReference(attr.name,
+          attr.dataType,
+          attr.nullable,
+          attr.metadata)(attr.exprId, Seq(tableName))
+        relation.addAttribute(newAttr)
+        newAttr
       }
-      else {
-        val projectSet = AttributeSet(projectList.flatMap(_.references))
-        Project(projectList,
-          CarbonTableScan(projectSet.toSeq,
-            relation,
-            predicates,
-            groupExprs,
-            substitutesortExprs,
-            limitExpr,
-            isGroupByPresent,
-            detailQuery)(sqlContext))
+      CarbonDictionaryDecoder(Seq(relation), IncludeProfile(attrs),
+        CarbonAliasDecoderRelation(), scan)(sc)
+    }
 
+    private def isStarQuery(plan: LogicalPlan) = {
+      plan match {
+        case LogicalFilter(condition,
+        LogicalRelation(carbonRelation: CarbonDatasourceRelation, _)) => true
+        case LogicalRelation(carbonRelation: CarbonDatasourceRelation, _) => 
true
+        case _ => false
       }
     }
 
-    private def extractPlan(plan: LogicalPlan) = {
-      val (a, b, c, aliases, groupExprs, sortExprs, limitExpr) =
-        PhysicalOperation1.collectProjectsAndFilters(plan)
-      val substitutesortExprs = sortExprs match {
-        case Some(sort) =>
-          Some(sort.map {
-            case SortOrder(a: Alias, direction) =>
-              val ref = aliases.getOrElse(a.toAttribute, a) match {
-                case Alias(reference, name) => reference
-                case others => others
-              }
-              SortOrder(ref, direction)
-            case others => others
-          })
-        case others => others
+    private def isGroupByPresentOnMeasures(groupingExpressions: 
Seq[Expression],
+      carbonTable: CarbonTable): Boolean = {
+      groupingExpressions.map { g =>
+        g.collect {
+          case attr: AttributeReference
+            if carbonTable.getMeasureByName(carbonTable.getFactTableName, 
attr.name) != null =>
+            return true
+        }
       }
-      (a, b, c, aliases, groupExprs, substitutesortExprs, limitExpr)
+      false
     }
   }
 
@@ -406,7 +232,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends 
QueryPlanner[SparkPlan] {
         ExecutedCommand(ShowAllTablesDetail(schemaName, plan.output)) :: Nil
       case DropTable(tableName, ifNotExists)
         if CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) =>
+            .tableExists(TableIdentifier(tableName.toLowerCase))(sqlContext) =>
         ExecutedCommand(DropCubeCommand(ifNotExists, None, 
tableName.toLowerCase)) :: Nil
       case ShowAggregateTablesCommand(schemaName) =>
         ExecutedCommand(ShowAggregateTables(schemaName, plan.output)) :: Nil
@@ -415,7 +241,7 @@ class CarbonStrategies(sqlContext: SQLContext) extends 
QueryPlanner[SparkPlan] {
       case LoadCube(schemaNameOp, cubeName, factPathFromUser, dimFilesPath,
       partionValues, isOverwriteExist, inputSqlString) =>
         val isCarbonTable = CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext)
+            .tableExists(TableIdentifier(cubeName, schemaNameOp))(sqlContext)
         if (isCarbonTable || partionValues.nonEmpty) {
           ExecutedCommand(LoadCube(schemaNameOp, cubeName, factPathFromUser,
             dimFilesPath, partionValues, isOverwriteExist, inputSqlString)) :: 
Nil
@@ -433,15 +259,14 @@ class CarbonStrategies(sqlContext: SQLContext) extends 
QueryPlanner[SparkPlan] {
         }
       case DescribeFormattedCommand(sql, tblIdentifier) =>
         val isCube = CarbonEnv.getInstance(sqlContext).carbonCatalog
-          .tableExists(tblIdentifier)(sqlContext)
+            .tableExists(tblIdentifier)(sqlContext)
         if (isCube) {
           val describe =
             LogicalDescribeCommand(UnresolvedRelation(tblIdentifier, None), 
isExtended = false)
           val resolvedTable = sqlContext.executePlan(describe.table).analyzed
           val resultPlan = sqlContext.executePlan(resolvedTable).executedPlan
           ExecutedCommand(DescribeCommandFormatted(resultPlan, plan.output, 
tblIdentifier)) :: Nil
-        }
-        else {
+        } else {
           ExecutedCommand(DescribeNativeCommand(sql, plan.output)) :: Nil
         }
       case describe@LogicalDescribeCommand(table, isExtended) =>
@@ -461,3 +286,13 @@ class CarbonStrategies(sqlContext: SQLContext) extends 
QueryPlanner[SparkPlan] {
   }
 
 }
+
+object CarbonHiveSyntax {
+
+  @transient
+  protected val sqlParser = new CarbonSqlParser
+
+  def parse(sqlText: String): LogicalPlan = {
+    sqlParser.parse(sqlText)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
deleted file mode 100644
index 58c02ff..0000000
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategy.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-import org.carbondata.spark.exception.MalformedCarbonCommandException
-
-private[sql] object CarbonStrategy {
-  def getStrategy(context: SQLContext): Seq[Strategy] = {
-    val carbonStrategy = new CarbonStrategies(context)
-    if (context.conf.asInstanceOf[CarbonSQLConf].pushComputation) {
-      Seq(carbonStrategy.CarbonTableScans, carbonStrategy.DDLStrategies)
-    } else {
-      // TODO: need to remove duplicate code in strategies.
-      Seq(new CarbonRawStrategies(context).CarbonRawTableScans, 
carbonStrategy.DDLStrategies)
-    }
-  }
-}
-
-private[spark] class CarbonSQLDialect(context: HiveContext) extends 
HiveQLDialect(context) {
-
-  @transient
-  protected val sqlParser = new CarbonSqlParser
-
-  override def parse(sqlText: String): LogicalPlan = {
-
-    try {
-      sqlParser.parse(sqlText)
-    } catch {
-      // MalformedCarbonCommandException need to throw directly
-      // because hive can no parse carbon command
-      case ce: MalformedCarbonCommandException =>
-        throw ce
-      case _ => super.parse(sqlText)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
 
b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
deleted file mode 100644
index 60c6a78..0000000
--- 
a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonQueryRDD.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.carbondata.spark.rdd
-
-import java.util
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-
-import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.cache.dictionary.Dictionary
-import org.carbondata.core.carbon.datastore.block.TableBlockInfo
-import org.carbondata.core.iterator.CarbonIterator
-import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
-import org.carbondata.query.carbon.executor.QueryExecutorFactory
-import org.carbondata.query.carbon.model.QueryModel
-import org.carbondata.query.carbon.result.RowResult
-import org.carbondata.query.expression.Expression
-import org.carbondata.query.filter.resolver.FilterResolverIntf
-import org.carbondata.spark.KeyVal
-import org.carbondata.spark.load.CarbonLoaderUtil
-import org.carbondata.spark.util.QueryPlanUtil
-
-class CarbonSparkPartition(rddId: Int, val idx: Int,
-  val locations: Array[String],
-  val tableBlockInfos: util.List[TableBlockInfo])
-  extends Partition {
-
-  override val index: Int = idx
-
-  // val serializableHadoopSplit = new 
SerializableWritable[Array[String]](locations)
-  override def hashCode(): Int = {
-    41 * (41 + rddId) + idx
-  }
-}
-
-
- /**
-  * This RDD is used to perform query.
-  */
-class CarbonQueryRDD[K, V](
-  sc: SparkContext,
-  queryModel: QueryModel,
-  filterExpression: Expression,
-  keyClass: KeyVal[K, V],
-  @transient conf: Configuration,
-  cubeCreationTime: Long,
-  schemaLastUpdatedTime: Long,
-  baseStoreLocation: String)
-  extends RDD[(K, V)](sc, Nil) with Logging {
-
-  val defaultParallelism = sc.defaultParallelism
-
-  override def getPartitions: Array[Partition] = {
-    val startTime = System.currentTimeMillis()
-    val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) =
-      
QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
-
-    val result = new util.ArrayList[Partition](defaultParallelism)
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    // set filter resolver tree
-    try {
-      var filterResolver = carbonInputFormat
-        .getResolvedFilter(job.getConfiguration, filterExpression)
-
-      CarbonInputFormat.setFilterPredicates(job.getConfiguration, 
filterResolver)
-      queryModel.setFilterExpressionResolverTree(filterResolver)
-    }
-    catch {
-      case e: Exception =>
-        LOGGER.error(e)
-        sys.error("Exception occurred in query execution :: " + e.getMessage)
-    }
-    // get splits
-    val splits = carbonInputFormat.getSplits(job)
-    if (!splits.isEmpty) {
-      val carbonInputSplits = 
splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
-      val blockList = carbonInputSplits.map(inputSplit =>
-        new TableBlockInfo(inputSplit.getPath.toString,
-          inputSplit.getStart, inputSplit.getSegmentId,
-          inputSplit.getLocations, inputSplit.getLength
-        )
-      )
-      if (blockList.nonEmpty) {
-        // group blocks to nodes, tasks
-        val nodeBlockMapping =
-          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, 
defaultParallelism)
-
-        var i = 0
-        // Create Spark Partition for each task and assign blocks
-        nodeBlockMapping.asScala.foreach { entry =>
-          entry._2.asScala.foreach { blocksPerTask =>
-            if (blocksPerTask.size() != 0) {
-              result.add(new CarbonSparkPartition(id, i, 
Seq(entry._1).toArray, blocksPerTask))
-              i += 1
-            }
-          }
-        }
-        val noOfBlocks = blockList.size
-        val noOfNodes = nodeBlockMapping.size
-        val noOfTasks = result.size()
-        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
-          + s"parallelism: $defaultParallelism , " +
-          s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
-        )
-        logInfo("Time taken to identify Blocks to scan : " +
-          (System.currentTimeMillis() - startTime)
-        )
-        result.asScala.foreach { r =>
-          val cp = r.asInstanceOf[CarbonSparkPartition]
-          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
-            + ", No.Of Blocks : " + cp.tableBlockInfos.size()
-          )
-        }
-      } else {
-        logInfo("No blocks identified to scan")
-        val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
-        result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, 
nodesPerBlock))
-      }
-    }
-    else {
-      logInfo("No valid segments found to scan")
-      val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
-      result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, 
nodesPerBlock))
-    }
-    result.toArray(new Array[Partition](result.size()))
-  }
-
-
-  override def compute(thepartition: Partition, context: TaskContext): 
Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val iter = new Iterator[(K, V)] {
-      var rowIterator: CarbonIterator[_] = _
-      var queryStartTime: Long = 0
-      try {
-        val carbonSparkPartition = 
thepartition.asInstanceOf[CarbonSparkPartition]
-        if (!carbonSparkPartition.tableBlockInfos.isEmpty) {
-          queryModel.setQueryId(queryModel.getQueryId + "_" + 
carbonSparkPartition.idx)
-          // fill table block info
-          queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
-          queryStartTime = System.currentTimeMillis
-
-          val carbonPropertiesFilePath = 
System.getProperty("carbon.properties.filepath", null)
-          logInfo("*************************" + carbonPropertiesFilePath)
-          if (null == carbonPropertiesFilePath) {
-            System.setProperty("carbon.properties.filepath",
-              System.getProperty("user.dir") + '/' + "conf" + '/' + 
"carbon.properties"
-            )
-          }
-          // execute query
-          rowIterator = 
QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
-            .asInstanceOf[CarbonIterator[RowResult]]
-        }
-        // TODOi
-        // : CarbonQueryUtil.isQuickFilter quick filter from dictionary needs 
to support
-      } catch {
-        case e: Throwable =>
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-          LOGGER.error(e)
-          // updateCubeAndLevelCacheStatus(levelCacheKeys)
-          if (null != e.getMessage) {
-            sys.error("Exception occurred in query execution :: " + 
e.getMessage)
-          } else {
-            sys.error("Exception occurred in query execution.Please check 
logs.")
-          }
-      }
-
-      var havePair = false
-      var finished = false
-      var recordCount = 0
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = (null == rowIterator) || (!rowIterator.hasNext)
-          havePair = !finished
-        }
-        if (finished) {
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-        }
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val row = rowIterator.next()
-        val key = row.asInstanceOf[RowResult].getKey()
-        val value = row.asInstanceOf[RowResult].getValue()
-        recordCount += 1
-        if (queryModel.getLimit != -1 && recordCount >= queryModel.getLimit) {
-          clearDictionaryCache(queryModel.getColumnToDictionaryMapping)
-        }
-        keyClass.getKey(key, value)
-      }
-
-      def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, 
Dictionary]) = {
-        if (null != columnToDictionaryMap) {
-          org.carbondata.spark.util.CarbonQueryUtil
-            .clearColumnDictionaryCache(columnToDictionaryMap)
-        }
-      }
-
-      logInfo("*************************** Total Time Taken to execute the 
query in Carbon Side: " +
-        (System.currentTimeMillis - queryStartTime)
-      )
-    }
-    iter
-  }
-
-
-   /**
-    * Get the preferred locations where to launch this task.
-    */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
-    theSplit.locations.filter(_ != "localhost")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
 
b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
deleted file mode 100644
index 5993677..0000000
--- 
a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonRawQueryRDD.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.carbondata.spark.rdd
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
-
-import org.carbondata.common.logging.LogServiceFactory
-import org.carbondata.core.iterator.CarbonIterator
-import org.carbondata.query.carbon.executor.QueryExecutorFactory
-import org.carbondata.query.carbon.model.QueryModel
-import org.carbondata.query.carbon.result.BatchRawResult
-import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor
-import org.carbondata.query.expression.Expression
-import org.carbondata.spark.{RawKey, RawKeyVal}
-
-
-/**
- * This RDD is used to perform query with raw data, it means it doesn't 
convert dictionary values
- * to actual data.
- *
- * @param sc
- * @param queryModel
- * @param filterExpression
- * @param keyClass
- * @param conf
- * @param cubeCreationTime
- * @param schemaLastUpdatedTime
- * @param baseStoreLocation
- * @tparam K
- * @tparam V
- */
-class CarbonRawQueryRDD[K, V](
-    sc: SparkContext,
-    queryModel: QueryModel,
-    filterExpression: Expression,
-    keyClass: RawKey[K, V],
-    @transient conf: Configuration,
-    cubeCreationTime: Long,
-    schemaLastUpdatedTime: Long,
-    baseStoreLocation: String)
-  extends CarbonQueryRDD[K, V](sc,
-    queryModel,
-    filterExpression,
-    null,
-    conf,
-    cubeCreationTime,
-    schemaLastUpdatedTime,
-    baseStoreLocation) with Logging {
-
-
-  override def compute(thepartition: Partition, context: TaskContext): 
Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
-    val iter = new Iterator[(K, V)] {
-      var rowIterator: CarbonIterator[Array[Any]] = _
-      var queryStartTime: Long = 0
-      try {
-        val carbonSparkPartition = 
thepartition.asInstanceOf[CarbonSparkPartition]
-        if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
-          queryModel.setQueryId(queryModel.getQueryId() + "_" + 
carbonSparkPartition.idx)
-          // fill table block info
-          queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
-          queryStartTime = System.currentTimeMillis
-
-          val carbonPropertiesFilePath = 
System.getProperty("carbon.properties.filepath", null)
-          logInfo("*************************" + carbonPropertiesFilePath)
-          if (null == carbonPropertiesFilePath) {
-            System.setProperty("carbon.properties.filepath",
-              System.getProperty("user.dir") + '/' + "conf" + '/' + 
"carbon.properties");
-          }
-          // execute query
-          rowIterator = new ChunkRawRowIterartor(
-            
QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
-            
.asInstanceOf[CarbonIterator[BatchRawResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
-        }
-      } catch {
-        case e: Exception =>
-          LOGGER.error(e)
-          // updateCubeAndLevelCacheStatus(levelCacheKeys)
-          if (null != e.getMessage) {
-            sys.error("Exception occurred in query execution :: " + 
e.getMessage)
-          } else {
-            sys.error("Exception occurred in query execution.Please check 
logs.")
-          }
-      }
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = (null == rowIterator) || (!rowIterator.hasNext())
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        val row = rowIterator.next()
-        keyClass.getKey(row, null)
-      }
-
-      logInfo("*************************** Total Time Taken to execute the 
query in Carbon Side: " +
-              (System.currentTimeMillis - queryStartTime)
-      )
-    }
-    iter
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala 
b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
new file mode 100644
index 0000000..a95ae27
--- /dev/null
+++ 
b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.carbondata.spark.rdd
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.carbondata.common.logging.LogServiceFactory
+import org.carbondata.core.carbon.datastore.block.TableBlockInfo
+import org.carbondata.core.iterator.CarbonIterator
+import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
+import org.carbondata.query.carbon.executor.QueryExecutorFactory
+import org.carbondata.query.carbon.model.QueryModel
+import org.carbondata.query.carbon.result.{BatchRawResult, RowResult}
+import org.carbondata.query.carbon.result.iterator.ChunkRawRowIterartor
+import org.carbondata.query.expression.Expression
+import org.carbondata.spark.RawKey
+import org.carbondata.spark.load.CarbonLoaderUtil
+import org.carbondata.spark.util.QueryPlanUtil
+
+class CarbonSparkPartition(rddId: Int, val idx: Int,
+  val locations: Array[String],
+  val tableBlockInfos: util.List[TableBlockInfo])
+  extends Partition {
+
+  override val index: Int = idx
+
+  // val serializableHadoopSplit = new 
SerializableWritable[Array[String]](locations)
+  override def hashCode(): Int = {
+    41 * (41 + rddId) + idx
+  }
+}
+
+ /**
+  * This RDD is used to perform query on CarbonData file. Before sending tasks 
to scan
+  * CarbonData file, this RDD will leverage CarbonData's index information to 
do CarbonData file
+  * level filtering in driver side.
+  */
+class CarbonScanRDD[K, V](
+  sc: SparkContext,
+  queryModel: QueryModel,
+  filterExpression: Expression,
+  keyClass: RawKey[K, V],
+  @transient conf: Configuration,
+  cubeCreationTime: Long,
+  schemaLastUpdatedTime: Long,
+  baseStoreLocation: String)
+  extends RDD[(K, V)](sc, Nil) with Logging {
+
+  val defaultParallelism = sc.defaultParallelism
+
+  override def getPartitions: Array[Partition] = {
+    val startTime = System.currentTimeMillis()
+    val (carbonInputFormat: CarbonInputFormat[RowResult], job: Job) =
+      
QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+
+    val result = new util.ArrayList[Partition](defaultParallelism)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    // set filter resolver tree
+    try {
+      var filterResolver = carbonInputFormat
+        .getResolvedFilter(job.getConfiguration, filterExpression)
+
+      CarbonInputFormat.setFilterPredicates(job.getConfiguration, 
filterResolver)
+      queryModel.setFilterExpressionResolverTree(filterResolver)
+    }
+    catch {
+      case e: Exception =>
+        LOGGER.error(e)
+        sys.error("Exception occurred in query execution :: " + e.getMessage)
+    }
+    // get splits
+    val splits = carbonInputFormat.getSplits(job)
+    if (!splits.isEmpty) {
+      val carbonInputSplits = 
splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+
+      val blockList = carbonInputSplits.map(inputSplit =>
+        new TableBlockInfo(inputSplit.getPath.toString,
+          inputSplit.getStart, inputSplit.getSegmentId,
+          inputSplit.getLocations, inputSplit.getLength
+        )
+      )
+      if (blockList.nonEmpty) {
+        // group blocks to nodes, tasks
+        val nodeBlockMapping =
+          CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, 
defaultParallelism)
+
+        var i = 0
+        // Create Spark Partition for each task and assign blocks
+        nodeBlockMapping.asScala.foreach { entry =>
+          entry._2.asScala.foreach { blocksPerTask =>
+            if (blocksPerTask.size() != 0) {
+              result.add(new CarbonSparkPartition(id, i, 
Seq(entry._1).toArray, blocksPerTask))
+              i += 1
+            }
+          }
+        }
+        val noOfBlocks = blockList.size
+        val noOfNodes = nodeBlockMapping.size
+        val noOfTasks = result.size()
+        logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
+          + s"parallelism: $defaultParallelism , " +
+          s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
+        )
+        logInfo("Time taken to identify Blocks to scan : " +
+          (System.currentTimeMillis() - startTime)
+        )
+        result.asScala.foreach { r =>
+          val cp = r.asInstanceOf[CarbonSparkPartition]
+          logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
+            + ", No.Of Blocks : " + cp.tableBlockInfos.size()
+          )
+        }
+      } else {
+        logInfo("No blocks identified to scan")
+        val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+        result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, 
nodesPerBlock))
+      }
+    }
+    else {
+      logInfo("No valid segments found to scan")
+      val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+      result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, 
nodesPerBlock))
+    }
+    result.toArray(new Array[Partition](result.size()))
+  }
+
+   override def compute(thepartition: Partition, context: TaskContext): 
Iterator[(K, V)] = {
+     val LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
+     val iter = new Iterator[(K, V)] {
+       var rowIterator: CarbonIterator[Array[Any]] = _
+       var queryStartTime: Long = 0
+       try {
+         val carbonSparkPartition = 
thepartition.asInstanceOf[CarbonSparkPartition]
+         if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
+           queryModel.setQueryId(queryModel.getQueryId() + "_" + 
carbonSparkPartition.idx)
+           // fill table block info
+           queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
+           queryStartTime = System.currentTimeMillis
+
+           val carbonPropertiesFilePath = 
System.getProperty("carbon.properties.filepath", null)
+           logInfo("*************************" + carbonPropertiesFilePath)
+           if (null == carbonPropertiesFilePath) {
+             System.setProperty("carbon.properties.filepath",
+               System.getProperty("user.dir") + '/' + "conf" + '/' + 
"carbon.properties");
+           }
+           // execute query
+           rowIterator = new ChunkRawRowIterartor(
+             
QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel)
+                 .asInstanceOf[CarbonIterator[BatchRawResult]])
+                 .asInstanceOf[CarbonIterator[Array[Any]]]
+         }
+       } catch {
+         case e: Exception =>
+           LOGGER.error(e)
+           if (null != e.getMessage) {
+             sys.error("Exception occurred in query execution :: " + 
e.getMessage)
+           } else {
+             sys.error("Exception occurred in query execution.Please check 
logs.")
+           }
+       }
+
+       var havePair = false
+       var finished = false
+
+       override def hasNext: Boolean = {
+         if (!finished && !havePair) {
+           finished = (null == rowIterator) || (!rowIterator.hasNext())
+           havePair = !finished
+         }
+         !finished
+       }
+
+       override def next(): (K, V) = {
+         if (!hasNext) {
+           throw new java.util.NoSuchElementException("End of stream")
+         }
+         havePair = false
+         val row = rowIterator.next()
+         keyClass.getKey(row, null)
+       }
+
+       logInfo("********************** Total Time Taken to execute the query 
in Carbon Side: " +
+           (System.currentTimeMillis - queryStartTime)
+       )
+     }
+     iter
+   }
+
+   /**
+    * Get the preferred locations where to launch this task.
+    */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    val theSplit = partition.asInstanceOf[CarbonSparkPartition]
+    theSplit.locations.filter(_ != "localhost")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a83dba34/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
 
b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
deleted file mode 100644
index 3943352..0000000
--- 
a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.carbondata.spark.testsuite.joinquery
-
-import org.apache.spark.sql.common.util.CarbonHiveContext._
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.execution.joins.BroadCastFilterPushJoin
-
-class EquiJoinTestCase extends QueryTest with BeforeAndAfterAll  {
-   override def beforeAll {
-    //loading to hive table
-    sql("create table employee_hive (empid string,empname string,mobilename 
string,mobilecolor string,salary int)row format delimited fields terminated by 
','")
-    sql("create table mobile_hive (mobileid string,mobilename string, 
mobilecolor string, sales int)row format delimited fields terminated by ','");
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/join/employee.csv' into 
table employee_hive")
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/join/mobile.csv' into 
table mobile_hive")
-    //loading to carbon table
-    sql("create table employee (empid string,empname string,mobilename 
string,mobilecolor string,salary int) stored by 'org.apache.carbondata.format'")
-    sql("create table mobile (mobileid string,mobilename string, mobilecolor 
string, sales int) stored by 'org.apache.carbondata.format'");
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/join/employee.csv' into 
table employee 
options('FILEHEADER'='empid,empname,mobilename,mobilecolor,salary')")
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/join/mobile.csv' into 
table mobile options('FILEHEADER'='mobileid,mobilename,mobilecolor,sales')")
-   }
-   
-   test("test equijoin query") {
-     val df = sql("select employee.empname,mobile.mobilename from 
employee,mobile where employee.mobilename = mobile.mobilename")
-     var broadcastJoinExists = false
-     df.queryExecution.sparkPlan.collect {
-       case bcf: BroadCastFilterPushJoin =>
-         broadcastJoinExists = true
-     }
-     if (!broadcastJoinExists) {
-       assert(false)
-     }
-      checkAnswer(df,
-          sql("select employee_hive.empname,mobile_hive.mobilename from 
employee_hive,mobile_hive where employee_hive.mobilename = 
mobile_hive.mobilename"))
-  }
-  override def afterAll {
-    sql("drop table employee_hive")
-    sql("drop table mobile_hive")
-    sql("drop table employee")
-    sql("drop table mobile")
-  }
-}
\ No newline at end of file

Reply via email to