[CARBONDATA-2532][Integration] Carbon to support spark 2.3.1 version (make API
changes in Carbon to be compatible with Spark 2.3)

Fixed Spark-Carbon integration API compatibility issues.
f) Corrected the test cases based on the Spark 2.3.0 behaviour changes.
g) Excluded the net.jpountz.lz4:lz4 dependency from the pom.xml files, since
Spark 2.3.0 changed it to org.lz4:lz4-java; removed it from the test classpath
of spark2, spark-common-test and spark2-examples.

This closes #2642
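
Much of the compatibility handling in this change is gated on the running Spark
version through the SparkUtil helper (see the SparkUtil.scala entries in the
file list below). A rough sketch only: the method names follow the diff, but
the body is an illustration based on comparing the major.minor components of
org.apache.spark.SPARK_VERSION, not the committed implementation:

    import org.apache.spark.SPARK_VERSION

    object SparkVersionCheckSketch {
      // Extract "major.minor" from a version string such as "2.3.1" (illustration only).
      private def majorMinor(version: String): (Int, Int) = {
        val parts = version.split("\\.")
        (parts(0).toInt, if (parts.length > 1) parts(1).toInt else 0)
      }

      // True when the running Spark version has exactly this major.minor, e.g. "2.2".
      def isSparkVersionEqualTo(version: String): Boolean =
        majorMinor(SPARK_VERSION) == majorMinor(version)

      // True when the running Spark version is this major.minor or newer, e.g. "2.3".
      def isSparkVersionXandAbove(version: String): Boolean = {
        val (major, minor) = majorMinor(SPARK_VERSION)
        val (refMajor, refMinor) = majorMinor(version)
        major > refMajor || (major == refMajor && minor >= refMinor)
      }
    }

For example, SparkSQLUtil.invokeStatsMethod in the diff below calls
plan.stats(conf) when isSparkVersionEqualTo("2.2") holds and the zero-argument
plan.stats() when isSparkVersionXandAbove("2.3") holds.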


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74c3eb10
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74c3eb10
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74c3eb10

Branch: refs/heads/master
Commit: 74c3eb10b860845dacd3542ab72595617dc0df57
Parents: bcef656
Author: sandeep-katta <sandeep.katta2...@gmail.com>
Authored: Thu Jul 5 21:31:29 2018 -0700
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Wed Sep 5 18:09:16 2018 +0530

----------------------------------------------------------------------
 datamap/mv/core/pom.xml                         |   5 +
 .../apache/carbondata/mv/datamap/MVHelper.scala |  44 ++-
 datamap/mv/plan/pom.xml                         |   5 +
 .../mv/plans/modular/ModularPlan.scala          |   3 +-
 .../mv/plans/modular/ModularRelation.scala      |   5 +-
 .../mv/plans/util/BirdcageOptimizer.scala       |  13 +-
 .../util/LogicalPlanSignatureGenerator.scala    |   5 +-
 .../mv/testutil/ModularPlanTest.scala           |  36 +--
 .../mv/plans/ExtractJoinConditionsSuite.scala   |   4 +-
 .../mv/plans/LogicalToModularPlanSuite.scala    |  18 +-
 examples/spark2/pom.xml                         |  20 ++
 integration/spark-common-test/pom.xml           |  32 +++
 .../testsuite/bigdecimal/TestBigDecimal.scala   |  10 +-
 .../filterexpr/AllDataTypesTestCaseFilter.scala |   7 +-
 .../StandardPartitionGlobalSortTestCase.scala   |  22 +-
 .../sql/commands/StoredAsCarbondataSuite.scala  |   3 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |  11 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   2 +-
 .../apache/spark/sql/test/util/PlanTest.scala   |   5 -
 .../apache/spark/sql/util/SparkSQLUtil.scala    |  72 ++++-
 .../spark/util/CarbonReflectionUtils.scala      | 121 +++++---
 .../scala/org/apache/spark/util/SparkUtil.scala |  42 ---
 .../org/apache/spark/util/SparkUtilTest.scala   |  58 ++++
 integration/spark-datasource/pom.xml            | 132 +++++++++
 .../vectorreader/ColumnarVectorWrapper.java     |  88 +++---
 .../VectorizedCarbonRecordReader.java           |  20 +-
 .../scala/org/apache/spark/util/SparkUtil.scala |  60 ++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 272 ++++++++++++++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 276 ++++++++++++++++++
 .../apache/spark/sql/ColumnVectorFactory.java   |  45 +++
 ...tCreateTableUsingSparkCarbonFileFormat.scala |  26 +-
 integration/spark2/pom.xml                      |  80 ++++++
 .../apache/spark/sql/hive/CarbonAnalyzer.scala  |  51 ++++
 .../sql/hive/CarbonInMemorySessionState.scala   | 278 +++++++++++++++++++
 .../spark/sql/hive/CarbonOptimizerUtil.scala    |  44 +++
 .../spark/sql/hive/CarbonSessionState.scala     | 269 ++++++++++++++++++
 .../spark/sql/hive/CarbonSessionUtil.scala      |  96 +++++++
 .../apache/spark/sql/hive/CarbonSqlConf.scala   | 148 ++++++++++
 ...CreateCarbonSourceTableAsSelectCommand.scala | 130 +++++++++
 .../spark/sql/hive/SqlAstBuilderHelper.scala    | 110 ++++++++
 .../stream/CarbonStreamRecordReader.java        | 155 ++++-------
 .../org/apache/spark/CarbonInputMetrics.scala   |   1 -
 .../spark/sql/CarbonCatalystOperators.scala     |  57 ++--
 .../org/apache/spark/sql/CarbonCountStar.scala  |  10 +-
 .../org/apache/spark/sql/CarbonSession.scala    |   7 +-
 .../sql/CustomDeterministicExpression.scala     |   2 -
 .../management/CarbonLoadDataCommand.scala      |  19 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |  82 ++++--
 .../sql/execution/strategy/DDLStrategy.scala    |  17 +-
 .../spark/sql/hive/CarbonAnalysisRules.scala    |  17 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  26 +-
 .../sql/hive/CarbonPreAggregateRules.scala      | 147 +++++++---
 .../spark/sql/optimizer/CarbonFilters.scala     |  26 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  16 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   4 +-
 .../spark/sql/CarbonToSparkAdapater.scala       |  75 +++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 226 ---------------
 ...CreateCarbonSourceTableAsSelectCommand.scala |   2 -
 .../spark/sql/CarbonToSparkAdapater.scala       |  82 ++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 229 ---------------
 .../apache/spark/sql/hive/CarbonAnalyzer.scala  |  51 ----
 .../sql/hive/CarbonInMemorySessionState.scala   | 278 -------------------
 .../apache/spark/sql/hive/CarbonOptimizer.scala |  44 +--
 .../spark/sql/hive/CarbonSessionState.scala     | 277 ------------------
 .../spark/sql/hive/CarbonSessionUtil.scala      |  97 -------
 .../spark/sql/hive/CarbonSqlAstBuilder.scala    |  85 +-----
 .../apache/spark/sql/hive/CarbonSqlConf.scala   | 148 ----------
 ...CreateCarbonSourceTableAsSelectCommand.scala | 122 --------
 .../spark/sql/CarbonToSparkAdapater.scala       |  84 ++++++
 .../org/apache/spark/sql/CarbonVectorProxy.java | 255 -----------------
 .../apache/spark/sql/ColumnVectorFactory.java   |  45 ---
 .../apache/spark/sql/hive/CarbonOptimizer.scala |  37 +++
 .../spark/sql/hive/CarbonSqlAstBuilder.scala    |  52 ++++
 .../stream/CarbonStreamRecordReaderTest.java    | 100 +++++++
 .../BooleanDataTypesFilterTest.scala            |   2 +-
 .../booleantype/BooleanDataTypesLoadTest.scala  |   4 +-
 .../bucketing/TableBucketingTestCase.scala      |  26 +-
 .../carbondata/query/SubQueryTestSuite.scala    |   1 -
 pom.xml                                         |  60 ++++
 .../scala/org/apache/spark/rpc/Master.scala     |   2 +-
 .../scala/org/apache/spark/rpc/RpcUtil.scala    |  56 ++++
 .../scala/org/apache/spark/rpc/Worker.scala     |   4 +-
 .../streaming/CarbonStreamInputFormat.java      |   2 +-
 .../streaming/CarbonStreamInputFormatTest.java  |  99 -------
 85 files changed, 3368 insertions(+), 2433 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/core/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml
index 77b5cc7..f5e5715 100644
--- a/datamap/mv/core/pom.xml
+++ b/datamap/mv/core/pom.xml
@@ -50,6 +50,11 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index f677826..9da109a 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapater, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder}
@@ -300,10 +300,13 @@ object MVHelper {
         case Alias(agg: AggregateExpression, name) =>
           agg.aggregateFunction.collect {
             case attr: AttributeReference =>
-              AttributeReference(attr.name, attr.dataType, attr.nullable, attr
-                .metadata)(attr.exprId,
+              CarbonToSparkAdapater.createAttributeReference(attr.name,
+                attr.dataType,
+                attr.nullable,
+                attr.metadata,
+                attr.exprId,
                 aliasName,
-                attr.isGenerated)
+                attr)
           }.head
         case Alias(child, name) =>
           child
@@ -314,18 +317,25 @@ object MVHelper {
     expressions.map {
         case alias@Alias(agg: AggregateExpression, name) =>
           attrMap.get(AttributeKey(agg)).map { exp =>
-            Alias(getAttribute(exp), name)(alias.exprId,
+            CarbonToSparkAdapater.createAliasRef(
+              getAttribute(exp),
+              name,
+              alias.exprId,
               alias.qualifier,
               alias.explicitMetadata,
-              alias.isGenerated)
+              Some(alias))
           }.getOrElse(alias)
 
         case attr: AttributeReference =>
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
-              AttributeReference(a.name, a.dataType, a.nullable, 
a.metadata)(a.exprId,
+              CarbonToSparkAdapater.createAttributeReference(a.name,
+                a.dataType,
+                a.nullable,
+                a.metadata,
+                a.exprId,
                 attr.qualifier,
-                a.isGenerated)
+                a)
             } else {
               a
             }
@@ -333,10 +343,9 @@ object MVHelper {
           uattr
         case alias@Alias(expression: Expression, name) =>
           attrMap.get(AttributeKey(expression)).map { exp =>
-            Alias(getAttribute(exp), name)(alias.exprId,
-              alias.qualifier,
-              alias.explicitMetadata,
-              alias.isGenerated)
+            CarbonToSparkAdapater
+              .createAliasRef(getAttribute(exp), name, alias.exprId, 
alias.qualifier,
+                alias.explicitMetadata, Some(alias))
           }.getOrElse(alias)
         case expression: Expression =>
           val uattr = attrMap.get(AttributeKey(expression))
@@ -376,9 +385,14 @@ object MVHelper {
         case attr: AttributeReference =>
           val uattr = attrMap.get(AttributeKey(attr)).map{a =>
             if (keepAlias) {
-              AttributeReference(a.name, a.dataType, a.nullable, 
a.metadata)(a.exprId,
-                attr.qualifier,
-                a.isGenerated)
+              CarbonToSparkAdapater
+                .createAttributeReference(a.name,
+                  a.dataType,
+                  a.nullable,
+                  a.metadata,
+                  a.exprId,
+                  attr.qualifier,
+                  a)
             } else {
               a
             }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml
index fcf0e51..ff6976d 100644
--- a/datamap/mv/plan/pom.xml
+++ b/datamap/mv/plan/pom.xml
@@ -44,6 +44,11 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
index 1420522..6c82598 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, 
QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.mv.plans._
 import org.apache.carbondata.mv.plans.util.{Printers, Signature, SQLBuilder}
@@ -64,7 +65,7 @@ abstract class ModularPlan
     //    spark.conf.set("spark.sql.cbo.enabled", true)
     val sqlStmt = asOneLineSQL
     val plan = spark.sql(sqlStmt).queryExecution.optimizedPlan
-    plan.stats(conf)
+    SparkSQLUtil.invokeStatsMethod(plan, conf)
   }
 
   override def fastEquals(other: TreeNode[_]): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
index 3349835..7e1eb05 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.mv.plans.modular.Flags._
 
@@ -45,7 +46,7 @@ case class ModularRelation(databaseName: String,
     rest: Seq[Seq[Any]]) extends LeafNode {
   override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
     val plan = spark.table(s"${ databaseName }.${ tableName 
}").queryExecution.optimizedPlan
-    val stats = plan.stats(conf)
+    val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
     val output = outputList.map(_.toAttribute)
     val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
       table => AttributeMap(table.output.zip(output))
@@ -136,7 +137,7 @@ case class HarmonizedRelation(source: ModularPlan) extends 
LeafNode {
   // (spark, conf)
   override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
     val plan = spark.table(s"${ databaseName }.${ tableName 
}").queryExecution.optimizedPlan
-    val stats = plan.stats(conf)
+    val stats = SparkSQLUtil.invokeStatsMethod(plan, conf)
     val output = 
source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
       .outputList.map(_.toAttribute)
     val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
index 6363089..e1e891a 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -17,12 +17,14 @@
 
 package org.apache.carbondata.mv.plans.util
 
+import org.apache.spark.sql.CarbonToSparkAdapater
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.util.SparkSQLUtil
 
 object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
 
@@ -74,8 +76,8 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
       "Operator Optimizations", fixedPoint, Seq(
         // Operator push down
         PushProjectionThroughUnion,
-        ReorderJoin(conf),
-        EliminateOuterJoin(conf),
+        SparkSQLUtil.getReorderJoinObj(conf),
+        SparkSQLUtil.getEliminateOuterJoinObj(conf),
         PushPredicateThroughJoin,
         PushDownPredicate,
         //      LimitPushDown(conf),
@@ -89,7 +91,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
         CombineLimits,
         CombineUnions,
         // Constant folding and strength reduction
-        NullPropagation(conf),
+        SparkSQLUtil.getNullPropagationObj(conf),
         FoldablePropagation,
         //      OptimizeIn(conf),
         ConstantFolding,
@@ -113,7 +115,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
                                             extendedOperatorOptimizationRules: 
_*) ::
     Batch(
       "Check Cartesian Products", Once,
-      CheckCartesianProducts(conf)) ::
+      SparkSQLUtil.getCheckCartesianProductsObj(conf)) ::
     //    Batch("Join Reorder", Once,
     //      CostBasedJoinReorder(conf)) ::
     //    Batch("Decimal Optimizations", fixedPoint,
@@ -126,8 +128,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
     //      ConvertToLocalRelation,
     //      PropagateEmptyRelation) ::
     Batch(
-      "OptimizeCodegen", Once,
-      OptimizeCodegen(conf)) ::
+      "OptimizeCodegen", Once, 
CarbonToSparkAdapater.getOptimizeCodegenRule(conf): _*) ::
     Batch(
       "RewriteSubquery", Once,
       RewritePredicateSubquery,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
index 2aff5c0..eaa8b04 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
@@ -30,7 +30,8 @@ object CheckSPJG {
       case a: Aggregate =>
         a.child.collect {
           case Join(_, _, _, _) | Project(_, _) | Filter(_, _) |
-               HiveTableRelation(_, _, _) | LogicalRelation(_, _, _) | 
LocalRelation(_, _) => true
+               HiveTableRelation(_, _, _) => true
+          case  l: LogicalRelation => true
           case _ => false
         }.forall(identity)
       case _ => false
@@ -55,7 +56,7 @@ object LogicalPlanRule extends SignatureRule[LogicalPlan] {
   def apply(plan: LogicalPlan, childSignatures: Seq[Option[Signature]]): 
Option[Signature] = {
 
     plan match {
-      case LogicalRelation(_, _, _) =>
+      case l: LogicalRelation =>
         // TODO: implement this (link to BaseRelation)
         None
       case HiveTableRelation(tableMeta, _, _) =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
index 0c68a1f..9d4735c 100644
--- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
@@ -17,13 +17,11 @@
 
 package org.apache.carbondata.mv.testutil
 
-import java.io.File
-
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.test.util.{PlanTest, QueryTest}
+import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.mv.plans.modular
 import org.apache.carbondata.mv.plans.modular.{ModularPlan, OneRowTable, 
Select}
@@ -56,30 +54,6 @@ abstract class ModularPlanTest extends QueryTest with 
PredicateHelper {
   }
 
   /**
-   * Normalizes plans:
-   * - Filter the filter conditions that appear in a plan. For instance,
-   * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && 
(expr 1 && expr 2)
-   *   etc., will all now be equivalent.
-   * - Sample the seed will replaced by 0L.
-   * - Join conditions will be resorted by hashCode.
-   */
-  protected def normalizePlan(plan: LogicalPlan): LogicalPlan = {
-    plan transform {
-      case filter@Filter(condition: Expression, child: LogicalPlan) =>
-        Filter(
-          
splitConjunctivePredicates(condition).map(rewriteEqual(_)).sortBy(_.hashCode())
-            .reduce(And), child)
-      case sample: Sample =>
-        sample.copy(seed = 0L)(true)
-      case join@Join(left, right, joinType, condition) if condition.isDefined 
=>
-        val newCondition =
-          
splitConjunctivePredicates(condition.get).map(rewriteEqual(_)).sortBy(_.hashCode())
-            .reduce(And)
-        Join(left, right, joinType, Some(newCondition))
-    }
-  }
-
-  /**
    * Rewrite [[EqualTo]] and [[EqualNullSafe]] operator to keep order. The 
following cases will be
    * equivalent:
    * 1. (a = b), (b = a);
@@ -177,4 +151,12 @@ abstract class ModularPlanTest extends QueryTest with 
PredicateHelper {
            """.stripMargin)
     }
   }
+
+  object MatchLocalRelation {
+    def unapply(localRelation: LocalRelation): Option[(Seq[Attribute], Any)] = 
localRelation match {
+      case l: LocalRelation => Some(l.output, l.data)
+      case _ => None
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala
index e1a3d9f..34ee2ed 100644
--- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala
+++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala
@@ -39,7 +39,7 @@ class ExtractJoinConditionsSuite extends ModularPlanTest {
     val extracted = 
modularPlan.extractJoinConditions(modularPlan.children(0),modularPlan.children(1))
     
     val correctAnswer = originalQuery match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),Inner,Some(cond2))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),Inner,Some(cond2))
 =>
         Seq(cond2)
     }
     
@@ -58,7 +58,7 @@ class ExtractJoinConditionsSuite extends ModularPlanTest {
       left.join(right,condition = Some("l.c".attr === "r.c".attr)).analyze
       
     val correctAnswer = originalQuery1 match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.Filter(cond2,logical.LocalRelation(tbl2,_)),Inner,Some(cond3))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),logical.Filter(cond2,MatchLocalRelation(tbl2,_)),Inner,Some(cond3))
 =>
         Seq(cond3)
     }    
     

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala
index 082c325..144721c 100644
--- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala
+++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala
@@ -43,7 +43,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
         .analyze
     val modularized = originalQuery.modularize 
     val correctAnswer = originalQuery match {
-      case logical.Project(proj,logical.LocalRelation(tbl,_)) =>
+      case logical.Project(proj,MatchLocalRelation(tbl,_)) =>
         
ModularRelation(null,null,tbl,NoFlags,Seq.empty).select(proj:_*)(tbl:_*)()(Map.empty)()
     }
     comparePlans(modularized, correctAnswer)
@@ -58,7 +58,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
         
     val modularized = originalQuery.modularize
     val correctAnswer = originalQuery match {
-      case 
logical.Project(proj1,logical.Aggregate(grp,agg,logical.Project(proj2,logical.LocalRelation(tbl,_))))
 =>
+      case 
logical.Project(proj1,logical.Aggregate(grp,agg,logical.Project(proj2,MatchLocalRelation(tbl,_))))
 =>
         
ModularRelation(null,null,tbl,NoFlags,Seq.empty).select(proj2:_*)(tbl:_*)()(Map.empty)().groupBy(agg:_*)(proj2:_*)(grp:_*).select(proj1:_*)(proj1:_*)()(Map.empty)()
 
     }   
     comparePlans(modularized, correctAnswer)
@@ -73,7 +73,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
         
     val modularized = originalQuery.modularize
     val correctAnswer = originalQuery match {
-      case 
logical.Project(proj,logical.Filter(cond,logical.LocalRelation(tbl,_))) =>
+      case 
logical.Project(proj,logical.Filter(cond,MatchLocalRelation(tbl,_))) =>
         
ModularRelation(null,null,tbl,NoFlags,Seq.empty).select(proj:_*)(tbl:_*)(cond)(Map.empty)()
  
     }
     comparePlans(modularized, correctAnswer)
@@ -87,7 +87,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
       
     val modularized = originalQuery.modularize 
     val correctAnswer = originalQuery match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),Inner,Some(cond2))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),Inner,Some(cond2))
 =>
         
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,Inner))
 
     } 
     comparePlans(modularized, correctAnswer)
@@ -101,7 +101,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
       
     val modularized = originalQuery.modularize 
     val correctAnswer = originalQuery match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),LeftOuter,Some(cond2))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),LeftOuter,Some(cond2))
 =>
         
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,LeftOuter))
 
     } 
     comparePlans(modularized, correctAnswer)
@@ -115,7 +115,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
       
     val modularized = originalQuery.modularize 
     val correctAnswer = originalQuery match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),RightOuter,Some(cond2))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),RightOuter,Some(cond2))
 =>
         
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,RightOuter))
  
     } 
     comparePlans(modularized, correctAnswer)
@@ -129,7 +129,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
     
     val modularized = 
analysis.EliminateSubqueryAliases(originalQuery).modularize
     val correctAnswer = originalQuery match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),Inner,Some(cond2))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),Inner,Some(cond2))
 =>
         
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,Inner))
  
     } 
     comparePlans(modularized, correctAnswer)
@@ -147,7 +147,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
         
     val modularized = originalQuery.modularize
     val correctAnswer = originalQuery match {
-      case 
logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.Join(logical.Filter(cond2,logical.LocalRelation(tbl2,_)),logical.LocalRelation(tbl3,_),Inner,Some(cond3)),Inner,Some(cond4))
 =>
+      case 
logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),logical.Join(logical.Filter(cond2,MatchLocalRelation(tbl2,_)),MatchLocalRelation(tbl3,_),Inner,Some(cond3)),Inner,Some(cond4))
 =>
         
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty),ModularRelation(null,null,tbl3,NoFlags,Seq.empty)).select(tbl1++tbl2++tbl3:_*)(tbl1++tbl2++tbl3:_*)(Seq(cond1,cond2,cond3,cond4):_*)(Map.empty)(JoinEdge(0,1,Inner),JoinEdge(1,2,Inner))
  
     } 
     comparePlans(modularized, correctAnswer)
@@ -165,7 +165,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest {
         
     val modularized = originalQuery.modularize
     val correctAnswer = originalQuery match {
-      case logical.Project(proj0, logical.Filter(cond1, logical.Project(proj1, 
logical.Aggregate(grp,agg,logical.Join(logical.Filter(cond2,logical.LocalRelation(tbl1,_)),logical.Filter(cond3,logical.LocalRelation(tbl2,_)),Inner,Some(cond4))))))
 =>
+      case logical.Project(proj0, logical.Filter(cond1, logical.Project(proj1, 
logical.Aggregate(grp,agg,logical.Join(logical.Filter(cond2,MatchLocalRelation(tbl1,_)),logical.Filter(cond3,MatchLocalRelation(tbl2,_)),Inner,Some(cond4))))))
 =>
         
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond2,cond3,cond4):_*)(Map.empty)(JoinEdge(0,1,Inner)).groupBy(agg:_*)(tbl1++tbl2:_*)(grp:_*).select(proj0:_*)(proj1:_*)(cond1)(Map.empty)()
  
     }   
     comparePlans(modularized, correctAnswer)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/examples/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml
index 9d1adde..91cb20f 100644
--- a/examples/spark2/pom.xml
+++ b/examples/spark2/pom.xml
@@ -77,6 +77,26 @@
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- in spark 2.3 spark catalyst added dependency on spark-core-->
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <!-- need to Exclude net.jpountz jar from this project.
+         Spark has changed this jar to org.lz4:lz4-java
+         net.jpountz and org.lz4 has same class Name -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml
index ee87d92..50e5830 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -127,12 +127,27 @@
       <artifactId>carbondata-lucene</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <!-- need to Exclude net.jpountz jar from this project.
+         Spark has changed this jar to org.lz4:lz4-java
+         net.jpountz and org.lz4 has same class Name -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-bloom</artifactId>
       <version>${project.version}</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
@@ -146,6 +161,23 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <!-- spark catalyst added runtime dependency on spark-core,so
+      while executing the testcases spark-core should be present else it
+      will fail to execute -->
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <scope>test</scope>
+      <exclusions>
+        <!-- need to Exclude Avro jar from this project,spark core is using
+        the version 1.7.4 which is not compatible with Carbon -->
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 0dba467..1f7aafe 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -73,7 +73,6 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll 
{
     checkAnswer(sql("select min(salary) from carbonTable"),
       sql("select min(salary) from hiveTable"))
   }
-  
   test("test min datatype on big decimal column") {
     val output = sql("select min(salary) from 
carbonTable").collectAsList().get(0).get(0)
     assert(output.isInstanceOf[java.math.BigDecimal])
@@ -83,7 +82,6 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll 
{
     val output = sql("select max(salary) from 
carbonTable").collectAsList().get(0).get(0)
     assert(output.isInstanceOf[java.math.BigDecimal])
   }
-  
   test("test count function on big decimal column") {
     checkAnswer(sql("select count(salary) from carbonTable"),
       sql("select count(salary) from hiveTable"))
@@ -152,8 +150,8 @@ class TestBigDecimal extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test sum*10 aggregation on big decimal column with high precision") {
-    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
-      sql("select sum(salary)*10 from hiveBigDecimal"))
+    checkAnswer(sql("select cast(sum(salary)*10 as double) from 
carbonBigDecimal_2"),
+      sql("select cast(sum(salary)*10 as double) from hiveBigDecimal"))
   }
 
   test("test sum/10 aggregation on big decimal column with high precision") {
@@ -167,8 +165,8 @@ class TestBigDecimal extends QueryTest with 
BeforeAndAfterAll {
   }
 
   test("test sum-distinct*10 aggregation on big decimal column with high 
precision") {
-    checkAnswer(sql("select sum(distinct(salary))*10 from carbonBigDecimal_2"),
-      sql("select sum(distinct(salary))*10 from hiveBigDecimal"))
+    checkAnswer(sql("select cast(sum(distinct(salary))*10 as decimal)from 
carbonBigDecimal_2"),
+      sql("select cast(sum(distinct(salary))*10 as decimal) from 
hiveBigDecimal"))
   }
 
   test("test sum-distinct/10 aggregation on big decimal column with high 
precision") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
index 15ac1f4..73786c8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.spark.testsuite.filterexpr
 
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -56,13 +57,15 @@ class AllDataTypesTestCaseFilter extends QueryTest with 
BeforeAndAfterAll {
   test("verify like query ends with filter push down") {
     val df = sql("select * from alldatatypestableFilter where empname like 
'%nandh'").queryExecution
       .sparkPlan
-    assert(df.metadata.get("PushedFilters").get.contains("CarbonEndsWith"))
+    assert(df.asInstanceOf[CarbonDataSourceScan].metadata
+      .get("PushedFilters").get.contains("CarbonEndsWith"))
   }
 
   test("verify like query contains with filter push down") {
     val df = sql("select * from alldatatypestableFilter where empname like 
'%nand%'").queryExecution
       .sparkPlan
-    assert(df.metadata.get("PushedFilters").get.contains("CarbonContainsWith"))
+    assert(df.asInstanceOf[CarbonDataSourceScan].metadata
+      .get("PushedFilters").get.contains("CarbonContainsWith"))
   }
   
   override def afterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index 150bc36..564c5f3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.{Callable, ExecutorService, 
Executors}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.constants.LoggerAction
@@ -859,11 +860,22 @@ class StandardPartitionGlobalSortTestCase extends 
QueryTest with BeforeAndAfterA
     sql("create table scalarissue_hive(a int,salary double) using parquet 
partitioned by (salary) ")
     sql("set hive.exec.dynamic.partition.mode=nonstrict")
     sql("insert into scalarissue_hive values(23,21.2)")
-    intercept[Exception] {
-      sql(s"select * from scalarissue_hive where salary = (select max(salary) 
from scalarissue_hive)").show()
-    }
-    intercept[Exception] {
-      sql(s"select * from scalarissue where salary = (select max(salary) from 
scalarissue)").show()
+    if (SparkUtil.isSparkVersionEqualTo("2.1") || 
SparkUtil.isSparkVersionEqualTo("2.2")) {
+      intercept[Exception] {
+        sql(s"select * from scalarissue_hive where salary = (select 
max(salary) from " +
+            s"scalarissue_hive)")
+          .show()
+      }
+      intercept[Exception] {
+        sql(s"select * from scalarissue where salary = (select max(salary) 
from scalarissue)")
+          .show()
+      }
+    } else {
+      checkAnswer(sql(s"select * from scalarissue_hive where salary = (select 
max(salary) from " +
+                      s"scalarissue_hive)"), Seq(Row(23, 21.2)))
+      checkAnswer(sql(s"select * from scalarissue where salary = (select 
max(salary) from " +
+                      s"scalarissue)"),
+        Seq(Row(23, 21.2)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
index 2029e93..6400ed1 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala
@@ -87,7 +87,8 @@ class StoredAsCarbondataSuite extends QueryTest with 
BeforeAndAfterEach {
       sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS  ")
     } catch {
       case e: Exception =>
-        assert(e.getMessage.contains("no viable alternative at input"))
+        assert(e.getMessage.contains("no viable alternative at input") ||
+        e.getMessage.contains("mismatched input '<EOF>' expecting "))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index b712109..01c77f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -421,7 +421,7 @@ class CarbonScanRDD[T: ClassTag](
           // create record reader for row format
           DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance())
           val inputFormat = new CarbonStreamInputFormat
-          inputFormat.setVectorReader(vectorReader)
+          inputFormat.setIsVectorReader(vectorReader)
           inputFormat.setInputMetricsStats(inputMetricsStats)
           model.setStatisticsRecorder(
             
CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 3cbfab2..ab6e320 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -49,13 +49,8 @@ import 
org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, 
CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl}
-<<<<<<< 2f537b724f6f03ab40c95f7ecc8ebd38f6500099
 import org.apache.carbondata.spark.util.CommonUtil
-import org.apache.carbondata.streaming.{CarbonStreamInputFormat, 
CarbonStreamRecordReader}
-=======
-import org.apache.carbondata.spark.util.{CommonUtil, 
SparkDataTypeConverterImpl}
 import org.apache.carbondata.streaming.CarbonStreamInputFormat
->>>>>>> [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, 
ColumnVector Interface
 
 
 /**
@@ -167,11 +162,11 @@ class StreamHandoffRDD[K, V](
     val format = new CarbonTableInputFormat[Array[Object]]()
     val model = format.createQueryModel(inputSplit, attemptContext)
     val inputFormat = new CarbonStreamInputFormat
-    val streamReader = inputFormat.createRecordReader(inputSplit, 
attemptContext)
-      .asInstanceOf[RecordReader[Void, Any]]
-    inputFormat.setVectorReader(false)
+    inputFormat.setIsVectorReader(false)
     inputFormat.setModel(model)
     inputFormat.setUseRawRow(true)
+    val streamReader = inputFormat.createRecordReader(inputSplit, 
attemptContext)
+      .asInstanceOf[RecordReader[Void, Any]]
     streamReader.initialize(inputSplit, attemptContext)
     val iteratorList = new util.ArrayList[RawResultIterator](1)
     iteratorList.add(new StreamingRawResultIterator(streamReader))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 196baa6..5762906 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -121,7 +121,7 @@ class CarbonAppendableStreamSink(
         className = 
sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
         jobId = batchId.toString,
         outputPath = fileLogPath,
-        isAppend = false)
+        false)
 
       committer match {
         case manifestCommitter: ManifestFileCommitProtocol =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
index 9883607..00156e9 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala
@@ -26,11 +26,6 @@ import org.apache.spark.sql.catalyst.util._
  */
 class PlanTest extends CarbonFunSuite {
 
-  /** Fails the test if the two expressions do not match */
-  protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
-    comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
-  }
-
   /** Fails the test if the two plans do not match */
   protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
     val normalized1 = normalizeExprIds(plan1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
index b5fda85..b7d47a0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql.util
 
+import java.lang.reflect.Method
+
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.internal.SessionState
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 object SparkSQLUtil {
   def sessionState(sparkSession: SparkSession): SessionState = 
sparkSession.sessionState
@@ -31,4 +35,68 @@ object SparkSQLUtil {
   def getSparkSession: SparkSession = {
     SparkSession.getDefaultSession.get
   }
+
+  def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): 
Statistics = {
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val method: Method = logicalPlanObj.getClass.getMethod("stats", 
classOf[SQLConf])
+      method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val method: Method = logicalPlanObj.getClass.getMethod("stats")
+      method.invoke(logicalPlanObj).asInstanceOf[Statistics]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin";
+      CarbonReflectionUtils.createObject(className, 
conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = 
"org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin";
+      CarbonReflectionUtils.createObject(className, 
conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = 
"org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = 
"org.apache.spark.sql.catalyst.optimizer.NullPropagation";
+      CarbonReflectionUtils.createObject(className, 
conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = 
"org.apache.spark.sql.catalyst.optimizer.NullPropagation$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = {
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val className = 
"org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts";
+      CarbonReflectionUtils.createObject(className, 
conf)._1.asInstanceOf[Rule[LogicalPlan]]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      val className = 
"org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$";
+      CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1
+        .asInstanceOf[Rule[LogicalPlan]]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 34eeded..32cd201 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -17,24 +17,25 @@
 
 package org.apache.spark.util
 
-import java.lang.reflect.Field
+import java.lang.reflect.Method
 
 import scala.reflect.runtime._
 import scala.reflect.runtime.universe._
 
-import org.apache.spark.SPARK_VERSION
-import org.apache.spark.SparkContext
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.{SPARK_VERSION, SparkContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.parser.AstBuilder
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan, SubqueryAlias}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 /**
@@ -61,12 +62,12 @@ object CarbonReflectionUtils {
       tableIdentifier: TableIdentifier,
       tableAlias: Option[String] = None): UnresolvedRelation = {
     val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       createObject(
         className,
         tableIdentifier,
         tableAlias)._1.asInstanceOf[UnresolvedRelation]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       createObject(
         className,
         tableIdentifier)._1.asInstanceOf[UnresolvedRelation]
@@ -79,13 +80,13 @@ object CarbonReflectionUtils {
       relation: LogicalPlan,
       view: Option[TableIdentifier]): SubqueryAlias = {
     val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       createObject(
         className,
         alias.getOrElse(""),
         relation,
         Option(view))._1.asInstanceOf[SubqueryAlias]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       createObject(
         className,
         alias.getOrElse(""),
@@ -101,7 +102,7 @@ object CarbonReflectionUtils {
       overwrite: Boolean,
       ifPartitionNotExists: Boolean): InsertIntoTable = {
     val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       val overwriteOptions = createObject(
         "org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions",
       overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object]
@@ -112,7 +113,7 @@ object CarbonReflectionUtils {
         query,
         overwriteOptions,
         ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       createObject(
         className,
         table,
@@ -127,20 +128,28 @@ object CarbonReflectionUtils {
 
   def getLogicalRelation(relation: BaseRelation,
       expectedOutputAttributes: Seq[Attribute],
-      catalogTable: Option[CatalogTable]): LogicalRelation = {
+      catalogTable: Option[CatalogTable],
+      isStreaming: Boolean): LogicalRelation = {
     val className = "org.apache.spark.sql.execution.datasources.LogicalRelation"
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       createObject(
         className,
         relation,
         Some(expectedOutputAttributes),
         catalogTable)._1.asInstanceOf[LogicalRelation]
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionEqualTo("2.2")) {
       createObject(
         className,
         relation,
         expectedOutputAttributes,
         catalogTable)._1.asInstanceOf[LogicalRelation]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+      createObject(
+        className,
+        relation,
+        expectedOutputAttributes,
+        catalogTable,
+        isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation]
     } else {
       throw new UnsupportedOperationException("Unsupported Spark version")
     }
@@ -183,27 +192,23 @@ object CarbonReflectionUtils {
   def getAstBuilder(conf: Object,
       sqlParser: Object,
       sparkSession: SparkSession): AstBuilder = {
-    if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) {
-      val className = sparkSession.sparkContext.conf.get(
-        CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
-        "org.apache.spark.sql.hive.CarbonSqlAstBuilder")
-      createObject(className,
-        conf,
-        sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
-    } else {
-      throw new UnsupportedOperationException("Spark version not supported")
-    }
+    val className = sparkSession.sparkContext.conf.get(
+      CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME,
+      "org.apache.spark.sql.hive.CarbonSqlAstBuilder")
+    createObject(className,
+      conf,
+      sqlParser, sparkSession)._1.asInstanceOf[AstBuilder]
   }
 
   def getSessionState(sparkContext: SparkContext,
       carbonSession: Object,
       useHiveMetaStore: Boolean): Any = {
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       val className = sparkContext.conf.get(
         CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
         "org.apache.spark.sql.hive.CarbonSessionState")
       createObject(className, carbonSession)._1
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       if (useHiveMetaStore) {
         val className = sparkContext.conf.get(
           CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME,
@@ -225,12 +230,12 @@ object CarbonReflectionUtils {
   }
 
   def hasPredicateSubquery(filterExp: Expression) : Boolean = {
-    if (SPARK_VERSION.startsWith("2.1")) {
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
       val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery")
       val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression])
       val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
       hasSubquery
-    } else if (SPARK_VERSION.startsWith("2.2")) {
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
       val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression")
       val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression])
       val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean]
@@ -248,6 +253,54 @@ object CarbonReflectionUtils {
     isFormatted
   }
 
+  def getRowDataSourceScanExecObj(relation: LogicalRelation,
+      output: Seq[Attribute],
+      pushedFilters: Seq[Filter],
+      handledFilters: Seq[Filter],
+      rdd: RDD[InternalRow],
+      partition: Partitioning,
+      metadata: Map[String, String]): RowDataSourceScanExec = {
+    val className = "org.apache.spark.sql.execution.RowDataSourceScanExec"
+    if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) {
+      createObject(className, output, rdd, relation.relation,
+        partition, metadata,
+        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+    } else if (SparkUtil.isSparkVersionXandAbove("2.3")) {
+      createObject(className, output, output.map(output.indexOf),
+        pushedFilters.toSet, handledFilters.toSet, rdd,
+        relation.relation,
+        relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
+  def invokewriteAndReadMethod(dataSourceObj: DataSource,
+      dataFrame: DataFrame,
+      data: LogicalPlan,
+      session: SparkSession,
+      mode: SaveMode,
+      query: LogicalPlan,
+      physicalPlan: SparkPlan): BaseRelation = {
+    if (SparkUtil.isSparkVersionEqualTo("2.2")) {
+      val method: Method = dataSourceObj.getClass
+        .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame])
+      method.invoke(dataSourceObj, mode, dataFrame)
+        .asInstanceOf[BaseRelation]
+    } else if (SparkUtil.isSparkVersionEqualTo("2.3")) {
+      val method: Method = dataSourceObj.getClass
+        .getMethod("writeAndRead",
+          classOf[SaveMode],
+          classOf[LogicalPlan],
+          classOf[Seq[Attribute]],
+          classOf[SparkPlan])
+      method.invoke(dataSourceObj, mode, query, query.output, physicalPlan)
+        .asInstanceOf[BaseRelation]
+    } else {
+      throw new UnsupportedOperationException("Spark version not supported")
+    }
+  }
+
   def createObject(className: String, conArgs: Object*): (Any, Class[_]) = {
     val clazz = Utils.classForName(className)
     val ctor = clazz.getConstructors.head
@@ -255,4 +308,10 @@ object CarbonReflectionUtils {
     (ctor.newInstance(conArgs: _*), clazz)
   }
 
+  def createObjectOfPrivateConstructor(className: String, conArgs: Object*): (Any, Class[_]) = {
+    val clazz = Utils.classForName(className)
+    val ctor = clazz.getDeclaredConstructors.head
+    ctor.setAccessible(true)
+    (ctor.newInstance(conArgs: _*), clazz)
+  }
 }
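
The invokewriteAndReadMethod change above applies the same idea to methods instead of constructors: per this diff, DataSource.writeAndRead takes (SaveMode, DataFrame) on Spark 2.2 but (SaveMode, LogicalPlan, Seq[Attribute], SparkPlan) on 2.3, so the right overload is looked up by signature at runtime. A stripped-down sketch of that dispatch, against a hypothetical FakeDataSource rather than Spark's DataSource, could look like this.

  import java.lang.reflect.Method

  // Hypothetical target with both method shapes; a real Spark DataSource
  // exposes only one of them, depending on the Spark version.
  class FakeDataSource {
    def writeAndRead(mode: String, data: Seq[Int]): String =
      s"2.2-style writeAndRead: $mode, ${data.size} rows"
    def writeAndRead(mode: String, plan: String, output: Seq[String], physical: String): String =
      s"2.3-style writeAndRead: $mode, plan=$plan"
  }

  object WriteAndReadDispatchSketch {

    // sparkVersion is a parameter here only to keep the sketch self-contained;
    // the real helper consults SparkUtil instead.
    def invokeWriteAndRead(ds: FakeDataSource, mode: String, sparkVersion: String): String = {
      if (sparkVersion.startsWith("2.2")) {
        val m: Method = ds.getClass.getMethod("writeAndRead", classOf[String], classOf[Seq[Int]])
        m.invoke(ds, mode, Seq(1, 2, 3)).asInstanceOf[String]
      } else {
        val m: Method = ds.getClass.getMethod(
          "writeAndRead", classOf[String], classOf[String], classOf[Seq[String]], classOf[String])
        m.invoke(ds, mode, "logicalPlan", Seq("col"), "physicalPlan").asInstanceOf[String]
      }
    }

    def main(args: Array[String]): Unit = {
      val ds = new FakeDataSource
      println(invokeWriteAndRead(ds, "Overwrite", "2.3.1"))
    }
  }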

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
deleted file mode 100644
index 4635fc7..0000000
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.spark.{SparkContext, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD}
-
-import org.apache.carbondata.processing.loading.csvinput.BlockDetails
-
-/*
- * this object use to handle file splits
- */
-object SparkUtil {
-
-  def setTaskContext(context: TaskContext): Unit = {
-    val localThreadContext = TaskContext.get()
-    if (localThreadContext == null) {
-      TaskContext.setTaskContext(context)
-    }
-  }
-
-}
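
The file removed above only handled TaskContext propagation; a SparkUtil with the version checks used throughout this patch (isSparkVersionEqualTo, isSparkVersionXandAbove) is introduced elsewhere in the change set and is exercised by the new test below. Its implementation is not shown in this hunk, so the following is only a sketch consistent with the test's expectations (it assumes spark-core on the classpath for SPARK_VERSION, and the comparison logic is an assumption, not the committed code).

  import org.apache.spark.SPARK_VERSION

  // Hedged sketch only: method names follow SparkUtilTest below, the bodies
  // are an assumption rather than the committed implementation.
  object SparkUtilSketch {

    // "Equal to" means "same major.minor line", e.g. 2.3.0 and 2.3.1 both match "2.3".
    def isSparkVersionEqualTo(targetVersion: String): Boolean =
      SPARK_VERSION.startsWith(targetVersion)

    // True when the running major.minor is at least the target, compared numerically.
    def isSparkVersionXandAbove(targetVersion: String): Boolean = {
      def majorMinor(v: String): (Int, Int) = {
        val parts = v.split("\\.")
        (parts(0).toInt, parts(1).toInt)
      }
      val (curMajor, curMinor) = majorMinor(SPARK_VERSION)
      val (tgtMajor, tgtMinor) = majorMinor(targetVersion)
      curMajor > tgtMajor || (curMajor == tgtMajor && curMinor >= tgtMinor)
    }
  }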

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala b/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala
new file mode 100644
index 0000000..4810db1
--- /dev/null
+++ b/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.SPARK_VERSION
+import org.scalatest.FunSuite
+
+class SparkUtilTest extends FunSuite{
+
+  test("Test Spark Version API with X and Above") {
+    if (SPARK_VERSION.startsWith("2.1")) {
+      assert(SparkUtil.isSparkVersionXandAbove("2.1"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.2"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.3"))
+    } else if (SPARK_VERSION.startsWith("2.2")) {
+      assert(SparkUtil.isSparkVersionXandAbove("2.1"))
+      assert(SparkUtil.isSparkVersionXandAbove("2.2"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.3"))
+    } else {
+      assert(SparkUtil.isSparkVersionXandAbove("2.1"))
+      assert(SparkUtil.isSparkVersionXandAbove("2.2"))
+      assert(SparkUtil.isSparkVersionXandAbove("2.3"))
+      assert(!SparkUtil.isSparkVersionXandAbove("2.4"))
+    }
+  }
+
+  test("Test Spark Version API Equal to X") {
+    if (SPARK_VERSION.startsWith("2.1")) {
+      assert(SparkUtil.isSparkVersionEqualTo("2.1"))
+      assert(!SparkUtil.isSparkVersionEqualTo("2.2"))
+      assert(!SparkUtil.isSparkVersionEqualTo("2.3"))
+    } else if (SPARK_VERSION.startsWith("2.2")) {
+      assert(!SparkUtil.isSparkVersionEqualTo("2.1"))
+      assert(SparkUtil.isSparkVersionEqualTo("2.2"))
+      assert(!SparkUtil.isSparkVersionEqualTo("2.3"))
+    } else {
+      assert(!SparkUtil.isSparkVersionEqualTo("2.1"))
+      assert(!SparkUtil.isSparkVersionEqualTo("2.2"))
+      assert(SparkUtil.isSparkVersionEqualTo("2.3"))
+      assert(!SparkUtil.isSparkVersionEqualTo("2.4"))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
index 38cf629..3d017c8 100644
--- a/integration/spark-datasource/pom.xml
+++ b/integration/spark-datasource/pom.xml
@@ -39,6 +39,15 @@
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-hadoop</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <!-- Exclude the net.jpountz lz4 jar from this project.
+         Spark has moved this dependency to org.lz4:lz4-java, and
+         net.jpountz and org.lz4 contain the same class names. -->
+        <exclusion>
+          <groupId>net.jpountz.lz4</groupId>
+          <artifactId>lz4</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
@@ -192,5 +201,128 @@
         <maven.test.skip>true</maven.test.skip>
       </properties>
     </profile>
+    <profile>
+      <id>spark-2.1</id>
+      <properties>
+        <spark.version>2.1.0</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3plus</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.1andspark2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.2</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <properties>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.3plus</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.1andspark2.2</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-2.3</id>
+      <properties>
+        <spark.version>2.3.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>src/main/spark2.1andspark2.2</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <version>3.0.0</version>
+            <executions>
+              <execution>
+                <id>add-source</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>src/main/spark2.3plus</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>
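
Only the spark-2.2 profile above is active by default; spark-2.1 and spark-2.3 each add their version-specific source folder (src/main/spark2.1andspark2.2 or src/main/spark2.3plus) through build-helper-maven-plugin and exclude the other from compilation. Building against Spark 2.3 therefore means selecting the profile explicitly, for example with mvn clean install -Pspark-2.3 (the exact goals and additional flags depend on the local build setup).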

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 7bab117..bc5a387 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.Decimal;
 
 class ColumnarVectorWrapper implements CarbonColumnVector {
 
-  private CarbonVectorProxy writableColumnVector;
+  private CarbonVectorProxy sparkColumnVectorProxy;
 
   private boolean[] filteredRows;
 
@@ -38,32 +38,36 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   private int ordinal;
 
+  private boolean isDictionary;
+
   private boolean filteredRowsExist;
 
   private DataType blockDataType;
 
+  private CarbonColumnVector dictionaryVector;
+
   ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector,
-                        boolean[] filteredRows, int ordinal) {
-    this.writableColumnVector = writableColumnVector;
+      boolean[] filteredRows, int ordinal) {
+    this.sparkColumnVectorProxy = writableColumnVector;
     this.filteredRows = filteredRows;
     this.ordinal = ordinal;
   }
 
   @Override public void putBoolean(int rowId, boolean value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putBoolean(counter++, value, ordinal);
+      sparkColumnVectorProxy.putBoolean(counter++, value, ordinal);
     }
   }
 
   @Override public void putFloat(int rowId, float value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putFloat(counter++, value,ordinal);
+      sparkColumnVectorProxy.putFloat(counter++, value,ordinal);
     }
   }
 
   @Override public void putShort(int rowId, short value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putShort(counter++, value, ordinal);
+      sparkColumnVectorProxy.putShort(counter++, value, ordinal);
     }
   }
 
@@ -71,18 +75,22 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putShort(counter++, value, ordinal);
+          sparkColumnVectorProxy.putShort(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putShorts(rowId, count, value, ordinal);
+      sparkColumnVectorProxy.putShorts(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putInt(int rowId, int value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putInt(counter++, value, ordinal);
+      if (isDictionary) {
+        sparkColumnVectorProxy.putDictionaryInt(counter++, value, ordinal);
+      } else {
+        sparkColumnVectorProxy.putInt(counter++, value, ordinal);
+      }
     }
   }
 
@@ -90,18 +98,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putInt(counter++, value, ordinal);
+          sparkColumnVectorProxy.putInt(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putInts(rowId, count, value, ordinal);
+      sparkColumnVectorProxy.putInts(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putLong(int rowId, long value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putLong(counter++, value, ordinal);
+      sparkColumnVectorProxy.putLong(counter++, value, ordinal);
     }
   }
 
@@ -109,19 +117,19 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putLong(counter++, value, ordinal);
+          sparkColumnVectorProxy.putLong(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putLongs(rowId, count, value, ordinal);
+      sparkColumnVectorProxy.putLongs(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
     if (!filteredRows[rowId]) {
       Decimal toDecimal = Decimal.apply(value);
-      writableColumnVector.putDecimal(counter++, toDecimal, precision, ordinal);
+      sparkColumnVectorProxy.putDecimal(counter++, toDecimal, precision, ordinal);
     }
   }
 
@@ -129,7 +137,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     Decimal decimal = Decimal.apply(value);
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        writableColumnVector.putDecimal(counter++, decimal, precision, ordinal);
+        sparkColumnVectorProxy.putDecimal(counter++, decimal, precision, ordinal);
       }
       rowId++;
     }
@@ -137,7 +145,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putDouble(int rowId, double value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putDouble(counter++, value, ordinal);
+      sparkColumnVectorProxy.putDouble(counter++, value, ordinal);
     }
   }
 
@@ -145,25 +153,25 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putDouble(counter++, value, ordinal);
+          sparkColumnVectorProxy.putDouble(counter++, value, ordinal);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putDoubles(rowId, count, value, ordinal);
+      sparkColumnVectorProxy.putDoubles(rowId, count, value, ordinal);
     }
   }
 
   @Override public void putBytes(int rowId, byte[] value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putByteArray(counter++, value, ordinal);
+      sparkColumnVectorProxy.putByteArray(counter++, value, ordinal);
     }
   }
 
   @Override public void putBytes(int rowId, int count, byte[] value) {
     for (int i = 0; i < count; i++) {
       if (!filteredRows[rowId]) {
-        writableColumnVector.putByteArray(counter++, value, ordinal);
+        sparkColumnVectorProxy.putByteArray(counter++, value, ordinal);
       }
       rowId++;
     }
@@ -171,13 +179,13 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putByteArray(counter++, value, offset, length, ordinal);
+      sparkColumnVectorProxy.putByteArray(counter++, value, offset, length, ordinal);
     }
   }
 
   @Override public void putNull(int rowId) {
     if (!filteredRows[rowId]) {
-      writableColumnVector.putNull(counter++, ordinal);
+      sparkColumnVectorProxy.putNull(counter++, ordinal);
     }
   }
 
@@ -185,18 +193,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          writableColumnVector.putNull(counter++, ordinal);
+          sparkColumnVectorProxy.putNull(counter++, ordinal);
         }
         rowId++;
       }
     } else {
-      writableColumnVector.putNulls(rowId, count,ordinal);
+      sparkColumnVectorProxy.putNulls(rowId, count,ordinal);
     }
   }
 
   @Override public void putNotNull(int rowId) {
     if (!filteredRows[rowId]) {
-      columnVector.putNotNull(counter++);
+      sparkColumnVectorProxy.putNotNull(counter++,ordinal);
     }
   }
 
@@ -204,17 +212,17 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     if (filteredRowsExist) {
       for (int i = 0; i < count; i++) {
         if (!filteredRows[rowId]) {
-          columnVector.putNotNull(counter++);
+          sparkColumnVectorProxy.putNotNull(counter++, ordinal);
         }
         rowId++;
       }
     } else {
-      columnVector.putNotNulls(rowId, count);
+      sparkColumnVectorProxy.putNotNulls(rowId, count, ordinal);
     }
   }
 
   @Override public boolean isNull(int rowId) {
-    return writableColumnVector.isNullAt(rowId,ordinal);
+    return sparkColumnVectorProxy.isNullAt(rowId,ordinal);
   }
 
   @Override public void putObject(int rowId, Object obj) {
@@ -236,7 +244,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
 
   @Override public DataType getType() {
     return CarbonSparkDataSourceUtil
-        .convertSparkToCarbonDataType(writableColumnVector.dataType(ordinal));
+        .convertSparkToCarbonDataType(sparkColumnVectorProxy.dataType(ordinal));
   }
 
   @Override
@@ -249,20 +257,32 @@ class ColumnarVectorWrapper implements CarbonColumnVector {
     this.blockDataType = blockDataType;
   }
 
-  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+  @Override
+  public void setFilteredRowsExist(boolean filteredRowsExist) {
     this.filteredRowsExist = filteredRowsExist;
   }
 
   @Override public void setDictionary(CarbonDictionary dictionary) {
     if (dictionary == null) {
-      columnVector.setDictionary(null);
+      sparkColumnVectorProxy.setDictionary(null, ordinal);
     } else {
-      columnVector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
+      sparkColumnVectorProxy
+          .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary),ordinal);
     }
   }
 
+  private void setDictionaryType(boolean type) {
+    this.isDictionary = type;
+  }
+
   @Override public boolean hasDictionary() {
-    return columnVector.hasDictionary();
+    return sparkColumnVectorProxy.hasDictionary(ordinal);
+  }
+
+  public void reserveDictionaryIds() {
+    sparkColumnVectorProxy.reserveDictionaryIds(sparkColumnVectorProxy.numRows(), ordinal);
+    dictionaryVector = new ColumnarVectorWrapper(sparkColumnVectorProxy, filteredRows, ordinal);
+    ((ColumnarVectorWrapper) dictionaryVector).isDictionary = true;
   }
 
   @Override public CarbonColumnVector getDictionaryVector() {
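
The net effect of the wrapper changes above: every put call is forwarded to CarbonVectorProxy keyed by the column ordinal, and no-dictionary string columns get a sibling wrapper whose isDictionary flag redirects integer writes to dictionary ids. A simplified sketch of that shape, against a hypothetical VectorProxy trait rather than the real CarbonVectorProxy API, is shown below.

  // Hypothetical stand-in for CarbonVectorProxy; only the calls used in the
  // sketch are modelled, with the same "value, ordinal" argument order.
  trait VectorProxy {
    def putInt(rowId: Int, value: Int, ordinal: Int): Unit
    def putDictionaryInt(rowId: Int, value: Int, ordinal: Int): Unit
    def reserveDictionaryIds(capacity: Int, ordinal: Int): Unit
    def numRows(): Int
  }

  // Mirrors the shape of ColumnarVectorWrapper: one proxy, one ordinal, and a
  // dictionary flag that decides where integer values land.
  class VectorWrapperSketch(proxy: VectorProxy, filteredRows: Array[Boolean], ordinal: Int) {
    private var counter = 0
    private var isDictionary = false
    private var dictionaryVector: VectorWrapperSketch = _

    def putInt(rowId: Int, value: Int): Unit = {
      if (!filteredRows(rowId)) {
        if (isDictionary) {
          proxy.putDictionaryInt(counter, value, ordinal)   // dictionary id column
        } else {
          proxy.putInt(counter, value, ordinal)             // plain value column
        }
        counter += 1
      }
    }

    // Equivalent of reserveDictionaryIds() in the diff: allocate dictionary ids
    // on the proxy, then hand out a sibling wrapper flagged as dictionary.
    def reserveDictionaryIds(): Unit = {
      proxy.reserveDictionaryIds(proxy.numRows(), ordinal)
      dictionaryVector = new VectorWrapperSketch(proxy, filteredRows, ordinal)
      dictionaryVector.isDictionary = true
    }

    def getDictionaryVector: VectorWrapperSketch = dictionaryVector
  }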

http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 7c98608..d6b6b7e 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -51,16 +51,13 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.spark.memory.MemoryMode;
-<<<<<<< 2f537b724f6f03ab40c95f7ecc8ebd38f6500099:integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
-import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-=======
 import org.apache.spark.sql.CarbonVectorProxy;
->>>>>>> [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface:integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 /**
  * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -282,22 +279,23 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
         schema = schema.add(field);
       }
     }
-    columnarBatch = ColumnarBatch.allocate(schema, memMode);
+    vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE,schema,DEFAULT_BATCH_SIZE);
     if (partitionColumns != null) {
       int partitionIdx = fields.length;
       for (int i = 0; i < partitionColumns.fields().length; i++) {
-        ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
-        columnarBatch.column(i + partitionIdx).setIsConstant();
+        ColumnVectorUtils.populate(vectorProxy.column(i + partitionIdx), partitionValues, i);
+        vectorProxy.column(i + partitionIdx).setIsConstant();
       }
     }
-    vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP,DEFAULT_BATCH_SIZE,fields);
     CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
     boolean[] filteredRows = new boolean[vectorProxy.numRows()];
     for (int i = 0; i < fields.length; i++) {
-    if (isNoDictStringField[i]) {
-      vectorProxy.reserveDictionaryIds(vectorProxy.numRows(), i);
-      }
       vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i);
+      if (isNoDictStringField[i]) {
+        if (vectors[i] instanceof ColumnarVectorWrapper) {
+          ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds();
+        }
+      }
     }
     carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows);
   }
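
Two behavioural points in this hunk are worth calling out: the batch is now allocated up front through CarbonVectorProxy (replacing ColumnarBatch.allocate), so partition columns are populated via the proxy and the later standalone proxy construction is removed; and dictionary-id reservation has moved out of the reader loop into ColumnarVectorWrapper.reserveDictionaryIds(), so each no-dictionary string column reserves ids on its own wrapper (see the sketch after the ColumnarVectorWrapper diff above).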
