This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 4d7c8ad  [CARBONDATA-3309] MV datamap supports Spark 2.1
4d7c8ad is described below

commit 4d7c8ada98ed15511d0abff349b64522f047344b
Author: qiuchenjian <807169...@qq.com>
AuthorDate: Sun Mar 17 20:04:48 2019 +0800

[CARBONDATA-3309] MV datamap supports Spark 2.1

[Problem] The MV datamap does not support Spark 2.1, so support for that version needs to be added.

[Solution] The modification points are listed below; all MV test cases pass on Spark 2.1.

Classes that cannot be accessed in Spark 2.1:
(1). org.apache.spark.internal.Logging
(2). org.apache.spark.sql.internal.SQLConf
Solution: create classes in the carbon project that extend the above classes.

Classes that Spark 2.1 does not have:
(1). org.apache.spark.sql.catalyst.plans.logical.Subquery
(2). org.apache.spark.sql.catalyst.catalog.interface.HiveTableRelation
Solution: for HiveTableRelation, use CatalogRelation instead and do not use it in LogicalPlanSignatureGenerator; move the Subquery code into the carbon project.

Method that cannot be accessed in Spark 2.1:
(1). sparkSession.sessionState.catalog.lookupRelation
Solution: add this method to SparkSQLUtil.

Classes whose interfaces changed:
(1). org.apache.spark.sql.catalyst.expressions.SortOrder
(2). org.apache.spark.sql.catalyst.expressions.Cast
(3). org.apache.spark.sql.catalyst.plans.Statistics
Solution: adapt to the new interfaces.

Members that Spark 2.1 does not have:
(1). normalizeExprId and canonicalized of org.apache.spark.sql.catalyst.plans.QueryPlan
(2). CASE_SENSITIVE of SQLConf
(3). STARSCHEMA_DETECTION of SQLConf
Solution: do not use normalizeExprId, canonicalized, CASE_SENSITIVE, or STARSCHEMA_DETECTION.

Logical plan optimization rules that Spark 2.1 does not have:
(1). SimplifyCreateMapOps
(2). SimplifyCreateArrayOps
(3). SimplifyCreateStructOps
(4). RemoveRedundantProject
(5). RemoveRedundantAliases
(6). PullupCorrelatedPredicates
(7). ReplaceDeduplicateWithAggregate
(8). EliminateView
Solution: delete the rules or move their code into the carbon project.

Generate the rule instances in SparkSQLUtil to adapt to Spark 2.1.
Query SQL passes the MV check in Spark 2.1 (CarbonSessionState).

This closes #3150
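For illustration, a minimal sketch of the version-adaptation pattern used for the missing optimizer rules, assuming only that each rule exists as a Scala object in Spark 2.2 and later: the rule instance is looked up reflectively when available, and a pass-through rule is returned on Spark 2.1. The names NoOpRule, VersionAdaptedRules, moduleInstance and isSpark22OrLater are placeholders for this sketch; the actual patch does the equivalent through SparkSQLUtil, CarbonReflectionUtils.createSingleObject and EmptyRule, as shown in the diff below.

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Pass-through rule returned when the real rule is not available (Spark 2.1).
object NoOpRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

object VersionAdaptedRules {
  // Fetch the singleton instance of a Scala `object` rule through its generated MODULE$ field.
  private def moduleInstance(className: String): Rule[LogicalPlan] = {
    val clazz = Class.forName(className + "$")
    clazz.getField("MODULE$").get(null).asInstanceOf[Rule[LogicalPlan]]
  }

  // isSpark22OrLater would come from a runtime version check such as
  // SparkUtil.isSparkVersionXandAbove("2.2").
  def removeRedundantAliases(isSpark22OrLater: Boolean): Rule[LogicalPlan] = {
    if (isSpark22OrLater) {
      // RemoveRedundantAliases only exists from Spark 2.2 onwards.
      moduleInstance("org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases")
    } else {
      NoOpRule
    }
  }
}

An optimizer such as BirdcageOptimizer can then include the returned rule in its batches regardless of the Spark version it runs on.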
--- .../carbondata/mv/datamap/MVDataMapProvider.scala | 2 +- .../apache/carbondata/mv/datamap/MVHelper.scala | 2 +- .../apache/carbondata/mv/rewrite/MatchMaker.scala | 2 +- .../mv/rewrite/SummaryDatasetCatalog.scala | 5 +- .../carbondata/mv/rewrite/TestSQLSuite.scala | 4 +- .../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala | 4 +- .../mv/expressions/modular/subquery.scala | 13 ++- .../mv/plans/modular/AggregatePushDown.scala | 8 +- .../carbondata/mv/plans/modular/Harmonizer.scala | 2 +- .../carbondata/mv/plans/modular/ModularPlan.scala | 8 +- .../mv/plans/modular/ModularRelation.scala | 22 +--- .../carbondata/mv/plans/modular/Modularizer.scala | 2 +- .../mv/plans/util/BirdcageOptimizer.scala | 10 +- .../mv/plans/util/Logical2ModularExtractions.scala | 19 +-- .../carbondata/mv/plans/util/SQLBuildDSL.scala | 5 +- .../carbondata/mv/plans/util/SQLBuilder.scala | 9 -- .../carbondata/mv/plans/util/Signature.scala | 2 +- .../carbondata/mv/testutil/Tpcds_1_4_Tables.scala | 4 +- .../carbondata/mv/plans/ModularToSQLSuite.scala | 4 +- .../carbondata/mv/plans/SignatureSuite.scala | 4 +- .../spark/sql/catalyst/analysis/EmptyRule.scala | 26 +++++ .../org/apache/spark/sql/util/SparkSQLUtil.scala | 113 +++++++++++++++++- .../apache/spark/util/CarbonReflectionUtils.scala | 7 ++ .../src/main/scala/org/apache/spark/Logging.scala | 22 ++++ .../main/scala/org/apache/spark/sql/SQLConf.scala | 23 ++++ .../apache/spark/sql/CarbonToSparkAdapater.scala | 8 +- .../sql/catalyst/catalog/HiveTableRelation.scala | 56 +++++++++ .../sql/catalyst/optimizer/MigrateOptimizer.scala | 129 +++++++++++++++++++++ .../sql/catalyst/plans/logical/Subquery.scala | 28 +++++ .../apache/spark/sql/hive/CarbonSessionState.scala | 19 ++- 30 files changed, 481 insertions(+), 81 deletions(-) diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala index 7108bf8..5ffc46a 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala @@ -81,7 +81,7 @@ class MVDataMapProvider( val identifier = dataMapSchema.getRelationIdentifier val logicalPlan = new FindDataSourceTable(sparkSession).apply( - sparkSession.sessionState.catalog.lookupRelation( + SparkSQLUtil.sessionState(sparkSession).catalog.lookupRelation( TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName)))) match { case s: SubqueryAlias => s.child diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala index 8baa924..9bed098 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala @@ -547,7 +547,7 @@ object MVHelper { relation, aliasMap, keepAlias = false) - SortOrder(expressions.head, s.direction, s.sameOrderExpressions) + SortOrder(expressions.head, s.direction) } // In case of limit it goes to other.
case other => other diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala index 2c5d8f4..493539b 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.mv.rewrite -import org.apache.spark.internal.Logging +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.trees.TreeNode abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging { diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala index 026d6b7..ea0bfea 100644 --- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala +++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.FindDataSourceTable import org.apache.spark.sql.parser.CarbonSpark2SqlParser +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.core.datamap.DataMapCatalog import org.apache.carbondata.core.datamap.status.DataMapStatusManager @@ -34,6 +35,7 @@ import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelati import org.apache.carbondata.mv.plans.util.Signature import org.apache.carbondata.mv.session.MVSession + /** Holds a summary logical plan */ private[mv] case class SummaryDataset(signature: Option[Signature], plan: LogicalPlan, @@ -114,7 +116,8 @@ private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession) mvSession.sessionState.optimizer.execute(planToRegister)).next().semiHarmonized val signature = modularPlan.signature val identifier = dataMapSchema.getRelationIdentifier - val output = new FindDataSourceTable(sparkSession).apply(sparkSession.sessionState.catalog + val output = new FindDataSourceTable(sparkSession) + .apply(SparkSQLUtil.sessionState(sparkSession).catalog .lookupRelation(TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName)))) .output val relation = ModularRelation(identifier.getDatabaseName, diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala index 25f07e4..95450b2 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala @@ -20,15 +20,15 @@ package org.apache.carbondata.mv.rewrite import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive.CarbonSessionCatalog import org.scalatest.BeforeAndAfter - import org.apache.carbondata.mv.testutil.ModularPlanTest +import org.apache.spark.sql.util.SparkSQLUtil class TestSQLSuite extends ModularPlanTest with BeforeAndAfter { import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._ val spark = sqlContext val testHive = sqlContext.sparkSession - val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient() + val hiveClient = 
SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient() ignore("protypical mqo rewrite test") { diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala index 882a43a..b30a131 100644 --- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala +++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/Tpcds_1_4_Suite.scala @@ -20,8 +20,8 @@ package org.apache.carbondata.mv.rewrite import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive.CarbonSessionCatalog import org.scalatest.BeforeAndAfter - import org.apache.carbondata.mv.testutil.ModularPlanTest +import org.apache.spark.sql.util.SparkSQLUtil //import org.apache.spark.sql.catalyst.SQLBuilder import java.io.{File, PrintWriter} @@ -31,7 +31,7 @@ class Tpcds_1_4_Suite extends ModularPlanTest with BeforeAndAfter { val spark = sqlContext val testHive = sqlContext.sparkSession - val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient() + val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient() test("test using tpc-ds queries") { diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala index cfe341a..e41c9ca 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala @@ -17,9 +17,9 @@ package org.apache.carbondata.mv.expressions.modular -import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, AttributeSet, Expression, ExprId, LeafExpression, NamedExpression, OuterReference, PlanExpression, Predicate, Unevaluable} -import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.mv.plans.modular.ModularPlan @@ -53,7 +53,8 @@ abstract class ModularSubquery( def canonicalize(attrs: AttributeSeq): ModularSubquery = { // Normalize the outer references in the subquery plan. val normalizedPlan = plan.transformAllExpressions { - case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs)) + case OuterReference(r) => OuterReference(SparkSQLUtil. 
+ invokeQueryPlannormalizeExprId(r, attrs)) } withNewPlan(normalizedPlan).canonicalized.asInstanceOf[ModularSubquery] } @@ -80,7 +81,7 @@ case class ScalarModularSubquery( override lazy val canonicalized: Expression = { ScalarModularSubquery( - plan.canonicalized, + plan.canonicalizedDef, children.map(_.canonicalized), ExprId(0)) } @@ -122,7 +123,7 @@ case class ModularListQuery( override lazy val canonicalized: Expression = { ModularListQuery( - plan.canonicalized, + plan.canonicalizedDef, children.map(_.canonicalized), ExprId(0)) } @@ -153,7 +154,7 @@ case class ModularExists( override lazy val canonicalized: Expression = { ModularExists( - plan.canonicalized, + plan.canonicalizedDef, children.map(_.canonicalized), ExprId(0)) } diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala index 77efaf7..a19066d 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala @@ -19,7 +19,8 @@ package org.apache.carbondata.mv.plans.modular import scala.collection._ -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, ExprId, Literal, NamedExpression} +import org.apache.spark.sql.CarbonExpressions.MatchCast +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Divide, ExprId, Literal, NamedExpression} import org.apache.spark.sql.catalyst.expressions.aggregate._ trait AggregatePushDown { // self: ModularPlan => @@ -106,12 +107,11 @@ trait AggregatePushDown { // self: ModularPlan => } else { Map.empty[Int, (NamedExpression, Seq[NamedExpression])] } - case sum@AggregateExpression(Sum(Cast(expr, dataType, timeZoneId)), _, false, _) - if expr.isInstanceOf[Attribute] => + case sum@AggregateExpression(Sum(cast@MatchCast(expr, dataType)), _, false, _) => val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr) .asInstanceOf[Attribute] if (fact.outputSet.contains(tAttr)) { - val sum1 = AggregateExpression(Sum(Cast(tAttr, dataType, timeZoneId)), sum.mode, false) + val sum1 = AggregateExpression(Sum(cast), sum.mode, false) val alias = Alias(sum1, sum1.toString)() val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId) val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId)) diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala index ebe8c8c..cb2043e 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala @@ -20,7 +20,7 @@ package org.apache.carbondata.mv.plans.modular import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.SQLConf import org.apache.carbondata.mv.plans import org.apache.carbondata.mv.plans._ diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala index 6c82598..cdf0aa4 100644 --- 
a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala @@ -20,13 +20,13 @@ package org.apache.carbondata.mv.plans.modular import scala.collection._ import scala.collection.mutable.{HashMap, MultiMap} -import org.apache.spark.internal.Logging +import org.apache.spark.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper} import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan} import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.trees.TreeNode -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.mv.plans._ @@ -45,6 +45,10 @@ abstract class ModularPlan lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved + def canonicalizedDef: ModularPlan = { + canonicalized + } + def childrenResolved: Boolean = children.forall(_.resolved) private var statsCache: Option[Statistics] = None diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala index 491d394..b7512d2 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical.Statistics -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.util.SparkSQLUtil import org.apache.carbondata.mv.plans.modular.Flags._ @@ -47,14 +47,7 @@ case class ModularRelation(databaseName: String, override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = { val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan val stats = SparkSQLUtil.invokeStatsMethod(plan, conf) - val output = outputList.map(_.toAttribute) - val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { - table => AttributeMap(table.output.zip(output)) - } - val rewrites = mapSeq(0) - val attributeStats = AttributeMap(stats.attributeStats.iterator - .map { pair => (rewrites(pair._1), pair._2) }.toSeq) - Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints) + SparkSQLUtil.getStatisticsObj(outputList, plan, stats) } override def output: Seq[Attribute] = outputList.map(_.toAttribute) @@ -155,10 +148,6 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode { val stats = SparkSQLUtil.invokeStatsMethod(plan, conf) val output = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation] .outputList.map(_.toAttribute) - val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { - table => AttributeMap(table.output.zip(output)) - } - val rewrites = mapSeq.head val aliasMap = AttributeMap( source.asInstanceOf[GroupBy].outputList.collect { case a@Alias(ar: Attribute, _) => (ar, a.toAttribute) @@ -167,12 +156,7 @@ case class HarmonizedRelation(source: ModularPlan) extends LeafNode { case a@Alias(AggregateExpression(Last(ar: Attribute, _), _, _, _), _) => (ar, 
a.toAttribute) }) - val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1), pair._2) } - val attributeStats = - AttributeMap( - aStatsIterator.map(pair => (aliasMap.get(pair._1).getOrElse(pair._1), pair._2)).toSeq) - - Statistics(stats.sizeInBytes, None, attributeStats, stats.hints) + SparkSQLUtil.getStatisticsObj(output, plan, stats, Option(aliasMap)) } override def output: Seq[Attribute] = source.output diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala index d255359..a0d16cb 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.mv.plans.modular -import org.apache.spark.internal.Logging +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.trees.TreeNode diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala index 42cf15c..9182a89 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala @@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _} import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _} -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.SQLConf import org.apache.spark.sql.util.SparkSQLUtil object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { val conf = new SQLConf() - .copy(SQLConf.CASE_SENSITIVE -> true, SQLConf.STARSCHEMA_DETECTION -> true) + protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations) def batches: Seq[Batch] = { @@ -40,7 +40,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { Batch( "Finish Analysis", Once, EliminateSubqueryAliases, - EliminateView, + SparkSQLUtil.getEliminateViewObj(), ReplaceExpressions, ComputeCurrentTime, // GetCurrentDatabase(sessionCatalog), @@ -59,7 +59,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { CombineUnions) :: Batch( "Pullup Correlated Expressions", Once, - PullupCorrelatedPredicates) :: + SparkSQLUtil.getPullupCorrelatedPredicatesObj()) :: Batch( "Subquery", Once, OptimizeSubqueries) :: @@ -107,7 +107,7 @@ object BirdcageOptimizer extends RuleExecutor[LogicalPlan] { SimplifyCaseConversionExpressions, RewriteCorrelatedScalarSubquery, EliminateSerialization, - RemoveRedundantAliases, + SparkSQLUtil.getRemoveRedundantAliasesObj(), RemoveRedundantProject, SimplifyCreateStructOps, SimplifyCreateArrayOps, diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala index de65e37..0652575 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala +++ 
b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala @@ -17,9 +17,8 @@ package org.apache.carbondata.mv.plans.util -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, - AttributeSet, Expression, NamedExpression, PredicateHelper} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, PredicateHelper} import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -27,6 +26,8 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.carbondata.mv.plans.modular.Flags._ import org.apache.carbondata.mv.plans.modular.JoinEdge + + /** * SelectModule is extracted from logical plan of SPJG query. All join conditions * filter, and project operators in a single Aggregate-less subtree of logical plan @@ -335,13 +336,13 @@ object ExtractTableModule extends PredicateHelper { def unapply(plan: LogicalPlan): Option[ReturnType] = { plan match { // uncomment for cloudera1 version -// case m: CatalogRelation => -// Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags, -// Seq.empty) -// uncomment for apache version + // case m: CatalogRelation => + // Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags, + // Seq.empty) + // uncomment for apache version case m: HiveTableRelation => - Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags, - Seq.empty) + Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags, + Seq.empty) case l: LogicalRelation => val tableIdentifier = l.catalogTable.map(_.identifier) val database = tableIdentifier.map(_.database).flatten.getOrElse(null) diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala index 307fff0..d2e4375 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.mv.plans.util +import org.apache.spark.sql.CarbonExpressions.MatchCastExpression import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, BitwiseAnd, Cast, Expression, Grouping, GroupingID, Literal, NamedExpression, ShiftRight} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.types.{ByteType, IntegerType} @@ -398,10 +399,10 @@ trait SQLBuildDSL { // it back. 
case ar: AttributeReference if ar == gid => GroupingID(Nil) case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar) - case a@Cast( + case a@MatchCastExpression( BitwiseAnd( ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)), - Literal(1, IntegerType)), ByteType, None) if ar == gid => + Literal(1, IntegerType)), ByteType) if ar == gid => // for converting an expression to its original SQL format grouping(col) val idx = groupByExprs.length - 1 - value.asInstanceOf[Int] groupByExprs.lift(idx).map(Grouping).getOrElse(a) diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala index b6e62eb..815fb58 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala @@ -220,15 +220,6 @@ class SQLBuilder private( } } } - - object RemoveCasts extends Rule[ModularPlan] { - def apply(tree: ModularPlan): ModularPlan = { - tree transformAllExpressions { - case Cast(e, dataType, _) => e - } - } - } - } object SQLBuilder { diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala index c46124b..1f8d7b4 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala @@ -17,7 +17,7 @@ package org.apache.carbondata.mv.plans.util -import org.apache.spark.internal.Logging +import org.apache.spark.Logging import org.apache.spark.sql.catalyst.trees.TreeNode case class Signature(groupby: Boolean = true, datasets: Set[String] = Set.empty) diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala index 01ca448..103333d 100644 --- a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala +++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/Tpcds_1_4_Tables.scala @@ -842,14 +842,14 @@ object Tpcds_1_4_Tables { |STORED AS parquet """.stripMargin.trim, s""" - | CREATE TABLE fact_table1 (empname String, designation String, doj Timestamp, + | CREATE TABLE IF NOT EXISTS fact_table1 (empname String, designation String, doj Timestamp, | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, | utilization int,salary int) |STORED AS parquet """.stripMargin.trim, s""" - | CREATE TABLE fact_table2 (empname String, designation String, doj Timestamp, + | CREATE TABLE IF NOT EXISTS fact_table2 (empname String, designation String, doj Timestamp, | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, | utilization int,salary int) diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala index c74491c..dad8f8a 100644 --- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala +++ 
b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/ModularToSQLSuite.scala @@ -19,9 +19,9 @@ package org.apache.carbondata.mv.plans import org.apache.spark.sql.hive.CarbonSessionCatalog import org.scalatest.BeforeAndAfter - import org.apache.carbondata.mv.dsl.Plans._ import org.apache.carbondata.mv.testutil.ModularPlanTest +import org.apache.spark.sql.util.SparkSQLUtil class ModularToSQLSuite extends ModularPlanTest with BeforeAndAfter { @@ -29,7 +29,7 @@ class ModularToSQLSuite extends ModularPlanTest with BeforeAndAfter { val spark = sqlContext val testHive = sqlContext.sparkSession - val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient() + val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient() ignore("convert modular plans to sqls") { diff --git a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala index 631eca2..5d4a05f 100644 --- a/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala +++ b/datamap/mv/plan/src/test/scala/org/apache/carbondata/mv/plans/SignatureSuite.scala @@ -20,17 +20,17 @@ package org.apache.carbondata.mv.plans import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive.CarbonSessionCatalog import org.scalatest.BeforeAndAfterAll - import org.apache.carbondata.mv.dsl.Plans._ import org.apache.carbondata.mv.plans.modular.ModularPlanSignatureGenerator import org.apache.carbondata.mv.testutil.ModularPlanTest +import org.apache.spark.sql.util.SparkSQLUtil class SignatureSuite extends ModularPlanTest with BeforeAndAfterAll { import org.apache.carbondata.mv.testutil.TestSQLBatch._ val spark = sqlContext val testHive = sqlContext.sparkSession - val hiveClient = spark.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getClient() + val hiveClient = SparkSQLUtil.sessionState(spark.sparkSession).catalog.asInstanceOf[CarbonSessionCatalog].getClient() ignore("test signature computing") { diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala new file mode 100644 index 0000000..4c77b29 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/analysis/EmptyRule.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +object EmptyRule extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan + } +} diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala index 9ffe6e1..7903ac7 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/util/SparkSQLUtil.scala @@ -23,10 +23,14 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.{DataFrame, Dataset, SparkSession} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} +import org.apache.spark.sql.catalyst.analysis.EmptyRule +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSeq, Cast, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical +import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan, Statistics} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.internal.{SessionState, SQLConf} -import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil} +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.{CarbonReflectionUtils, SerializableConfiguration, SparkUtil, Utils} object SparkSQLUtil { def sessionState(sparkSession: SparkSession): SessionState = sparkSession.sessionState @@ -51,6 +55,87 @@ object SparkSQLUtil { } } + def invokeQueryPlannormalizeExprId(r: NamedExpression, input: AttributeSeq) + : NamedExpression = { + if (SparkUtil.isSparkVersionXandAbove("2.2")) { + val clazz = Utils.classForName("org.apache.spark.sql.catalyst.plans.QueryPlan") + clazz.getDeclaredMethod("normalizeExprId", classOf[Any], classOf[AttributeSeq]). + invoke(null, r, input).asInstanceOf[NamedExpression] + } else { + r + } + } + + def getStatisticsObj(outputList: Seq[NamedExpression], + plan: LogicalPlan, stats: Statistics, + aliasMap: Option[AttributeMap[Attribute]] = None) + : Statistics = { + val className = "org.apache.spark.sql.catalyst.plans.logical.Statistics" + if (SparkUtil.isSparkVersionXandAbove("2.2")) { + val output = outputList.map(_.toAttribute) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => AttributeMap(table.output.zip(output)) + } + val rewrites = mapSeq.head + val attributes : AttributeMap[ColumnStat] = CarbonReflectionUtils. 
+ getField("attributeStats", stats).asInstanceOf[AttributeMap[ColumnStat]] + var attributeStats = AttributeMap(attributes.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toSeq) + if (aliasMap.isDefined) { + attributeStats = AttributeMap( + attributeStats.map(pair => (aliasMap.get(pair._1), pair._2)).toSeq) + } + val hints = CarbonReflectionUtils.getField("hints", stats).asInstanceOf[Object] + CarbonReflectionUtils.createObject(className, stats.sizeInBytes, + stats.rowCount, attributeStats, hints).asInstanceOf[Statistics] + } else { + val output = outputList.map(_.name) + val mapSeq = plan.collect { case n: logical.LeafNode => n }.map { + table => table.output.map(_.name).zip(output).toMap + } + val rewrites = mapSeq.head + val colStats = CarbonReflectionUtils.getField("colStats", stats) + .asInstanceOf[Map[String, ColumnStat]] + var attributeStats = colStats.iterator + .map { pair => (rewrites(pair._1), pair._2) }.toMap + if (aliasMap.isDefined) { + val aliasMapName = aliasMap.get.map(x => (x._1.name, x._2.name)) + attributeStats = + attributeStats.map(pair => (aliasMapName.getOrElse(pair._1, pair._1) + , pair._2)) + } + CarbonReflectionUtils.createObject(className, stats.sizeInBytes, + stats.rowCount, attributeStats).asInstanceOf[Statistics] + } + } + + def getEliminateViewObj(): Rule[LogicalPlan] = { + if (SparkUtil.isSparkVersionXandAbove("2.2")) { + val className = "org.apache.spark.sql.catalyst.analysis.EliminateView" + CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]] + } else { + EmptyRule + } + } + + def getPullupCorrelatedPredicatesObj(): Rule[LogicalPlan] = { + if (SparkUtil.isSparkVersionXandAbove("2.2")) { + val className = "org.apache.spark.sql.catalyst.optimizer.PullupCorrelatedPredicates" + CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]] + } else { + EmptyRule + } + } + + def getRemoveRedundantAliasesObj(): Rule[LogicalPlan] = { + if (SparkUtil.isSparkVersionXandAbove("2.2")) { + val className = "org.apache.spark.sql.catalyst.optimizer.RemoveRedundantAliases" + CarbonReflectionUtils.createSingleObject(className).asInstanceOf[Rule[LogicalPlan]] + } else { + EmptyRule + } + } + def getReorderJoinObj(conf: SQLConf): Rule[LogicalPlan] = { if (SparkUtil.isSparkVersionEqualTo("2.2")) { val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin"; @@ -59,7 +144,12 @@ object SparkSQLUtil { val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$"; CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 .asInstanceOf[Rule[LogicalPlan]] - } else { + } else if (SparkUtil.isSparkVersionEqualTo("2.1")) { + val className = "org.apache.spark.sql.catalyst.optimizer.ReorderJoin$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] + } + else { throw new UnsupportedOperationException("Spark version not supported") } } @@ -72,7 +162,12 @@ object SparkSQLUtil { val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$"; CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 .asInstanceOf[Rule[LogicalPlan]] - } else { + } else if (SparkUtil.isSparkVersionEqualTo("2.1")) { + val className = "org.apache.spark.sql.catalyst.optimizer.EliminateOuterJoin$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] + } + else { throw new UnsupportedOperationException("Spark version not supported") } } @@ -85,6 +180,10 @@ object SparkSQLUtil { val className 
= "org.apache.spark.sql.catalyst.optimizer.NullPropagation$"; CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 .asInstanceOf[Rule[LogicalPlan]] + } else if (SparkUtil.isSparkVersionEqualTo("2.1")) { + val className = "org.apache.spark.sql.catalyst.optimizer.NullPropagation$"; + CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 + .asInstanceOf[Rule[LogicalPlan]] } else { throw new UnsupportedOperationException("Spark version not supported") } @@ -98,7 +197,11 @@ object SparkSQLUtil { val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$"; CarbonReflectionUtils.createObjectOfPrivateConstructor(className)._1 .asInstanceOf[Rule[LogicalPlan]] - } else { + } else if (SparkUtil.isSparkVersionEqualTo("2.1")) { + val className = "org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts"; + CarbonReflectionUtils.createObject(className, conf)._1.asInstanceOf[Rule[LogicalPlan]] + } + else { throw new UnsupportedOperationException("Spark version not supported") } } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index ee635e0..bdacfcd 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -320,6 +320,13 @@ object CarbonReflectionUtils { ._1.asInstanceOf[RunnableCommand] } + def createSingleObject(className: String): Any = { + val classMirror = universe.runtimeMirror(getClass.getClassLoader) + val classTest = classMirror.staticModule(className) + val methods = classMirror.reflectModule(classTest) + methods.instance + } + def createObject(className: String, conArgs: Object*): (Any, Class[_]) = { val clazz = Utils.classForName(className) val ctor = clazz.getConstructors.head diff --git a/integration/spark2/src/main/scala/org/apache/spark/Logging.scala b/integration/spark2/src/main/scala/org/apache/spark/Logging.scala new file mode 100644 index 0000000..62d6862 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/Logging.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +trait Logging extends org.apache.spark.internal.Logging{ + +} diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala new file mode 100644 index 0000000..6f60d0f --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql + +class SQLConf extends org.apache.spark.sql.internal.SQLConf { + val CASE_SENSITIVE = true + + val STARSCHEMA_DETECTION = true +} diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala index 69541eb..79a6240 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/CarbonToSparkAdapater.scala @@ -25,7 +25,9 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext -import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation +import org.apache.spark.sql.catalyst.optimizer.OptimizeCodegen +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation} +import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.types.{DataType, Metadata} @@ -81,4 +83,8 @@ object CarbonToSparkAdapter { tablePath: String): CatalogStorageFormat = { storageFormat.copy(properties = map, locationUri = Some(tablePath)) } + + def getOptimizeCodegenRule(conf :SQLConf): Seq[Rule[LogicalPlan]] = { + Seq(OptimizeCodegen(conf)) + } } diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala new file mode 100644 index 0000000..eb3e88d --- /dev/null +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/catalog/HiveTableRelation.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import com.google.common.base.Objects +import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} +import org.apache.spark.sql.internal.SQLConf + +/** + * A `LogicalPlan` that represents a hive table. + * + * TODO: remove this after we completely make hive as a data source. + */ +case class HiveTableRelation( + tableMeta: CatalogTable, + dataCols: Seq[AttributeReference], + partitionCols: Seq[AttributeReference]) extends LeafNode with MultiInstanceRelation { + assert(tableMeta.identifier.database.isDefined) + assert(tableMeta.partitionSchema.sameType(partitionCols.toStructType)) + assert(tableMeta.schema.sameType(dataCols.toStructType)) + + // The partition column should always appear after data columns. + override def output: Seq[AttributeReference] = dataCols ++ partitionCols + + def isPartitioned: Boolean = partitionCols.nonEmpty + + override def equals(relation: Any): Boolean = relation match { + case other: HiveTableRelation => tableMeta == other.tableMeta && output == other.output + case _ => false + } + + override def hashCode(): Int = { + Objects.hashCode(tableMeta.identifier, output) + } + + override def newInstance(): HiveTableRelation = copy( + dataCols = dataCols.map(_.newInstance()), + partitionCols = partitionCols.map(_.newInstance())) +} diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala new file mode 100644 index 0000000..9a88255 --- /dev/null +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/optimizer/MigrateOptimizer.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseKeyWhen, CreateArray, CreateMap, CreateNamedStructLike, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField, IntegerLiteral, Literal} +import org.apache.spark.sql.catalyst.expressions.aggregate.First +import org.apache.spark.sql.catalyst.expressions.objects.{LambdaVariable, MapObjects} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, UnaryNode} +import org.apache.spark.sql.catalyst.rules.Rule + +class MigrateOptimizer { + +} + +/** + * Replaces logical [[Deduplicate]] operator with an [[Aggregate]] operator. + */ +object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case Deduplicate(keys, child, streaming) if !streaming => + val keyExprIds = keys.map(_.exprId) + val aggCols = child.output.map { attr => + if (keyExprIds.contains(attr.exprId)) { + attr + } else { + Alias(new First(attr).toAggregateExpression(), attr.name)(attr.exprId) + } + } + Aggregate(keys, aggCols, child) + } +} + +/** A logical plan for `dropDuplicates`. */ +case class Deduplicate( + keys: Seq[Attribute], + child: LogicalPlan, + streaming: Boolean) extends UnaryNode { + + override def output: Seq[Attribute] = child.output +} + +/** + * Remove projections from the query plan that do not make any modifications. + */ +object RemoveRedundantProject extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case p @ Project(_, child) if p.output == child.output => child + } +} + +/** + * push down operations into [[CreateNamedStructLike]]. + */ +object SimplifyCreateStructOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformExpressionsUp { + // push down field extraction + case GetStructField(createNamedStructLike: CreateNamedStructLike, ordinal, _) => + createNamedStructLike.valExprs(ordinal) + } + } +} + +/** + * push down operations into [[CreateArray]]. + */ +object SimplifyCreateArrayOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformExpressionsUp { + // push down field selection (array of structs) + case GetArrayStructFields(CreateArray(elems), field, ordinal, numFields, containsNull) => + // instead f selecting the field on the entire array, + // select it from each member of the array. + // pushing down the operation this way open other optimizations opportunities + // (i.e. struct(...,x,...).x) + CreateArray(elems.map(GetStructField(_, ordinal, Some(field.name)))) + // push down item selection. + case ga @ GetArrayItem(CreateArray(elems), IntegerLiteral(idx)) => + // instead of creating the array and then selecting one row, + // remove array creation altgether. + if (idx >= 0 && idx < elems.size) { + // valid index + elems(idx) + } else { + // out of bounds, mimic the runtime behavior and return null + Literal(null, ga.dataType) + } + } + } +} + +/** + * push down operations into [[CreateMap]]. + */ +object SimplifyCreateMapOps extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = { + plan.transformExpressionsUp { + case GetMapValue(CreateMap(elems), key) => CaseKeyWhen(key, elems) + } + } +} + + +/** + * Removes MapObjects when the following conditions are satisfied + * 1. Mapobject(... 
lambdavariable(..., false) ...), which means types for input and output + * are primitive types with non-nullable + * 2. no custom collection class specified representation of data item. + */ +object EliminateMapObjects extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { + case MapObjects(_, _, _, LambdaVariable(_, _, _), inputData) => inputData + } +} diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala new file mode 100644 index 0000000..4abf189 --- /dev/null +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/catalyst/plans/logical/Subquery.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.Attribute + +/** + * This node is inserted at the top of a subquery when it is optimized. This makes sure we can + * recognize a subquery as such, and it allows us to write subquery aware transformations. 
+ */ +case class Subquery(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} \ No newline at end of file diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index 47feae0..5caa4dd 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -36,13 +36,13 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser, CarbonSparkSqlParserUtil} import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy} - import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema} +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.util.CarbonScalaUtil +import org.apache.spark.util.CarbonReflectionUtils /** * This class will have carbon catalog and refresh the relation from cache if the carbontable in @@ -301,10 +301,25 @@ class CarbonAnalyzer(catalog: SessionCatalog, conf: CatalystConf, sparkSession: SparkSession, analyzer: Analyzer) extends Analyzer(catalog, conf) { + + val mvPlan = try { + CarbonReflectionUtils.createObject( + "org.apache.carbondata.mv.datamap.MVAnalyzerRule", + sparkSession)._1.asInstanceOf[Rule[LogicalPlan]] + } catch { + case e: Exception => + null + } + override def execute(plan: LogicalPlan): LogicalPlan = { var logicalPlan = analyzer.execute(plan) logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) + if (mvPlan != null) { + mvPlan.apply(logicalPlan) + } else { + logicalPlan + } } }