[CARBONDATA-2532][Integration] Carbon to support Spark 2.3.1 version (make API changes in Carbon to be compatible with Spark 2.3)
Fixed Spark-Carbon integration API compatibility issues. f) Corrected the test cases based on the Spark 2.3.0 behaviour change. g) Excluded the net.jpountz.lz4:lz4 dependency from the pom.xml files: Spark 2.3.0 switched to org.lz4:lz4-java, which provides the same class names, so the old jar is removed from the test classpath of spark2, spark-common-test and spark2-examples. This closes #2642 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/74c3eb10 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/74c3eb10 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/74c3eb10 Branch: refs/heads/master Commit: 74c3eb10b860845dacd3542ab72595617dc0df57 Parents: bcef656 Author: sandeep-katta <sandeep.katta2...@gmail.com> Authored: Thu Jul 5 21:31:29 2018 -0700 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Sep 5 18:09:16 2018 +0530 ---------------------------------------------------------------------- datamap/mv/core/pom.xml | 5 + .../apache/carbondata/mv/datamap/MVHelper.scala | 44 ++- datamap/mv/plan/pom.xml | 5 + .../mv/plans/modular/ModularPlan.scala | 3 +- .../mv/plans/modular/ModularRelation.scala | 5 +- .../mv/plans/util/BirdcageOptimizer.scala | 13 +- .../util/LogicalPlanSignatureGenerator.scala | 5 +- .../mv/testutil/ModularPlanTest.scala | 36 +-- .../mv/plans/ExtractJoinConditionsSuite.scala | 4 +- .../mv/plans/LogicalToModularPlanSuite.scala | 18 +- examples/spark2/pom.xml | 20 ++ integration/spark-common-test/pom.xml | 32 +++ .../testsuite/bigdecimal/TestBigDecimal.scala | 10 +- .../filterexpr/AllDataTypesTestCaseFilter.scala | 7 +- .../StandardPartitionGlobalSortTestCase.scala | 22 +- .../sql/commands/StoredAsCarbondataSuite.scala | 3 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 2 +- .../carbondata/spark/rdd/StreamHandoffRDD.scala | 11 +- .../streaming/CarbonAppendableStreamSink.scala | 2 +- .../apache/spark/sql/test/util/PlanTest.scala | 5 - .../apache/spark/sql/util/SparkSQLUtil.scala | 72 ++++- 
.../spark/util/CarbonReflectionUtils.scala | 121 +++++--- .../scala/org/apache/spark/util/SparkUtil.scala | 42 --- .../org/apache/spark/util/SparkUtilTest.scala | 58 ++++ integration/spark-datasource/pom.xml | 132 +++++++++ .../vectorreader/ColumnarVectorWrapper.java | 88 +++--- .../VectorizedCarbonRecordReader.java | 20 +- .../scala/org/apache/spark/util/SparkUtil.scala | 60 ++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 272 ++++++++++++++++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 276 ++++++++++++++++++ .../apache/spark/sql/ColumnVectorFactory.java | 45 +++ ...tCreateTableUsingSparkCarbonFileFormat.scala | 26 +- integration/spark2/pom.xml | 80 ++++++ .../apache/spark/sql/hive/CarbonAnalyzer.scala | 51 ++++ .../sql/hive/CarbonInMemorySessionState.scala | 278 +++++++++++++++++++ .../spark/sql/hive/CarbonOptimizerUtil.scala | 44 +++ .../spark/sql/hive/CarbonSessionState.scala | 269 ++++++++++++++++++ .../spark/sql/hive/CarbonSessionUtil.scala | 96 +++++++ .../apache/spark/sql/hive/CarbonSqlConf.scala | 148 ++++++++++ ...CreateCarbonSourceTableAsSelectCommand.scala | 130 +++++++++ .../spark/sql/hive/SqlAstBuilderHelper.scala | 110 ++++++++ .../stream/CarbonStreamRecordReader.java | 155 ++++------- .../org/apache/spark/CarbonInputMetrics.scala | 1 - .../spark/sql/CarbonCatalystOperators.scala | 57 ++-- .../org/apache/spark/sql/CarbonCountStar.scala | 10 +- .../org/apache/spark/sql/CarbonSession.scala | 7 +- .../sql/CustomDeterministicExpression.scala | 2 - .../management/CarbonLoadDataCommand.scala | 19 +- .../strategy/CarbonLateDecodeStrategy.scala | 82 ++++-- .../sql/execution/strategy/DDLStrategy.scala | 17 +- .../spark/sql/hive/CarbonAnalysisRules.scala | 17 +- .../spark/sql/hive/CarbonFileMetastore.scala | 26 +- .../sql/hive/CarbonPreAggregateRules.scala | 147 +++++++--- .../spark/sql/optimizer/CarbonFilters.scala | 26 +- .../sql/optimizer/CarbonLateDecodeRule.scala | 16 +- .../sql/parser/CarbonSpark2SqlParser.scala | 4 +- 
.../spark/sql/CarbonToSparkAdapater.scala | 75 +++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 226 --------------- ...CreateCarbonSourceTableAsSelectCommand.scala | 2 - .../spark/sql/CarbonToSparkAdapater.scala | 82 ++++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 229 --------------- .../apache/spark/sql/hive/CarbonAnalyzer.scala | 51 ---- .../sql/hive/CarbonInMemorySessionState.scala | 278 ------------------- .../apache/spark/sql/hive/CarbonOptimizer.scala | 44 +-- .../spark/sql/hive/CarbonSessionState.scala | 277 ------------------ .../spark/sql/hive/CarbonSessionUtil.scala | 97 ------- .../spark/sql/hive/CarbonSqlAstBuilder.scala | 85 +----- .../apache/spark/sql/hive/CarbonSqlConf.scala | 148 ---------- ...CreateCarbonSourceTableAsSelectCommand.scala | 122 -------- .../spark/sql/CarbonToSparkAdapater.scala | 84 ++++++ .../org/apache/spark/sql/CarbonVectorProxy.java | 255 ----------------- .../apache/spark/sql/ColumnVectorFactory.java | 45 --- .../apache/spark/sql/hive/CarbonOptimizer.scala | 37 +++ .../spark/sql/hive/CarbonSqlAstBuilder.scala | 52 ++++ .../stream/CarbonStreamRecordReaderTest.java | 100 +++++++ .../BooleanDataTypesFilterTest.scala | 2 +- .../booleantype/BooleanDataTypesLoadTest.scala | 4 +- .../bucketing/TableBucketingTestCase.scala | 26 +- .../carbondata/query/SubQueryTestSuite.scala | 1 - pom.xml | 60 ++++ .../scala/org/apache/spark/rpc/Master.scala | 2 +- .../scala/org/apache/spark/rpc/RpcUtil.scala | 56 ++++ .../scala/org/apache/spark/rpc/Worker.scala | 4 +- .../streaming/CarbonStreamInputFormat.java | 2 +- .../streaming/CarbonStreamInputFormatTest.java | 99 ------- 85 files changed, 3368 insertions(+), 2433 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/core/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml index 
77b5cc7..f5e5715 100644 --- a/datamap/mv/core/pom.xml +++ b/datamap/mv/core/pom.xml @@ -50,6 +50,11 @@ <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala index f677826..9da109a 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.spark.sql.{CarbonEnv, SparkSession} +import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapater, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF, SortOrder} @@ -300,10 +300,13 @@ object MVHelper { case Alias(agg: AggregateExpression, name) => agg.aggregateFunction.collect { case attr: AttributeReference => - AttributeReference(attr.name, attr.dataType, attr.nullable, attr - .metadata)(attr.exprId, + CarbonToSparkAdapater.createAttributeReference(attr.name, + attr.dataType, + attr.nullable, + attr.metadata, + attr.exprId, aliasName, - attr.isGenerated) + attr) }.head case Alias(child, name) => child @@ -314,18 +317,25 @@ object MVHelper { 
expressions.map { case alias@Alias(agg: AggregateExpression, name) => attrMap.get(AttributeKey(agg)).map { exp => - Alias(getAttribute(exp), name)(alias.exprId, + CarbonToSparkAdapater.createAliasRef( + getAttribute(exp), + name, + alias.exprId, alias.qualifier, alias.explicitMetadata, - alias.isGenerated) + Some(alias)) }.getOrElse(alias) case attr: AttributeReference => val uattr = attrMap.get(AttributeKey(attr)).map{a => if (keepAlias) { - AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId, + CarbonToSparkAdapater.createAttributeReference(a.name, + a.dataType, + a.nullable, + a.metadata, + a.exprId, attr.qualifier, - a.isGenerated) + a) } else { a } @@ -333,10 +343,9 @@ object MVHelper { uattr case alias@Alias(expression: Expression, name) => attrMap.get(AttributeKey(expression)).map { exp => - Alias(getAttribute(exp), name)(alias.exprId, - alias.qualifier, - alias.explicitMetadata, - alias.isGenerated) + CarbonToSparkAdapater + .createAliasRef(getAttribute(exp), name, alias.exprId, alias.qualifier, + alias.explicitMetadata, Some(alias)) }.getOrElse(alias) case expression: Expression => val uattr = attrMap.get(AttributeKey(expression)) @@ -376,9 +385,14 @@ object MVHelper { case attr: AttributeReference => val uattr = attrMap.get(AttributeKey(attr)).map{a => if (keepAlias) { - AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId, - attr.qualifier, - a.isGenerated) + CarbonToSparkAdapater + .createAttributeReference(a.name, + a.dataType, + a.nullable, + a.metadata, + a.exprId, + attr.qualifier, + a) } else { a } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/pom.xml ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml index fcf0e51..ff6976d 100644 --- a/datamap/mv/plan/pom.xml +++ b/datamap/mv/plan/pom.xml @@ -44,6 +44,11 @@ <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>provided</scope> 
</dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala index 1420522..6c82598 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan} import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.mv.plans._ import org.apache.carbondata.mv.plans.util.{Printers, Signature, SQLBuilder} @@ -64,7 +65,7 @@ abstract class ModularPlan // spark.conf.set("spark.sql.cbo.enabled", true) val sqlStmt = asOneLineSQL val plan = spark.sql(sqlStmt).queryExecution.optimizedPlan - plan.stats(conf) + SparkSQLUtil.invokeStatsMethod(plan, conf) } override def fastEquals(other: TreeNode[_]): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala index 3349835..7e1eb05 
100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.mv.plans.modular.Flags._ @@ -45,7 +46,7 @@ case class ModularRelation(databaseName: String, rest: Seq[Seq[Any]]) extends LeafNode { override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = { val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan - val stats = plan.stats(conf) + val stats = SparkSQLUtil.invokeStatsMethod(plan, conf) val output = outputList.map(_.toAttribute) val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { table => AttributeMap(table.output.zip(output)) @@ -136,7 +137,7 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode { // (spark, conf) override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = { val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan - val stats = plan.stats(conf) + val stats = SparkSQLUtil.invokeStatsMethod(plan, conf) val output = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation] .outputList.map(_.toAttribute) val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala 
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala index 6363089..e1e891a 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala @@ -17,12 +17,14 @@ package org.apache.carbondata.mv.plans.util +import org.apache.spark.sql.CarbonToSparkAdapater import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.util.SparkSQLUtil object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { @@ -74,8 +76,8 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { "Operator Optimizations", fixedPoint, Seq( // Operator push down PushProjectionThroughUnion, - ReorderJoin(conf), - EliminateOuterJoin(conf), + SparkSQLUtil.getReorderJoinObj(conf), + SparkSQLUtil.getEliminateOuterJoinObj(conf), PushPredicateThroughJoin, PushDownPredicate, // LimitPushDown(conf), @@ -89,7 +91,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { CombineLimits, CombineUnions, // Constant folding and strength reduction - NullPropagation(conf), + SparkSQLUtil.getNullPropagationObj(conf), FoldablePropagation, // OptimizeIn(conf), ConstantFolding, @@ -113,7 +115,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { extendedOperatorOptimizationRules: _*) :: Batch( "Check Cartesian Products", Once, - CheckCartesianProducts(conf)) :: + SparkSQLUtil.getCheckCartesianProductsObj(conf)) :: // Batch("Join Reorder", Once, // CostBasedJoinReorder(conf)) :: // Batch("Decimal Optimizations", fixedPoint, @@ -126,8 +128,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { // ConvertToLocalRelation, // 
PropagateEmptyRelation) :: Batch( - "OptimizeCodegen", Once, - OptimizeCodegen(conf)) :: + "OptimizeCodegen", Once, CarbonToSparkAdapater.getOptimizeCodegenRule(conf): _*) :: Batch( "RewriteSubquery", Once, RewritePredicateSubquery, http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala index 2aff5c0..eaa8b04 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala @@ -30,7 +30,8 @@ object CheckSPJG { case a: Aggregate => a.child.collect { case Join(_, _, _, _) | Project(_, _) | Filter(_, _) | - HiveTableRelation(_, _, _) | LogicalRelation(_, _, _) | LocalRelation(_, _) => true + HiveTableRelation(_, _, _) => true + case l: LogicalRelation => true case _ => false }.forall(identity) case _ => false @@ -55,7 +56,7 @@ object LogicalPlanRule extends SignatureRule[LogicalPlan] { def apply(plan: LogicalPlan, childSignatures: Seq[Option[Signature]]): Option[Signature] = { plan match { - case LogicalRelation(_, _, _) => + case l: LogicalRelation => // TODO: implement this (link to BaseRelation) None case HiveTableRelation(tableMeta, _, _) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala 
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala index 0c68a1f..9d4735c 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala @@ -17,13 +17,11 @@ package org.apache.carbondata.mv.testutil -import java.io.File - import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util._ -import org.apache.spark.sql.test.util.{PlanTest, QueryTest} +import org.apache.spark.sql.test.util.QueryTest import org.apache.carbondata.mv.plans.modular import org.apache.carbondata.mv.plans.modular.{ModularPlan, OneRowTable, Select} @@ -56,30 +54,6 @@ abstract class ModularPlanTest extends QueryTest with PredicateHelper { } /** - * Normalizes plans: - * - Filter the filter conditions that appear in a plan. For instance, - * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && (expr 1 && expr 2) - * etc., will all now be equivalent. - * - Sample the seed will replaced by 0L. - * - Join conditions will be resorted by hashCode. - */ - protected def normalizePlan(plan: LogicalPlan): LogicalPlan = { - plan transform { - case filter@Filter(condition: Expression, child: LogicalPlan) => - Filter( - splitConjunctivePredicates(condition).map(rewriteEqual(_)).sortBy(_.hashCode()) - .reduce(And), child) - case sample: Sample => - sample.copy(seed = 0L)(true) - case join@Join(left, right, joinType, condition) if condition.isDefined => - val newCondition = - splitConjunctivePredicates(condition.get).map(rewriteEqual(_)).sortBy(_.hashCode()) - .reduce(And) - Join(left, right, joinType, Some(newCondition)) - } - } - - /** * Rewrite [[EqualTo]] and [[EqualNullSafe]] operator to keep order. The following cases will be * equivalent: * 1. 
(a = b), (b = a); @@ -177,4 +151,12 @@ abstract class ModularPlanTest extends QueryTest with PredicateHelper { """.stripMargin) } } + + object MatchLocalRelation { + def unapply(localRelation: LocalRelation): Option[(Seq[Attribute], Any)] = localRelation match { + case l: LocalRelation => Some(l.output, l.data) + case _ => None + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala index e1a3d9f..34ee2ed 100644 --- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala +++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ExtractJoinConditionsSuite.scala @@ -39,7 +39,7 @@ class ExtractJoinConditionsSuite extends ModularPlanTest { val extracted = modularPlan.extractJoinConditions(modularPlan.children(0),modularPlan.children(1)) val correctAnswer = originalQuery match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),Inner,Some(cond2)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),Inner,Some(cond2)) => Seq(cond2) } @@ -58,7 +58,7 @@ class ExtractJoinConditionsSuite extends ModularPlanTest { left.join(right,condition = Some("l.c".attr === "r.c".attr)).analyze val correctAnswer = originalQuery1 match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.Filter(cond2,logical.LocalRelation(tbl2,_)),Inner,Some(cond3)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),logical.Filter(cond2,MatchLocalRelation(tbl2,_)),Inner,Some(cond3)) => Seq(cond3) } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala ---------------------------------------------------------------------- diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala index 082c325..144721c 100644 --- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala +++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/LogicalToModularPlanSuite.scala @@ -43,7 +43,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { .analyze val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Project(proj,logical.LocalRelation(tbl,_)) => + case logical.Project(proj,MatchLocalRelation(tbl,_)) => ModularRelation(null,null,tbl,NoFlags,Seq.empty).select(proj:_*)(tbl:_*)()(Map.empty)() } comparePlans(modularized, correctAnswer) @@ -58,7 +58,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Project(proj1,logical.Aggregate(grp,agg,logical.Project(proj2,logical.LocalRelation(tbl,_)))) => + case logical.Project(proj1,logical.Aggregate(grp,agg,logical.Project(proj2,MatchLocalRelation(tbl,_)))) => ModularRelation(null,null,tbl,NoFlags,Seq.empty).select(proj2:_*)(tbl:_*)()(Map.empty)().groupBy(agg:_*)(proj2:_*)(grp:_*).select(proj1:_*)(proj1:_*)()(Map.empty)() } comparePlans(modularized, correctAnswer) @@ -73,7 +73,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Project(proj,logical.Filter(cond,logical.LocalRelation(tbl,_))) => + case logical.Project(proj,logical.Filter(cond,MatchLocalRelation(tbl,_))) => 
ModularRelation(null,null,tbl,NoFlags,Seq.empty).select(proj:_*)(tbl:_*)(cond)(Map.empty)() } comparePlans(modularized, correctAnswer) @@ -87,7 +87,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),Inner,Some(cond2)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),Inner,Some(cond2)) => Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,Inner)) } comparePlans(modularized, correctAnswer) @@ -101,7 +101,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),LeftOuter,Some(cond2)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),LeftOuter,Some(cond2)) => Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,LeftOuter)) } comparePlans(modularized, correctAnswer) @@ -115,7 +115,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),RightOuter,Some(cond2)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),RightOuter,Some(cond2)) => 
Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,RightOuter)) } comparePlans(modularized, correctAnswer) @@ -129,7 +129,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = analysis.EliminateSubqueryAliases(originalQuery).modularize val correctAnswer = originalQuery match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.LocalRelation(tbl2,_),Inner,Some(cond2)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),MatchLocalRelation(tbl2,_),Inner,Some(cond2)) => Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond1,cond2):_*)(Map.empty)(JoinEdge(0,1,Inner)) } comparePlans(modularized, correctAnswer) @@ -147,7 +147,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case logical.Join(logical.Filter(cond1,logical.LocalRelation(tbl1,_)),logical.Join(logical.Filter(cond2,logical.LocalRelation(tbl2,_)),logical.LocalRelation(tbl3,_),Inner,Some(cond3)),Inner,Some(cond4)) => + case logical.Join(logical.Filter(cond1,MatchLocalRelation(tbl1,_)),logical.Join(logical.Filter(cond2,MatchLocalRelation(tbl2,_)),MatchLocalRelation(tbl3,_),Inner,Some(cond3)),Inner,Some(cond4)) => Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty),ModularRelation(null,null,tbl3,NoFlags,Seq.empty)).select(tbl1++tbl2++tbl3:_*)(tbl1++tbl2++tbl3:_*)(Seq(cond1,cond2,cond3,cond4):_*)(Map.empty)(JoinEdge(0,1,Inner),JoinEdge(1,2,Inner)) } comparePlans(modularized, correctAnswer) @@ -165,7 +165,7 @@ class LogicalToModularPlanSuite extends ModularPlanTest { val modularized = originalQuery.modularize val correctAnswer = originalQuery match { - case 
logical.Project(proj0, logical.Filter(cond1, logical.Project(proj1, logical.Aggregate(grp,agg,logical.Join(logical.Filter(cond2,logical.LocalRelation(tbl1,_)),logical.Filter(cond3,logical.LocalRelation(tbl2,_)),Inner,Some(cond4)))))) => + case logical.Project(proj0, logical.Filter(cond1, logical.Project(proj1, logical.Aggregate(grp,agg,logical.Join(logical.Filter(cond2,MatchLocalRelation(tbl1,_)),logical.Filter(cond3,MatchLocalRelation(tbl2,_)),Inner,Some(cond4)))))) => Seq(ModularRelation(null,null,tbl1,NoFlags,Seq.empty),ModularRelation(null,null,tbl2,NoFlags,Seq.empty)).select(tbl1++tbl2:_*)(tbl1++tbl2:_*)(Seq(cond2,cond3,cond4):_*)(Map.empty)(JoinEdge(0,1,Inner)).groupBy(agg:_*)(tbl1++tbl2:_*)(grp:_*).select(proj0:_*)(proj1:_*)(cond1)(Map.empty)() } comparePlans(modularized, correctAnswer) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/examples/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/examples/spark2/pom.xml b/examples/spark2/pom.xml index 9d1adde..91cb20f 100644 --- a/examples/spark2/pom.xml +++ b/examples/spark2/pom.xml @@ -77,6 +77,26 @@ <artifactId>scalatest_${scala.binary.version}</artifactId> <scope>test</scope> </dependency> + <!-- in spark 2.3 spark catalyst added dependency on spark-core--> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + </dependency> + <dependency> + <groupId>org.apache.carbondata</groupId> + <artifactId>carbondata-core</artifactId> + <version>${project.version}</version> + <exclusions> + <!-- need to Exclude net.jpountz jar from this project. 
+ Spark has changed this jar to org.lz4:lz4-java + net.jpountz and org.lz4 has same class Name --> + <exclusion> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index ee87d92..50e5830 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -127,12 +127,27 @@ <artifactId>carbondata-lucene</artifactId> <version>${project.version}</version> <scope>test</scope> + <exclusions> + <!-- need to Exclude net.jpountz jar from this project. + Spark has changed this jar to org.lz4:lz4-java + net.jpountz and org.lz4 has same class Name --> + <exclusion> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-bloom</artifactId> <version>${project.version}</version> <scope>test</scope> + <exclusions> + <exclusion> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.carbondata</groupId> @@ -146,6 +161,23 @@ <scope>test</scope> </dependency> <dependency> + <!-- spark catalyst added runtime dependency on spark-core,so + while executing the testcases spark-core should be present else it + will fail to execute --> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${spark.version}</version> + <scope>test</scope> + <exclusions> + <!-- need to Exclude Avro jar from this project,spark core is using + the version 1.7.4 which is not compatible with Carbon --> + <exclusion> + <groupId>org.apache.avro</groupId> + 
<artifactId>avro</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala index 0dba467..1f7aafe 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala @@ -73,7 +73,6 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select min(salary) from carbonTable"), sql("select min(salary) from hiveTable")) } - test("test min datatype on big decimal column") { val output = sql("select min(salary) from carbonTable").collectAsList().get(0).get(0) assert(output.isInstanceOf[java.math.BigDecimal]) @@ -83,7 +82,6 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll { val output = sql("select max(salary) from carbonTable").collectAsList().get(0).get(0) assert(output.isInstanceOf[java.math.BigDecimal]) } - test("test count function on big decimal column") { checkAnswer(sql("select count(salary) from carbonTable"), sql("select count(salary) from hiveTable")) @@ -152,8 +150,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll { } test("test sum*10 aggregation on big decimal column with high precision") { - checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"), - sql("select sum(salary)*10 from hiveBigDecimal")) + checkAnswer(sql("select 
cast(sum(salary)*10 as double) from carbonBigDecimal_2"), + sql("select cast(sum(salary)*10 as double) from hiveBigDecimal")) } test("test sum/10 aggregation on big decimal column with high precision") { @@ -167,8 +165,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll { } test("test sum-distinct*10 aggregation on big decimal column with high precision") { - checkAnswer(sql("select sum(distinct(salary))*10 from carbonBigDecimal_2"), - sql("select sum(distinct(salary))*10 from hiveBigDecimal")) + checkAnswer(sql("select cast(sum(distinct(salary))*10 as decimal)from carbonBigDecimal_2"), + sql("select cast(sum(distinct(salary))*10 as decimal) from hiveBigDecimal")) } test("test sum-distinct/10 aggregation on big decimal column with high precision") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala index 15ac1f4..73786c8 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/filterexpr/AllDataTypesTestCaseFilter.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.spark.testsuite.filterexpr +import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -56,13 +57,15 @@ class AllDataTypesTestCaseFilter extends QueryTest with BeforeAndAfterAll { test("verify like query ends with filter push down") { val df = 
sql("select * from alldatatypestableFilter where empname like '%nandh'").queryExecution .sparkPlan - assert(df.metadata.get("PushedFilters").get.contains("CarbonEndsWith")) + assert(df.asInstanceOf[CarbonDataSourceScan].metadata + .get("PushedFilters").get.contains("CarbonEndsWith")) } test("verify like query contains with filter push down") { val df = sql("select * from alldatatypestableFilter where empname like '%nand%'").queryExecution .sparkPlan - assert(df.metadata.get("PushedFilters").get.contains("CarbonContainsWith")) + assert(df.asInstanceOf[CarbonDataSourceScan].metadata + .get("PushedFilters").get.contains("CarbonContainsWith")) } override def afterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala index 150bc36..564c5f3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala @@ -21,6 +21,7 @@ import java.util.concurrent.{Callable, ExecutorService, Executors} import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest +import org.apache.spark.util.SparkUtil import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.constants.LoggerAction @@ -859,11 +860,22 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA sql("create 
table scalarissue_hive(a int,salary double) using parquet partitioned by (salary) ") sql("set hive.exec.dynamic.partition.mode=nonstrict") sql("insert into scalarissue_hive values(23,21.2)") - intercept[Exception] { - sql(s"select * from scalarissue_hive where salary = (select max(salary) from scalarissue_hive)").show() - } - intercept[Exception] { - sql(s"select * from scalarissue where salary = (select max(salary) from scalarissue)").show() + if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) { + intercept[Exception] { + sql(s"select * from scalarissue_hive where salary = (select max(salary) from " + + s"scalarissue_hive)") + .show() + } + intercept[Exception] { + sql(s"select * from scalarissue where salary = (select max(salary) from scalarissue)") + .show() + } + } else { + checkAnswer(sql(s"select * from scalarissue_hive where salary = (select max(salary) from " + + s"scalarissue_hive)"), Seq(Row(23, 21.2))) + checkAnswer(sql(s"select * from scalarissue where salary = (select max(salary) from " + + s"scalarissue)"), + Seq(Row(23, 21.2))) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala index 2029e93..6400ed1 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/sql/commands/StoredAsCarbondataSuite.scala @@ -87,7 +87,8 @@ class StoredAsCarbondataSuite extends QueryTest with BeforeAndAfterEach { sql("CREATE TABLE carbon_table(key INT, value STRING) STORED AS ") 
} catch { case e: Exception => - assert(e.getMessage.contains("no viable alternative at input")) + assert(e.getMessage.contains("no viable alternative at input") || + e.getMessage.contains("mismatched input '<EOF>' expecting ")) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index b712109..01c77f0 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -421,7 +421,7 @@ class CarbonScanRDD[T: ClassTag]( // create record reader for row format DataTypeUtil.setDataTypeConverter(dataTypeConverterClz.newInstance()) val inputFormat = new CarbonStreamInputFormat - inputFormat.setVectorReader(vectorReader) + inputFormat.setIsVectorReader(vectorReader) inputFormat.setInputMetricsStats(inputMetricsStats) model.setStatisticsRecorder( CarbonTimeStatisticsFactory.createExecutorRecorder(model.getQueryId)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala index 3cbfab2..ab6e320 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala @@ -49,13 +49,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl} -<<<<<<< 2f537b724f6f03ab40c95f7ecc8ebd38f6500099 import org.apache.carbondata.spark.util.CommonUtil -import org.apache.carbondata.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} -======= -import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl} import org.apache.carbondata.streaming.CarbonStreamInputFormat ->>>>>>> [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface /** @@ -167,11 +162,11 @@ class StreamHandoffRDD[K, V]( val format = new CarbonTableInputFormat[Array[Object]]() val model = format.createQueryModel(inputSplit, attemptContext) val inputFormat = new CarbonStreamInputFormat - val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) - .asInstanceOf[RecordReader[Void, Any]] - inputFormat.setVectorReader(false) + inputFormat.setIsVectorReader(false) inputFormat.setModel(model) inputFormat.setUseRawRow(true) + val streamReader = inputFormat.createRecordReader(inputSplit, attemptContext) + .asInstanceOf[RecordReader[Void, Any]] streamReader.initialize(inputSplit, attemptContext) val iteratorList = new util.ArrayList[RawResultIterator](1) iteratorList.add(new StreamingRawResultIterator(streamReader)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 196baa6..5762906 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -121,7 +121,7 @@ class CarbonAppendableStreamSink( className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass, jobId = batchId.toString, outputPath = fileLogPath, - isAppend = false) + false) committer match { case manifestCommitter: ManifestFileCommitProtocol => http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala index 9883607..00156e9 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/PlanTest.scala @@ -26,11 +26,6 @@ import org.apache.spark.sql.catalyst.util._ */ class PlanTest extends CarbonFunSuite { - /** Fails the test if the two expressions do not match */ - protected def compareExpressions(e1: Expression, e2: Expression): Unit = { - comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation)) - } - /** Fails the test if the two plans do not match */ protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) { val normalized1 = normalizeExprIds(plan1) 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala index b5fda85..b7d47a0 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.util +import java.lang.reflect.Method + import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.internal.{SessionState, SQLConf} +import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil} object SparkSQLUtil { def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState @@ -31,4 +35,68 @@ object SparkSQLUtil { def getSparkSession: SparkSession = { SparkSession.getDefaultSession.get } + + def invokeStatsMethod(logicalPlanObj: LogicalPlan, conf: SQLConf): Statistics = { + if (SparkUtil.isSparkVersionEqualTo("2.2")) { + val method: Method = logicalPlanObj.getClass.getMethod("stats", classOf[SQLConf]) + method.invoke(logicalPlanObj, conf).asInstanceOf[Statistics] + } else if (SparkUtil.isSparkVersionXandAbove("2.3")) { + val method: Method = logicalPlanObj.getClass.getMethod("stats") + method.invoke(logicalPlanObj).asInstanceOf[Statistics] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } + + def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = { + if 
(SparkUtil.isSparkVersionEqualTo("2.2")) { + val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin"; + CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]] + } else if (SparkUtil.isSparkVersionXandAbove("2.3")) { + val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } + + def getEliminateOuterJoinObj(conf: SQLConf): Rule[LogicalPlan] = { + if (SparkUtil.isSparkVersionEqualTo("2.2")) { + val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin"; + CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]] + } else if (SparkUtil.isSparkVersionXandAbove("2.3")) { + val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } + + def getNullPropagationObj(conf: SQLConf): Rule[LogicalPlan] = { + if (SparkUtil.isSparkVersionEqualTo("2.2")) { + val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation"; + CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]] + } else if (SparkUtil.isSparkVersionXandAbove("2.3")) { + val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } + + def getCheckCartesianProductsObj(conf: SQLConf): Rule[LogicalPlan] = { + if (SparkUtil.isSparkVersionEqualTo("2.2")) { + val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts"; + 
CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]] + } else if (SparkUtil.isSparkVersionXandAbove("2.3")) { + val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 34eeded..32cd201 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -17,24 +17,25 @@ package org.apache.spark.util -import java.lang.reflect.Field +import java.lang.reflect.Method import scala.reflect.runtime._ import scala.reflect.runtime.universe._ -import org.apache.spark.SPARK_VERSION -import org.apache.spark.SparkContext -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.{SPARK_VERSION, SparkContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.parser.AstBuilder import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias} 
-import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.spark.sql.sources.BaseRelation +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan} +import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation} +import org.apache.spark.sql.sources.{BaseRelation, Filter} -import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants /** @@ -61,12 +62,12 @@ object CarbonReflectionUtils { tableIdentifier: TableIdentifier, tableAlias: Option[String] = None): UnresolvedRelation = { val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation" - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { createObject( className, tableIdentifier, tableAlias)._1.asInstanceOf[UnresolvedRelation] - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { createObject( className, tableIdentifier)._1.asInstanceOf[UnresolvedRelation] @@ -79,13 +80,13 @@ object CarbonReflectionUtils { relation: LogicalPlan, view: Option[TableIdentifier]): SubqueryAlias = { val className = "org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias" - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { createObject( className, alias.getOrElse(""), relation, Option(view))._1.asInstanceOf[SubqueryAlias] - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { createObject( className, alias.getOrElse(""), @@ -101,7 +102,7 @@ object CarbonReflectionUtils { overwrite: Boolean, ifPartitionNotExists: Boolean): InsertIntoTable = { val className = "org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable" - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { val overwriteOptions = createObject( 
"org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions", overwrite.asInstanceOf[Object], Map.empty.asInstanceOf[Object])._1.asInstanceOf[Object] @@ -112,7 +113,7 @@ object CarbonReflectionUtils { query, overwriteOptions, ifPartitionNotExists.asInstanceOf[Object])._1.asInstanceOf[InsertIntoTable] - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2") ) { createObject( className, table, @@ -127,20 +128,28 @@ object CarbonReflectionUtils { def getLogicalRelation(relation: BaseRelation, expectedOutputAttributes: Seq[Attribute], - catalogTable: Option[CatalogTable]): LogicalRelation = { + catalogTable: Option[CatalogTable], + isStreaming: Boolean): LogicalRelation = { val className = "org.apache.spark.sql.execution.datasources.LogicalRelation" - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { createObject( className, relation, Some(expectedOutputAttributes), catalogTable)._1.asInstanceOf[LogicalRelation] - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionEqualTo("2.2")) { createObject( className, relation, expectedOutputAttributes, catalogTable)._1.asInstanceOf[LogicalRelation] + } else if (SparkUtil.isSparkVersionEqualTo("2.3")) { + createObject( + className, + relation, + expectedOutputAttributes, + catalogTable, + isStreaming.asInstanceOf[Object])._1.asInstanceOf[LogicalRelation] } else { throw new UnsupportedOperationException("Unsupported Spark version") } @@ -183,27 +192,23 @@ object CarbonReflectionUtils { def getAstBuilder(conf: Object, sqlParser: Object, sparkSession: SparkSession): AstBuilder = { - if (SPARK_VERSION.startsWith("2.1") || SPARK_VERSION.startsWith("2.2")) { - val className = sparkSession.sparkContext.conf.get( - CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME, - "org.apache.spark.sql.hive.CarbonSqlAstBuilder") - createObject(className, - conf, - sqlParser, sparkSession)._1.asInstanceOf[AstBuilder] - } else { - throw 
new UnsupportedOperationException("Spark version not supported") - } + val className = sparkSession.sparkContext.conf.get( + CarbonCommonConstants.CARBON_SQLASTBUILDER_CLASSNAME, + "org.apache.spark.sql.hive.CarbonSqlAstBuilder") + createObject(className, + conf, + sqlParser, sparkSession)._1.asInstanceOf[AstBuilder] } def getSessionState(sparkContext: SparkContext, carbonSession: Object, useHiveMetaStore: Boolean): Any = { - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { val className = sparkContext.conf.get( CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, "org.apache.spark.sql.hive.CarbonSessionState") createObject(className, carbonSession)._1 - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { if (useHiveMetaStore) { val className = sparkContext.conf.get( CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, @@ -225,12 +230,12 @@ object CarbonReflectionUtils { } def hasPredicateSubquery(filterExp: Expression) : Boolean = { - if (SPARK_VERSION.startsWith("2.1")) { + if (SparkUtil.isSparkVersionEqualTo("2.1")) { val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.PredicateSubquery") val method = tuple.getMethod("hasPredicateSubquery", classOf[Expression]) val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean] hasSubquery - } else if (SPARK_VERSION.startsWith("2.2")) { + } else if (SparkUtil.isSparkVersionXandAbove("2.2")) { val tuple = Class.forName("org.apache.spark.sql.catalyst.expressions.SubqueryExpression") val method = tuple.getMethod("hasInOrExistsSubquery", classOf[Expression]) val hasSubquery : Boolean = method.invoke(tuple, filterExp).asInstanceOf[Boolean] @@ -248,6 +253,54 @@ object CarbonReflectionUtils { isFormatted } + def getRowDataSourceScanExecObj(relation: LogicalRelation, + output: Seq[Attribute], + pushedFilters: Seq[Filter], + handledFilters: Seq[Filter], + rdd: RDD[InternalRow], + partition: 
Partitioning, + metadata: Map[String, String]): RowDataSourceScanExec = { + val className = "org.apache.spark.sql.execution.RowDataSourceScanExec" + if (SparkUtil.isSparkVersionEqualTo("2.1") || SparkUtil.isSparkVersionEqualTo("2.2")) { + createObject(className, output, rdd, relation.relation, + partition, metadata, + relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec] + } else if (SparkUtil.isSparkVersionXandAbove("2.3")) { + createObject(className, output, output.map(output.indexOf), + pushedFilters.toSet, handledFilters.toSet, rdd, + relation.relation, + relation.catalogTable.map(_.identifier))._1.asInstanceOf[RowDataSourceScanExec] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } + + def invokewriteAndReadMethod(dataSourceObj: DataSource, + dataFrame: DataFrame, + data: LogicalPlan, + session: SparkSession, + mode: SaveMode, + query: LogicalPlan, + physicalPlan: SparkPlan): BaseRelation = { + if (SparkUtil.isSparkVersionEqualTo("2.2")) { + val method: Method = dataSourceObj.getClass + .getMethod("writeAndRead", classOf[SaveMode], classOf[DataFrame]) + method.invoke(dataSourceObj, mode, dataFrame) + .asInstanceOf[BaseRelation] + } else if (SparkUtil.isSparkVersionEqualTo("2.3")) { + val method: Method = dataSourceObj.getClass + .getMethod("writeAndRead", + classOf[SaveMode], + classOf[LogicalPlan], + classOf[Seq[Attribute]], + classOf[SparkPlan]) + method.invoke(dataSourceObj, mode, query, query.output, physicalPlan) + .asInstanceOf[BaseRelation] + } else { + throw new UnsupportedOperationException("Spark version not supported") + } + } + def createObject(className: String, conArgs: Object*): (Any, Class[_]) = { val clazz = Utils.classForName(className) val ctor = clazz.getConstructors.head @@ -255,4 +308,10 @@ object CarbonReflectionUtils { (ctor.newInstance(conArgs: _*), clazz) } + def createObjectOfPrivateConstructor(className: String, conArgs: Object*): (Any, Class[_]) = { + val clazz = 
Utils.classForName(className) + val ctor = clazz.getDeclaredConstructors.head + ctor.setAccessible(true) + (ctor.newInstance(conArgs: _*), clazz) + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala deleted file mode 100644 index 4635fc7..0000000 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/SparkUtil.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.util - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.io.{LongWritable, Text} -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.spark.{SparkContext, TaskContext} -import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.rdd.{NewHadoopPartition, NewHadoopRDD} - -import org.apache.carbondata.processing.loading.csvinput.BlockDetails - -/* - * this object use to handle file splits - */ -object SparkUtil { - - def setTaskContext(context: TaskContext): Unit = { - val localThreadContext = TaskContext.get() - if (localThreadContext == null) { - TaskContext.setTaskContext(context) - } - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala b/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala new file mode 100644 index 0000000..4810db1 --- /dev/null +++ b/integration/spark-common/src/test/scala/org/apache/spark/util/SparkUtilTest.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.apache.spark.SPARK_VERSION +import org.scalatest.FunSuite + +class SparkUtilTest extends FunSuite{ + + test("Test Spark Version API with X and Above") { + if (SPARK_VERSION.startsWith("2.1")) { + assert(SparkUtil.isSparkVersionXandAbove("2.1")) + assert(!SparkUtil.isSparkVersionXandAbove("2.2")) + assert(!SparkUtil.isSparkVersionXandAbove("2.3")) + } else if (SPARK_VERSION.startsWith("2.2")) { + assert(SparkUtil.isSparkVersionXandAbove("2.1")) + assert(SparkUtil.isSparkVersionXandAbove("2.2")) + assert(!SparkUtil.isSparkVersionXandAbove("2.3")) + } else { + assert(SparkUtil.isSparkVersionXandAbove("2.1")) + assert(SparkUtil.isSparkVersionXandAbove("2.2")) + assert(SparkUtil.isSparkVersionXandAbove("2.3")) + assert(!SparkUtil.isSparkVersionXandAbove("2.4")) + } + } + + test("Test Spark Version API Equal to X") { + if (SPARK_VERSION.startsWith("2.1")) { + assert(SparkUtil.isSparkVersionEqualTo("2.1")) + assert(!SparkUtil.isSparkVersionEqualTo("2.2")) + assert(!SparkUtil.isSparkVersionEqualTo("2.3")) + } else if (SPARK_VERSION.startsWith("2.2")) { + assert(!SparkUtil.isSparkVersionEqualTo("2.1")) + assert(SparkUtil.isSparkVersionEqualTo("2.2")) + assert(!SparkUtil.isSparkVersionEqualTo("2.3")) + } else { + assert(!SparkUtil.isSparkVersionEqualTo("2.1")) + assert(!SparkUtil.isSparkVersionEqualTo("2.2")) + assert(SparkUtil.isSparkVersionEqualTo("2.3")) + assert(!SparkUtil.isSparkVersionEqualTo("2.4")) + } + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml index 38cf629..3d017c8 100644 --- a/integration/spark-datasource/pom.xml +++ b/integration/spark-datasource/pom.xml @@ -39,6 +39,15 @@ <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-hadoop</artifactId> <version>${project.version}</version> + <exclusions> + <!-- need to Exclude net.jpountz jar from this project. + Spark has changed this jar to org.lz4:lz4-java + net.jpountz and org.lz4 has same class Name --> + <exclusion> + <groupId>net.jpountz.lz4</groupId> + <artifactId>lz4</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.carbondata</groupId> @@ -192,5 +201,128 @@ <maven.test.skip>true</maven.test.skip> </properties> </profile> + <profile> + <id>spark-2.1</id> + <properties> + <spark.version>2.1.0</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/spark2.3plus</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/spark2.1andspark2.2</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>spark-2.2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + 
<spark.version>2.2.1</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/spark2.3plus</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/spark2.1andspark2.2</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + <profile> + <id>spark-2.3</id> + <properties> + <spark.version>2.3.1</spark.version> + <scala.binary.version>2.11</scala.binary.version> + <scala.version>2.11.8</scala.version> + </properties> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/main/spark2.1andspark2.2</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>src/main/spark2.3plus</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> </profiles> </project> http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java 
---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java index 7bab117..bc5a387 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java +++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.Decimal; class ColumnarVectorWrapper implements CarbonColumnVector { - private CarbonVectorProxy writableColumnVector; + private CarbonVectorProxy sparkColumnVectorProxy; private boolean[] filteredRows; @@ -38,32 +38,36 @@ class ColumnarVectorWrapper implements CarbonColumnVector { private int ordinal; + private boolean isDictionary; + private boolean filteredRowsExist; private DataType blockDataType; + private CarbonColumnVector dictionaryVector; + ColumnarVectorWrapper(CarbonVectorProxy writableColumnVector, - boolean[] filteredRows, int ordinal) { - this.writableColumnVector = writableColumnVector; + boolean[] filteredRows, int ordinal) { + this.sparkColumnVectorProxy = writableColumnVector; this.filteredRows = filteredRows; this.ordinal = ordinal; } @Override public void putBoolean(int rowId, boolean value) { if (!filteredRows[rowId]) { - writableColumnVector.putBoolean(counter++, value, ordinal); + sparkColumnVectorProxy.putBoolean(counter++, value, ordinal); } } @Override public void putFloat(int rowId, float value) { if (!filteredRows[rowId]) { - writableColumnVector.putFloat(counter++, value,ordinal); + sparkColumnVectorProxy.putFloat(counter++, value,ordinal); } } @Override public void putShort(int rowId, short value) { if (!filteredRows[rowId]) { - writableColumnVector.putShort(counter++, value, ordinal); + 
sparkColumnVectorProxy.putShort(counter++, value, ordinal); } } @@ -71,18 +75,22 @@ class ColumnarVectorWrapper implements CarbonColumnVector { if (filteredRowsExist) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putShort(counter++, value, ordinal); + sparkColumnVectorProxy.putShort(counter++, value, ordinal); } rowId++; } } else { - writableColumnVector.putShorts(rowId, count, value, ordinal); + sparkColumnVectorProxy.putShorts(rowId, count, value, ordinal); } } @Override public void putInt(int rowId, int value) { if (!filteredRows[rowId]) { - writableColumnVector.putInt(counter++, value, ordinal); + if (isDictionary) { + sparkColumnVectorProxy.putDictionaryInt(counter++, value, ordinal); + } else { + sparkColumnVectorProxy.putInt(counter++, value, ordinal); + } } } @@ -90,18 +98,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector { if (filteredRowsExist) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putInt(counter++, value, ordinal); + sparkColumnVectorProxy.putInt(counter++, value, ordinal); } rowId++; } } else { - writableColumnVector.putInts(rowId, count, value, ordinal); + sparkColumnVectorProxy.putInts(rowId, count, value, ordinal); } } @Override public void putLong(int rowId, long value) { if (!filteredRows[rowId]) { - writableColumnVector.putLong(counter++, value, ordinal); + sparkColumnVectorProxy.putLong(counter++, value, ordinal); } } @@ -109,19 +117,19 @@ class ColumnarVectorWrapper implements CarbonColumnVector { if (filteredRowsExist) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putLong(counter++, value, ordinal); + sparkColumnVectorProxy.putLong(counter++, value, ordinal); } rowId++; } } else { - writableColumnVector.putLongs(rowId, count, value, ordinal); + sparkColumnVectorProxy.putLongs(rowId, count, value, ordinal); } } @Override public void putDecimal(int rowId, BigDecimal value, int precision) { if 
(!filteredRows[rowId]) { Decimal toDecimal = Decimal.apply(value); - writableColumnVector.putDecimal(counter++, toDecimal, precision, ordinal); + sparkColumnVectorProxy.putDecimal(counter++, toDecimal, precision, ordinal); } } @@ -129,7 +137,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector { Decimal decimal = Decimal.apply(value); for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putDecimal(counter++, decimal, precision, ordinal); + sparkColumnVectorProxy.putDecimal(counter++, decimal, precision, ordinal); } rowId++; } @@ -137,7 +145,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector { @Override public void putDouble(int rowId, double value) { if (!filteredRows[rowId]) { - writableColumnVector.putDouble(counter++, value, ordinal); + sparkColumnVectorProxy.putDouble(counter++, value, ordinal); } } @@ -145,25 +153,25 @@ class ColumnarVectorWrapper implements CarbonColumnVector { if (filteredRowsExist) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putDouble(counter++, value, ordinal); + sparkColumnVectorProxy.putDouble(counter++, value, ordinal); } rowId++; } } else { - writableColumnVector.putDoubles(rowId, count, value, ordinal); + sparkColumnVectorProxy.putDoubles(rowId, count, value, ordinal); } } @Override public void putBytes(int rowId, byte[] value) { if (!filteredRows[rowId]) { - writableColumnVector.putByteArray(counter++, value, ordinal); + sparkColumnVectorProxy.putByteArray(counter++, value, ordinal); } } @Override public void putBytes(int rowId, int count, byte[] value) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putByteArray(counter++, value, ordinal); + sparkColumnVectorProxy.putByteArray(counter++, value, ordinal); } rowId++; } @@ -171,13 +179,13 @@ class ColumnarVectorWrapper implements CarbonColumnVector { @Override public void putBytes(int rowId, int offset, int length, byte[] value) { if 
(!filteredRows[rowId]) { - writableColumnVector.putByteArray(counter++, value, offset, length, ordinal); + sparkColumnVectorProxy.putByteArray(counter++, value, offset, length, ordinal); } } @Override public void putNull(int rowId) { if (!filteredRows[rowId]) { - writableColumnVector.putNull(counter++, ordinal); + sparkColumnVectorProxy.putNull(counter++, ordinal); } } @@ -185,18 +193,18 @@ class ColumnarVectorWrapper implements CarbonColumnVector { if (filteredRowsExist) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - writableColumnVector.putNull(counter++, ordinal); + sparkColumnVectorProxy.putNull(counter++, ordinal); } rowId++; } } else { - writableColumnVector.putNulls(rowId, count,ordinal); + sparkColumnVectorProxy.putNulls(rowId, count,ordinal); } } @Override public void putNotNull(int rowId) { if (!filteredRows[rowId]) { - columnVector.putNotNull(counter++); + sparkColumnVectorProxy.putNotNull(counter++,ordinal); } } @@ -204,17 +212,17 @@ class ColumnarVectorWrapper implements CarbonColumnVector { if (filteredRowsExist) { for (int i = 0; i < count; i++) { if (!filteredRows[rowId]) { - columnVector.putNotNull(counter++); + sparkColumnVectorProxy.putNotNull(counter++, ordinal); } rowId++; } } else { - columnVector.putNotNulls(rowId, count); + sparkColumnVectorProxy.putNotNulls(rowId, count, ordinal); } } @Override public boolean isNull(int rowId) { - return writableColumnVector.isNullAt(rowId,ordinal); + return sparkColumnVectorProxy.isNullAt(rowId,ordinal); } @Override public void putObject(int rowId, Object obj) { @@ -236,7 +244,7 @@ class ColumnarVectorWrapper implements CarbonColumnVector { @Override public DataType getType() { return CarbonSparkDataSourceUtil - .convertSparkToCarbonDataType(writableColumnVector.dataType(ordinal)); + .convertSparkToCarbonDataType(sparkColumnVectorProxy.dataType(ordinal)); } @Override @@ -249,20 +257,32 @@ class ColumnarVectorWrapper implements CarbonColumnVector { this.blockDataType = blockDataType; } - 
@Override public void setFilteredRowsExist(boolean filteredRowsExist) { + @Override + public void setFilteredRowsExist(boolean filteredRowsExist) { this.filteredRowsExist = filteredRowsExist; } @Override public void setDictionary(CarbonDictionary dictionary) { if (dictionary == null) { - columnVector.setDictionary(null); + sparkColumnVectorProxy.setDictionary(null, ordinal); } else { - columnVector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary)); + sparkColumnVectorProxy + .setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary),ordinal); } } + private void setDictionaryType(boolean type) { + this.isDictionary = type; + } + @Override public boolean hasDictionary() { - return columnVector.hasDictionary(); + return sparkColumnVectorProxy.hasDictionary(ordinal); + } + + public void reserveDictionaryIds() { + sparkColumnVectorProxy.reserveDictionaryIds(sparkColumnVectorProxy.numRows(), ordinal); + dictionaryVector = new ColumnarVectorWrapper(sparkColumnVectorProxy, filteredRows, ordinal); + ((ColumnarVectorWrapper) dictionaryVector).isDictionary = true; } @Override public CarbonColumnVector getDictionaryVector() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/74c3eb10/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java index 7c98608..d6b6b7e 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java +++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java @@ -51,16 +51,13 @@ import 
org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.spark.memory.MemoryMode; -<<<<<<< 2f537b724f6f03ab40c95f7ecc8ebd38f6500099:integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; -import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -======= import org.apache.spark.sql.CarbonVectorProxy; ->>>>>>> [CARBONDATA-2532][Integration] Carbon to support spark 2.3 version, ColumnVector Interface:integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java import org.apache.spark.sql.types.DecimalType; import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; /** * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the @@ -282,22 +279,23 @@ public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> { schema = schema.add(field); } } - columnarBatch = ColumnarBatch.allocate(schema, memMode); + vectorProxy = new CarbonVectorProxy(DEFAULT_MEMORY_MODE,schema,DEFAULT_BATCH_SIZE); if (partitionColumns != null) { int partitionIdx = fields.length; for (int i = 0; i < partitionColumns.fields().length; i++) { - ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i); - columnarBatch.column(i + partitionIdx).setIsConstant(); + ColumnVectorUtils.populate(vectorProxy.column(i + partitionIdx), partitionValues, i); + vectorProxy.column(i + partitionIdx).setIsConstant(); } } - vectorProxy = new CarbonVectorProxy(MemoryMode.OFF_HEAP,DEFAULT_BATCH_SIZE,fields); CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length]; 
boolean[] filteredRows = new boolean[vectorProxy.numRows()]; for (int i = 0; i < fields.length; i++) { - if (isNoDictStringField[i]) { - vectorProxy.reserveDictionaryIds(vectorProxy.numRows(), i); - } vectors[i] = new ColumnarVectorWrapper(vectorProxy, filteredRows, i); + if (isNoDictStringField[i]) { + if (vectors[i] instanceof ColumnarVectorWrapper) { + ((ColumnarVectorWrapper) vectors[i]).reserveDictionaryIds(); + } + } } carbonColumnarBatch = new CarbonColumnarBatch(vectors, vectorProxy.numRows(), filteredRows); }