[CARBONDATA-2101] Restrict direct query on pre-aggregate and timeseries datamap
Restricting direct query on PreAggregate and timeseries data map Added Property to run direct query on data map for testing purpose validate.support.direct.query.on.datamap=true This closes #1888 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/349be007 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/349be007 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/349be007 Branch: refs/heads/branch-1.3 Commit: 349be007fd20fb8c4a39b318e45b47445d2e798c Parents: 46d9bf9 Author: kumarvishal <kumarvishal.1...@gmail.com> Authored: Tue Jan 30 20:54:12 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sat Feb 3 21:32:08 2018 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 10 +++++ .../carbondata/core/util/SessionParams.java | 2 + .../spark/sql/common/util/QueryTest.scala | 4 ++ .../apache/spark/sql/test/util/QueryTest.scala | 3 ++ .../spark/rdd/AggregateDataMapCompactor.scala | 2 + .../sql/CarbonDatasourceHadoopRelation.scala | 1 + .../scala/org/apache/spark/sql/CarbonEnv.scala | 18 +++++++++ .../preaaggregate/PreAggregateUtil.scala | 2 + .../sql/hive/CarbonPreAggregateRules.scala | 9 +++++ .../sql/optimizer/CarbonLateDecodeRule.scala | 40 +++++++++++++++++++- 10 files changed, 89 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index a799e51..6e6482d 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1588,6 +1588,16 @@ public final class CarbonCommonConstants { "carbon.sort.storage.inmemory.size.inmb"; public static final String IN_MEMORY_STORAGE_FOR_SORTED_DATA_IN_MB_DEFAULT = "512"; + @CarbonProperty + public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP = + "carbon.query.directQueryOnDataMap.enabled"; + public static final String SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "false"; + + @CarbonProperty + public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP = + "carbon.query.validate.directqueryondatamap"; + public static final String VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE = "true"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java index ddc7539..a6ff61e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java +++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java @@ -199,6 +199,8 @@ public class SessionParams implements Serializable { } } else if (key.startsWith(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS)) { isValid = true; + } else if (key.equalsIgnoreCase(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP)) { + isValid = true; } else { throw new InvalidConfigurationException( "The key " + key + " not supported for dynamic configuration."); http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala 
---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index d80efb8..9c5bc38 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -33,7 +33,9 @@ import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor} import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext} import org.scalatest.Suite +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.util.CarbonProperties class QueryTest extends PlanTest with Suite { @@ -43,6 +45,8 @@ class QueryTest extends PlanTest with Suite { // Add Locale setting Locale.setDefault(Locale.US) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false") /** * Runs the plan and makes sure the answer contains all of the keywords, or the http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala index b87473a..6e5630a 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala @@ -41,6 +41,9 @@ class QueryTest extends PlanTest { // Add Locale setting 
Locale.setDefault(Locale.US) + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, "false") + /** * Runs the plan and makes sure the answer contains all of the keywords, or the * none of keywords are listed in the answer http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala index 188e776..c8a6b1d 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala @@ -70,6 +70,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel, loadCommand.dataFrame = Some(PreAggregateUtil.getDataFrame( sqlContext.sparkSession, loadCommand.logicalPlan.get)) + CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + "true") loadCommand.processData(sqlContext.sparkSession) val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata( carbonTable.getMetaDataFilepath, uuid) http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index 39a0d1e..0978fab 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -74,6 +74,7 @@ case class CarbonDatasourceHadoopRelation( val projection = new CarbonProjection requiredColumns.foreach(projection.addColumn) + CarbonSession.threadUnset(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP) val inputMetricsStats: CarbonInputMetrics = new CarbonInputMetrics new CarbonScanRDD( sparkSession, http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 6b12008..8444d25 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -265,4 +265,22 @@ object CarbonEnv { tableName) } + def getThreadParam(key: String, defaultValue: String) : String = { + val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (null != carbonSessionInfo) { + carbonSessionInfo.getThreadParams.getProperty(key, defaultValue) + } else { + defaultValue + } + } + + def getSessionParam(key: String, defaultValue: String) : String = { + val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (null != carbonSessionInfo) { + carbonSessionInfo.getThreadParams.getProperty(key, defaultValue) + } else { + defaultValue + } + } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala index 1d4ebec..ae9bc9b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala @@ -599,6 +599,8 @@ object PreAggregateUtil { CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + parentTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + "." + parentTableIdentifier.table, validateSegments.toString) + CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + "true") CarbonSession.updateSessionInfoToCurrentThread(sparkSession) try { loadCommand.processData(sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala index de58805..7b4bc0d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala @@ -259,6 +259,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule * @return transformed plan */ def transformPreAggQueryPlan(logicalPlan: LogicalPlan): LogicalPlan = { + var isPlanUpdated = false val updatedPlan = logicalPlan.transform { case agg@Aggregate( grExp, @@ -294,6 +295,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule childPlan, carbonTable, agg) + isPlanUpdated = true Aggregate(updatedGroupExp, updatedAggExp, CarbonReflectionUtils @@ -346,6 +348,7 @@ case 
class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule childPlan, carbonTable, agg) + isPlanUpdated = true Aggregate(updatedGroupExp, updatedAggExp, CarbonReflectionUtils @@ -401,6 +404,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule childPlan, carbonTable, agg) + isPlanUpdated = true Aggregate(updatedGroupExp, updatedAggExp, Filter(updatedFilterExpression.get, @@ -461,6 +465,7 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule childPlan, carbonTable, agg) + isPlanUpdated = true Aggregate(updatedGroupExp, updatedAggExp, Filter(updatedFilterExpression.get, @@ -481,6 +486,10 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule } } + if(isPlanUpdated) { + CarbonSession.threadSet(CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + "true") + } updatedPlan } http://git-wip-us.apache.org/repos/asf/carbondata/blob/349be007/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala index 06ad0ad..0aa7514 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.types.{IntegerType, StringType} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.stats.QueryStatistic -import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory +import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalSessionInfo} import 
org.apache.carbondata.spark.CarbonAliasDecoderRelation @@ -66,7 +66,8 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } def checkIfRuleNeedToBeApplied(plan: LogicalPlan, removeSubQuery: Boolean = false): Boolean = { - relations = collectCarbonRelation(plan); + relations = collectCarbonRelation(plan) + validateQueryDirectlyOnDataMap(relations) if (relations.nonEmpty && !isOptimized(plan)) { // In case scalar subquery skip the transformation and update the flag. if (relations.exists(_.carbonRelation.isSubquery.nonEmpty)) { @@ -87,6 +88,41 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper { } } + /** + * Below method will be used to validate if query is directly fired on pre aggregate + * data map or not + * @param relations all relations from query + * + */ + def validateQueryDirectlyOnDataMap(relations: Seq[CarbonDecoderRelation]): Unit = { + var isPreAggDataMapExists = false + // first check if pre aggregate data map exists or not + relations.foreach{relation => + if (relation.carbonRelation.carbonTable.isChildDataMap) { + isPreAggDataMapExists = true + } + } + val validateQuery = CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP, + CarbonCommonConstants.VALIDATE_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean + var isThrowException = false + // if validate query is enabled and relation contains pre aggregate data map + if (validateQuery && isPreAggDataMapExists) { + val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (null != carbonSessionInfo) { + val supportQueryOnDataMap = CarbonEnv.getThreadParam( + CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP, + CarbonCommonConstants.SUPPORT_DIRECT_QUERY_ON_DATAMAP_DEFAULTVALUE).toBoolean + if (!supportQueryOnDataMap) { + isThrowException = true + } + } + } + if(isThrowException) { + throw new AnalysisException("Query On DataMap not supported") + } + } + private def 
collectCarbonRelation(plan: LogicalPlan): Seq[CarbonDecoderRelation] = { plan collect { case l: LogicalRelation if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>