[CARBONDATA-1987] Make package name and directory paths consistent; remove duplicate file CarbonColumnValidator
Add coveralls token to spark-2.2 profile; synchronize file path and package name; delete duplicate class CarbonColumnValidator present in spark2 module. This closes #1764 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/4d3f3989 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/4d3f3989 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/4d3f3989 Branch: refs/heads/fgdatamap Commit: 4d3f3989b5b3aef6ed44e3c67c4102bea4505013 Parents: 94011c3 Author: Raghunandan S <carbondatacontributi...@gmail.com> Authored: Thu Jan 4 20:18:07 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Jan 31 08:19:22 2018 +0530 ---------------------------------------------------------------------- .../spark/CarbonColumnValidator.scala | 36 -- .../src/main/spark2.1/CarbonSQLConf.scala | 149 ------- .../src/main/spark2.1/CarbonSessionState.scala | 339 ---------------- .../apache/spark/sql/hive/CarbonSQLConf.scala | 149 +++++++ .../spark/sql/hive/CarbonSessionState.scala | 339 ++++++++++++++++ .../src/main/spark2.2/CarbonSessionState.scala | 398 ------------------- .../src/main/spark2.2/CarbonSqlConf.scala | 148 ------- .../spark/sql/hive/CarbonSessionState.scala | 398 +++++++++++++++++++ .../apache/spark/sql/hive/CarbonSqlConf.scala | 148 +++++++ pom.xml | 8 +- 10 files changed, 1039 insertions(+), 1073 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala deleted file mode 100644 index 03c4764..0000000 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ -package org.apache.carbondata.spark - -import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - - /** - * Carbon column validator - */ -class CarbonColumnValidator extends ColumnValidator { - def validateColumns(allColumns: Seq[ColumnSchema]): Unit = { - allColumns.foreach { columnSchema => - val colWithSameId = allColumns.filter { x => - x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId) - } - if (colWithSameId.size > 1) { - throw new MalformedCarbonCommandException("Two column can not have same columnId") - } - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala deleted file mode 100644 index 15ccb0c..0000000 --- a/integration/spark2/src/main/spark2.1/CarbonSQLConf.scala +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder - -import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} -import org.apache.carbondata.core.util.CarbonProperties - -/** - * To initialize dynamic values default param - */ -class CarbonSQLConf(sparkSession: SparkSession) { - - val carbonProperties = CarbonProperties.getInstance() - - /** - * To initialize dynamic param defaults along with usage docs - */ - def addDefaultCarbonParams(): Unit = { - val ENABLE_UNSAFE_SORT = - SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT) - .doc("To enable/ disable unsafe sort.") - .booleanConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, - CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) - val CARBON_CUSTOM_BLOCK_DISTRIBUTION = - SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) - .doc("To set carbon task distribution.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, - CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) - val BAD_RECORDS_LOGGER_ENABLE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) - .doc("To enable/ disable carbon bad record logger.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants - .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) - val BAD_RECORDS_ACTION = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) - .doc("To configure the bad records action.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) - val IS_EMPTY_DATA_BAD_RECORD = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) - .doc("Property to decide weather empty data to be considered bad/ good record.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT - .toBoolean) - val SORT_SCOPE = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE) - .doc("Property to specify sort scope.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - val BATCH_SORT_SIZE_INMB = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB) - .doc("Property to specify batch sort size in MB.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) - val SINGLE_PASS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS) - .doc("Property to enable/disable single_pass.") - .booleanConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - val BAD_RECORD_PATH = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) - .doc("Property to configure the bad record location.") - .stringConf - .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - val GLOBAL_SORT_PARTITIONS = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) - .doc("Property to configure the global sort 
partitions.") - .stringConf - .createWithDefault(carbonProperties - .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, - CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) - val DATEFORMAT = - SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) - .doc("Property to configure data format for date type columns.") - .stringConf - .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) - val CARBON_INPUT_SEGMENTS = SQLConfigBuilder( - "carbon.input.segments.<database_name>.<table_name>") - .doc("Property to configure the list of segments to query.").stringConf - .createWithDefault(carbonProperties - .getProperty("carbon.input.segments.<database_name>.<table_name>", "*")) - } - /** - * to set the dynamic properties default values - */ - def addDefaultCarbonSessionParams(): Unit = { - sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT, - carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, - CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) - sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, - carbonProperties - .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, - CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, - CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, - CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, - CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, - carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, - CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, - carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, - CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, - CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, - carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, - CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, - carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, - CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) - sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, - CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala 
b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala deleted file mode 100644 index 0fe0f96..0000000 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.hive - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery} -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.parser.ParserUtils._ -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} -import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} -import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} -import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} -import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy} - -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -import org.apache.carbondata.spark.util.CarbonScalaUtil - -/** - * This class will have carbon catalog and refresh the relation from cache if the carbontable in - * carbon catalog is not same as cached carbon relation's carbon table - * - * @param externalCatalog - * @param globalTempViewManager - * @param sparkSession - * @param functionResourceLoader - * @param functionRegistry - * @param conf - * @param hadoopConf - */ -class CarbonSessionCatalog( - externalCatalog: HiveExternalCatalog, - globalTempViewManager: GlobalTempViewManager, - sparkSession: SparkSession, - functionResourceLoader: FunctionResourceLoader, - functionRegistry: FunctionRegistry, - conf: SQLConf, - hadoopConf: Configuration) - extends HiveSessionCatalog( - externalCatalog, - globalTempViewManager, - 
sparkSession, - functionResourceLoader, - functionRegistry, - conf, - hadoopConf) { - - lazy val carbonEnv = { - val env = new CarbonEnv - env.init(sparkSession) - env - } - - // Initialize all listeners to the Operation bus. - CarbonEnv.initListeners() - - /** - * This method will invalidate carbonrelation from cache if carbon table is updated in - * carbon catalog - * - * @param name - * @param alias - * @return - */ - override def lookupRelation(name: TableIdentifier, - alias: Option[String]): LogicalPlan = { - val rtnRelation = super.lookupRelation(name, alias) - var toRefreshRelation = false - rtnRelation match { - case SubqueryAlias(_, - LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) => - toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) - case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => - toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) - case _ => - } - - if (toRefreshRelation) { - super.lookupRelation(name, alias) - } else { - rtnRelation - } - } - - private def refreshRelationFromCache(identifier: TableIdentifier, - alias: Option[String], - carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { - var isRefreshed = false - val storePath = CarbonProperties.getStorePath - carbonEnv.carbonMetastore. - checkSchemasModifiedTimeAndReloadTable(identifier) - - val table = carbonEnv.carbonMetastore.getTableFromMetadataCache( - carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, - carbonDatasourceHadoopRelation.carbonTable.getTableName) - if (table.isEmpty || (table.isDefined && - table.get.getTableLastUpdatedTime != - carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { - refreshTable(identifier) - DataMapStoreManager.getInstance(). - clearDataMaps(AbsoluteTableIdentifier.from(storePath, - identifier.database.getOrElse("default"), identifier.table)) - isRefreshed = true - logInfo(s"Schema changes have been detected for table: $identifier") - } - isRefreshed - } - - /** - * returns hive client from session state - * - * @return - */ - def getClient(): org.apache.spark.sql.hive.client.HiveClient = { - sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive - } - - override def createPartitions( - tableName: TableIdentifier, - parts: Seq[CatalogTablePartition], - ignoreIfExists: Boolean): Unit = { - try { - val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) - // Get the properties from thread local - val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo - if (carbonSessionInfo != null) { - val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table) - super.createPartitions(tableName, updatedParts, ignoreIfExists) - } else { - super.createPartitions(tableName, parts, ignoreIfExists) - } - } catch { - case e: Exception => - super.createPartitions(tableName, parts, ignoreIfExists) - } - } - - /** - * This is alternate way of getting partition information. It first fetches all partitions from - * hive and then apply filter instead of querying hive along with filters. 
- * @param partitionFilters - * @param sparkSession - * @param identifier - * @return - */ - def getPartitionsAlternate( - partitionFilters: Seq[Expression], - sparkSession: SparkSession, - identifier: TableIdentifier) = { - val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) - val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier) - val partitionSchema = catalogTable.partitionSchema - if (partitionFilters.nonEmpty) { - val boundPredicate = - InterpretedPredicate.create(partitionFilters.reduce(And).transform { - case att: AttributeReference => - val index = partitionSchema.indexWhere(_.name == att.name) - BoundReference(index, partitionSchema(index).dataType, nullable = true) - }) - allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) } - } else { - allPartitions - } - } -} - -/** - * Session state implementation to override sql parser and adding strategies - * @param sparkSession - */ -class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) { - - override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) - - experimentalMethods.extraStrategies = extraStrategies - - experimentalMethods.extraOptimizations = extraOptimizations - - def extraStrategies: Seq[Strategy] = { - Seq( - new StreamingTableStrategy(sparkSession), - new CarbonLateDecodeStrategy, - new DDLStrategy(sparkSession) - ) - } - - def extraOptimizations: Seq[Rule[LogicalPlan]] = { - Seq(new CarbonIUDRule, - new CarbonUDFTransformRule, - new CarbonLateDecodeRule) - } - - override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) - - def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil - def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = { - catalog.ParquetConversions :: - catalog.OrcConversions :: - CarbonPreInsertionCasts(sparkSession) :: - CarbonIUDAnalysisRule(sparkSession) :: - AnalyzeCreateTable(sparkSession) :: - PreprocessTableInsertion(conf) :: - DataSourceAnalysis(conf) :: - (if (conf.runSQLonFile) { - new ResolveDataSource(sparkSession) :: Nil - } else { Nil }) - } - - override lazy val analyzer: Analyzer = - new CarbonAnalyzer(catalog, conf, sparkSession, - new Analyzer(catalog, conf) { - override val extendedResolutionRules = - if (extendedAnalyzerRules.nonEmpty) { - extendedAnalyzerRules ++ internalAnalyzerRules - } else { - internalAnalyzerRules - } - override val extendedCheckRules = Seq( - PreWriteCheck(conf, catalog)) - } - ) - - /** - * Internal catalog for managing table and database states. 
- */ - override lazy val catalog = { - new CarbonSessionCatalog( - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], - sparkSession.sharedState.globalTempViewManager, - sparkSession, - functionResourceLoader, - functionRegistry, - conf, - newHadoopConf()) - } -} - -class CarbonAnalyzer(catalog: SessionCatalog, - conf: CatalystConf, - sparkSession: SparkSession, - analyzer: Analyzer) extends Analyzer(catalog, conf) { - override def execute(plan: LogicalPlan): LogicalPlan = { - var logicalPlan = analyzer.execute(plan) - logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) - CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) - } -} - -class CarbonOptimizer( - catalog: SessionCatalog, - conf: SQLConf, - experimentalMethods: ExperimentalMethods) - extends SparkOptimizer(catalog, conf, experimentalMethods) { - - override def execute(plan: LogicalPlan): LogicalPlan = { - val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan) - super.execute(transFormedPlan) - } -} - -object CarbonOptimizerUtil { - def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = { - // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, - // And optimize whole plan at once. - val transFormedPlan = plan.transform { - case filter: Filter => - filter.transformExpressions { - case s: ScalarSubquery => - val tPlan = s.plan.transform { - case lr: LogicalRelation - if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true - lr - } - ScalarSubquery(tPlan, s.children, s.exprId) - case p: PredicateSubquery => - val tPlan = p.plan.transform { - case lr: LogicalRelation - if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true - lr - } - PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId) - } - } - transFormedPlan - } -} - -class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) - extends SparkSqlAstBuilder(conf) { - - val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) - - override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { - val fileStorage = helper.getFileStorage(ctx.createFileFormat) - - if (fileStorage.equalsIgnoreCase("'carbondata'") || - fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { - helper.createCarbonTable( - tableHeader = ctx.createTableHeader, - skewSpecContext = ctx.skewSpec, - bucketSpecContext = ctx.bucketSpec, - partitionColumns = ctx.partitionColumns, - columns = ctx.columns, - tablePropertyList = ctx.tablePropertyList, - locationSpecContext = ctx.locationSpec(), - tableComment = Option(ctx.STRING()).map(string), - ctas = ctx.AS, - query = ctx.query) - } else { - super.visitCreateTable(ctx) - } - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala new file mode 100644 index 0000000..15ccb0c --- /dev/null +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSQLConf.scala @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf.SQLConfigBuilder + +import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.util.CarbonProperties + +/** + * To initialize dynamic values default param + */ +class CarbonSQLConf(sparkSession: SparkSession) { + + val carbonProperties = CarbonProperties.getInstance() + + /** + * To initialize dynamic param defaults along with usage docs + */ + def addDefaultCarbonParams(): Unit = { + val ENABLE_UNSAFE_SORT = + SQLConfigBuilder(CarbonCommonConstants.ENABLE_UNSAFE_SORT) + .doc("To enable/ disable unsafe sort.") + .booleanConf + .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, + CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) + val CARBON_CUSTOM_BLOCK_DISTRIBUTION = + SQLConfigBuilder(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION) + .doc("To set carbon task distribution.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) + val BAD_RECORDS_LOGGER_ENABLE = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE) + .doc("To enable/ disable carbon bad record logger.") + .booleanConf + .createWithDefault(CarbonLoadOptionConstants + .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) + val BAD_RECORDS_ACTION = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION) + .doc("To configure the bad records action.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) + val IS_EMPTY_DATA_BAD_RECORD = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD) + .doc("Property to decide weather empty data to be considered bad/ good record.") + .booleanConf + .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT + .toBoolean) + val SORT_SCOPE = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE) + .doc("Property to specify sort scope.") + .stringConf + .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) + val BATCH_SORT_SIZE_INMB = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB) + .doc("Property to specify batch sort size in MB.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + 
CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) + val SINGLE_PASS = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS) + .doc("Property to enable/disable single_pass.") + .booleanConf + .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) + val BAD_RECORD_PATH = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH) + .doc("Property to configure the bad record location.") + .stringConf + .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + val GLOBAL_SORT_PARTITIONS = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS) + .doc("Property to configure the global sort partitions.") + .stringConf + .createWithDefault(carbonProperties + .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, + CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) + val DATEFORMAT = + SQLConfigBuilder(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT) + .doc("Property to configure data format for date type columns.") + .stringConf + .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) + val CARBON_INPUT_SEGMENTS = SQLConfigBuilder( + "carbon.input.segments.<database_name>.<table_name>") + .doc("Property to configure the list of segments to query.").stringConf + .createWithDefault(carbonProperties + .getProperty("carbon.input.segments.<database_name>.<table_name>", "*")) + } + /** + * to set the dynamic properties default values + */ + def addDefaultCarbonSessionParams(): Unit = { + sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT, + carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, + CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean) + sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION, + carbonProperties + .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION, + CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE, + CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, + carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, + CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD, + CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE, + carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, + CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB, + carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB, + CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS, + CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH, + 
carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC, + CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, + carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, + CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)) + sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, + CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT) + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala new file mode 100644 index 0000000..0fe0f96 --- /dev/null +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.hive + +import org.apache.hadoop.conf.Configuration +import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} +import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery} +import org.apache.spark.sql.catalyst.optimizer.Optimizer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.catalyst.parser.ParserUtils._ +import org.apache.spark.sql.catalyst.parser.SqlBaseParser.CreateTableContext +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.{CatalystConf, TableIdentifier} +import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} +import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} +import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, ExperimentalMethods, SparkSession, Strategy} + +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} +import org.apache.carbondata.spark.util.CarbonScalaUtil + +/** + * This class will have carbon catalog and refresh the relation from cache if the carbontable in + * carbon catalog is not same as cached carbon relation's carbon table + * + * @param externalCatalog + * @param globalTempViewManager + * @param sparkSession + * @param functionResourceLoader + * @param functionRegistry + * @param conf + * @param hadoopConf + */ +class CarbonSessionCatalog( + externalCatalog: HiveExternalCatalog, + globalTempViewManager: GlobalTempViewManager, + sparkSession: SparkSession, + functionResourceLoader: FunctionResourceLoader, + functionRegistry: FunctionRegistry, + conf: SQLConf, + hadoopConf: Configuration) + extends HiveSessionCatalog( + externalCatalog, + globalTempViewManager, + sparkSession, + functionResourceLoader, + functionRegistry, + conf, + hadoopConf) { + + lazy val carbonEnv = { + val env = new CarbonEnv + env.init(sparkSession) + env + } + + // Initialize all listeners to the Operation bus. 
+ CarbonEnv.initListeners() + + /** + * This method will invalidate carbonrelation from cache if carbon table is updated in + * carbon catalog + * + * @param name + * @param alias + * @return + */ + override def lookupRelation(name: TableIdentifier, + alias: Option[String]): LogicalPlan = { + val rtnRelation = super.lookupRelation(name, alias) + var toRefreshRelation = false + rtnRelation match { + case SubqueryAlias(_, + LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) => + toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) + case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => + toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) + case _ => + } + + if (toRefreshRelation) { + super.lookupRelation(name, alias) + } else { + rtnRelation + } + } + + private def refreshRelationFromCache(identifier: TableIdentifier, + alias: Option[String], + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { + var isRefreshed = false + val storePath = CarbonProperties.getStorePath + carbonEnv.carbonMetastore. + checkSchemasModifiedTimeAndReloadTable(identifier) + + val table = carbonEnv.carbonMetastore.getTableFromMetadataCache( + carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, + carbonDatasourceHadoopRelation.carbonTable.getTableName) + if (table.isEmpty || (table.isDefined && + table.get.getTableLastUpdatedTime != + carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime)) { + refreshTable(identifier) + DataMapStoreManager.getInstance(). + clearDataMaps(AbsoluteTableIdentifier.from(storePath, + identifier.database.getOrElse("default"), identifier.table)) + isRefreshed = true + logInfo(s"Schema changes have been detected for table: $identifier") + } + isRefreshed + } + + /** + * returns hive client from session state + * + * @return + */ + def getClient(): org.apache.spark.sql.hive.client.HiveClient = { + sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive + } + + override def createPartitions( + tableName: TableIdentifier, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = { + try { + val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + // Get the properties from thread local + val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo + if (carbonSessionInfo != null) { + val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table) + super.createPartitions(tableName, updatedParts, ignoreIfExists) + } else { + super.createPartitions(tableName, parts, ignoreIfExists) + } + } catch { + case e: Exception => + super.createPartitions(tableName, parts, ignoreIfExists) + } + } + + /** + * This is alternate way of getting partition information. It first fetches all partitions from + * hive and then apply filter instead of querying hive along with filters. 
+ * @param partitionFilters + * @param sparkSession + * @param identifier + * @return + */ + def getPartitionsAlternate( + partitionFilters: Seq[Expression], + sparkSession: SparkSession, + identifier: TableIdentifier) = { + val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier) + val partitionSchema = catalogTable.partitionSchema + if (partitionFilters.nonEmpty) { + val boundPredicate = + InterpretedPredicate.create(partitionFilters.reduce(And).transform { + case att: AttributeReference => + val index = partitionSchema.indexWhere(_.name == att.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) } + } else { + allPartitions + } + } +} + +/** + * Session state implementation to override sql parser and adding strategies + * @param sparkSession + */ +class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sparkSession) { + + override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) + + experimentalMethods.extraStrategies = extraStrategies + + experimentalMethods.extraOptimizations = extraOptimizations + + def extraStrategies: Seq[Strategy] = { + Seq( + new StreamingTableStrategy(sparkSession), + new CarbonLateDecodeStrategy, + new DDLStrategy(sparkSession) + ) + } + + def extraOptimizations: Seq[Rule[LogicalPlan]] = { + Seq(new CarbonIUDRule, + new CarbonUDFTransformRule, + new CarbonLateDecodeRule) + } + + override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods) + + def extendedAnalyzerRules: Seq[Rule[LogicalPlan]] = Nil + def internalAnalyzerRules: Seq[Rule[LogicalPlan]] = { + catalog.ParquetConversions :: + catalog.OrcConversions :: + CarbonPreInsertionCasts(sparkSession) :: + CarbonIUDAnalysisRule(sparkSession) :: + AnalyzeCreateTable(sparkSession) :: + PreprocessTableInsertion(conf) :: + DataSourceAnalysis(conf) :: + (if (conf.runSQLonFile) { + new ResolveDataSource(sparkSession) :: Nil + } else { Nil }) + } + + override lazy val analyzer: Analyzer = + new CarbonAnalyzer(catalog, conf, sparkSession, + new Analyzer(catalog, conf) { + override val extendedResolutionRules = + if (extendedAnalyzerRules.nonEmpty) { + extendedAnalyzerRules ++ internalAnalyzerRules + } else { + internalAnalyzerRules + } + override val extendedCheckRules = Seq( + PreWriteCheck(conf, catalog)) + } + ) + + /** + * Internal catalog for managing table and database states. 
+ */ + override lazy val catalog = { + new CarbonSessionCatalog( + sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog], + sparkSession.sharedState.globalTempViewManager, + sparkSession, + functionResourceLoader, + functionRegistry, + conf, + newHadoopConf()) + } +} + +class CarbonAnalyzer(catalog: SessionCatalog, + conf: CatalystConf, + sparkSession: SparkSession, + analyzer: Analyzer) extends Analyzer(catalog, conf) { + override def execute(plan: LogicalPlan): LogicalPlan = { + var logicalPlan = analyzer.execute(plan) + logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) + CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) + } +} + +class CarbonOptimizer( + catalog: SessionCatalog, + conf: SQLConf, + experimentalMethods: ExperimentalMethods) + extends SparkOptimizer(catalog, conf, experimentalMethods) { + + override def execute(plan: LogicalPlan): LogicalPlan = { + val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan) + super.execute(transFormedPlan) + } +} + +object CarbonOptimizerUtil { + def transformForScalarSubQuery(plan: LogicalPlan) : LogicalPlan = { + // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, + // And optimize whole plan at once. + val transFormedPlan = plan.transform { + case filter: Filter => + filter.transformExpressions { + case s: ScalarSubquery => + val tPlan = s.plan.transform { + case lr: LogicalRelation + if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true + lr + } + ScalarSubquery(tPlan, s.children, s.exprId) + case p: PredicateSubquery => + val tPlan = p.plan.transform { + case lr: LogicalRelation + if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => + lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true + lr + } + PredicateSubquery(tPlan, p.children, p.nullAware, p.exprId) + } + } + transFormedPlan + } +} + +class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession) + extends SparkSqlAstBuilder(conf) { + + val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) + + override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { + val fileStorage = helper.getFileStorage(ctx.createFileFormat) + + if (fileStorage.equalsIgnoreCase("'carbondata'") || + fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { + helper.createCarbonTable( + tableHeader = ctx.createTableHeader, + skewSpecContext = ctx.skewSpec, + bucketSpecContext = ctx.bucketSpec, + partitionColumns = ctx.partitionColumns, + columns = ctx.columns, + tablePropertyList = ctx.tablePropertyList, + locationSpecContext = ctx.locationSpec(), + tableComment = Option(ctx.STRING()).map(string), + ctas = ctx.AS, + query = ctx.query) + } else { + super.visitCreateTable(ctx) + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala deleted file mode 100644 index 3c151f0..0000000 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ /dev/null @@ -1,398 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.hive - - -import scala.collection.generic.SeqFactory - -import org.apache.hadoop.conf.Configuration -import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery} -import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.parser.ParserUtils.string -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand} -import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} -import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} -import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} -import org.apache.spark.sql.hive.client.HiveClient -import org.apache.spark.sql.internal.{SQLConf, SessionState} -import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} -import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} -import org.apache.spark.sql.types.DecimalType -import org.apache.spark.util.CarbonReflectionUtils - -import org.apache.carbondata.core.datamap.DataMapStoreManager -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo} -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.util.CarbonScalaUtil - -/** - * This class will have carbon catalog and refresh the relation from cache if the carbontable in - * carbon catalog is not same as cached carbon relation's carbon table - * - * @param externalCatalog - * @param globalTempViewManager - * @param sparkSession - * @param functionResourceLoader - * @param functionRegistry - * @param conf - * @param hadoopConf - */ -class CarbonSessionCatalog( - externalCatalog: HiveExternalCatalog, - globalTempViewManager: GlobalTempViewManager, - functionRegistry: FunctionRegistry, - sparkSession: SparkSession, - conf: SQLConf, - hadoopConf: Configuration, - parser: 
ParserInterface, - functionResourceLoader: FunctionResourceLoader) - extends HiveSessionCatalog( - externalCatalog, - globalTempViewManager, - new HiveMetastoreCatalog(sparkSession), - functionRegistry, - conf, - hadoopConf, - parser, - functionResourceLoader - ) { - - lazy val carbonEnv = { - val env = new CarbonEnv - env.init(sparkSession) - env - } - - def getCarbonEnv() : CarbonEnv = { - carbonEnv - } - - // Initialize all listeners to the Operation bus. - CarbonEnv.initListeners() - - - - - override def lookupRelation(name: TableIdentifier): LogicalPlan = { - val rtnRelation = super.lookupRelation(name) - var toRefreshRelation = false - rtnRelation match { - case SubqueryAlias(_, - LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) => - toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession) - case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) => - toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession) - case SubqueryAlias(_, relation) if - relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || - relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || - relation.getClass.getName.equals( - "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") => - val catalogTable = - CarbonReflectionUtils.getFieldOfCatalogTable( - "tableMeta", - relation).asInstanceOf[CatalogTable] - toRefreshRelation = - CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession) - case _ => - } - - if (toRefreshRelation) { - super.lookupRelation(name) - } else { - rtnRelation - } - } - - /** - * returns hive client from HiveExternalCatalog - * - * @return - */ - def getClient(): org.apache.spark.sql.hive.client.HiveClient = { - sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog - .asInstanceOf[HiveExternalCatalog].client - } - - override def createPartitions( - tableName: TableIdentifier, - parts: Seq[CatalogTablePartition], - ignoreIfExists: Boolean): Unit = { - try { - val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) - // Get the properties from thread local - val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo - if (carbonSessionInfo != null) { - val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table) - super.createPartitions(tableName, updatedParts, ignoreIfExists) - } else { - super.createPartitions(tableName, parts, ignoreIfExists) - } - } catch { - case e: Exception => - super.createPartitions(tableName, parts, ignoreIfExists) - } - } - - /** - * This is alternate way of getting partition information. It first fetches all partitions from - * hive and then apply filter instead of querying hive along with filters. 
- * @param partitionFilters - * @param sparkSession - * @param identifier - * @return - */ - def getPartitionsAlternate(partitionFilters: Seq[Expression], - sparkSession: SparkSession, - identifier: TableIdentifier) = { - val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) - ExternalCatalogUtils.prunePartitionsByFilter( - sparkSession.sessionState.catalog.getTableMetadata(identifier), - allPartitions, - partitionFilters, - sparkSession.sessionState.conf.sessionLocalTimeZone) - } -} - - -class CarbonAnalyzer(catalog: SessionCatalog, - conf: SQLConf, - sparkSession: SparkSession, - analyzer: Analyzer) extends Analyzer(catalog, conf) { - override def execute(plan: LogicalPlan): LogicalPlan = { - var logicalPlan = analyzer.execute(plan) - logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) - CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) - } -} - - -/** - * Session state implementation to override sql parser and adding strategies - * - * @param sparkSession - */ -class CarbonSessionStateBuilder(sparkSession: SparkSession, - parentState: Option[SessionState] = None) - extends HiveSessionStateBuilder(sparkSession, parentState) { - - override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession) - - experimentalMethods.extraStrategies = - Seq(new StreamingTableStrategy(sparkSession), - new CarbonLateDecodeStrategy, - new DDLStrategy(sparkSession) - ) - experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule, - new CarbonUDFTransformRule, - new CarbonLateDecodeRule) - - /** - * Internal catalog for managing table and database states. - */ - /** - * Create a [[CarbonSessionCatalogBuild]]. - */ - override protected lazy val catalog: CarbonSessionCatalog = { - val catalog = new CarbonSessionCatalog( - externalCatalog, - session.sharedState.globalTempViewManager, - functionRegistry, - sparkSession, - conf, - SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf), - sqlParser, - resourceLoader) - parentState.foreach(_.catalog.copyStateTo(catalog)) - catalog - } - - private def externalCatalog: HiveExternalCatalog = - session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog] - - /** - * Create a Hive aware resource loader. 
- */
-  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
-    val client: HiveClient = externalCatalog.client.newSession()
-    new HiveSessionResourceLoader(session, client)
-  }
-
-  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
-
-  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
-    new Analyzer(catalog, conf) {
-
-      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
-        new ResolveHiveSerdeTable(session) +:
-        new FindDataSourceTable(session) +:
-        new ResolveSQLOnFile(session) +:
-        new CarbonIUDAnalysisRule(sparkSession) +:
-        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
-
-      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
-        PreWriteCheck :: HiveOnlyCheck :: Nil
-
-      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
-        new DetermineTableStats(session) +:
-        RelationConversions(conf, catalog) +:
-        PreprocessTableCreation(session) +:
-        PreprocessTableInsertion(conf) +:
-        DataSourceAnalysis(conf) +:
-        HiveAnalysis +:
-        customPostHocResolutionRules
-    }
-  )
-
-  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
-
-}
-
-
-class CarbonOptimizer(
-    catalog: SessionCatalog,
-    conf: SQLConf,
-    experimentalMethods: ExperimentalMethods)
-  extends SparkOptimizer(catalog, conf, experimentalMethods) {
-
-  override def execute(plan: LogicalPlan): LogicalPlan = {
-    // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And
-    // optimize whole plan at once.
-    val transFormedPlan = plan.transform {
-      case filter: Filter =>
-        filter.transformExpressions {
-          case s: ScalarSubquery =>
-            val tPlan = s.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            ScalarSubquery(tPlan, s.children, s.exprId)
-          case e: Exists =>
-            val tPlan = e.plan.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
-
-          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
-            val tPlan = sub.transform {
-              case lr: LogicalRelation
-                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
-                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
-                lr
-            }
-            In(value, Seq(ListQuery(tPlan, l.children , exprId)))
-        }
-    }
-    super.execute(transFormedPlan)
-  }
-}
-
-class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
-  extends SparkSqlAstBuilder(conf) {
-
-  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
-
-  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
-    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
-
-    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
-      helper.createCarbonTable(
-        tableHeader = ctx.createTableHeader,
-        skewSpecContext = ctx.skewSpec,
-        bucketSpecContext = ctx.bucketSpec,
-        partitionColumns = ctx.partitionColumns,
-        columns = ctx.columns,
-        tablePropertyList = ctx.tablePropertyList,
-        locationSpecContext = ctx.locationSpec(),
-        tableComment = Option(ctx.STRING()).map(string),
-        ctas = ctx.AS,
-        query = ctx.query)
-    } else {
-      super.visitCreateHiveTable(ctx)
-    }
-  }
-
-  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
-
-    val newColumn = visitColType(ctx.colType)
-    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
-      throw new MalformedCarbonCommandException(
-        "Column names provided are different. Both the column names should be same")
-    }
-
-    val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
-      case d:DecimalType => ("decimal", Some(List((d.precision, d.scale))))
-      case _ => (newColumn.dataType.typeName.toLowerCase, None)
-    }
-
-    val alterTableChangeDataTypeModel =
-      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
-        new CarbonSpark2SqlParser()
-          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
-        ctx.tableIdentifier().table.getText.toLowerCase,
-        ctx.identifier.getText.toLowerCase,
-        newColumn.name.toLowerCase)
-
-    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
-  }
-
-
-  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
-
-    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
-    val fields = parser.getFields(cols)
-    val tblProperties = scala.collection.mutable.Map.empty[String, String]
-    val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
-        .map(_.getText)),
-      ctx.tableIdentifier.table.getText.toLowerCase,
-      fields,
-      Seq.empty,
-      tblProperties,
-      None,
-      true)
-
-    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
-      Option(ctx.tableIdentifier().db).map(_.getText),
-      ctx.tableIdentifier.table.getText,
-      tblProperties.toMap,
-      tableModel.dimCols,
-      tableModel.msrCols,
-      tableModel.highcardinalitydims.getOrElse(Seq.empty))
-
-    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
-  }
-
-  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
-    super.visitCreateTable(ctx)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala b/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
deleted file mode 100644
index 2128ffd..0000000
--- a/integration/spark2/src/main/spark2.2/CarbonSqlConf.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.internal.SQLConf.buildConf
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
-
-/**
- * To initialize dynamic values default param
- */
-class CarbonSQLConf(sparkSession: SparkSession) {
-
-  val carbonProperties = CarbonProperties.getInstance()
-
-  /**
-   * To initialize dynamic param defaults along with usage docs
-   */
-  def addDefaultCarbonParams(): Unit = {
-    val ENABLE_UNSAFE_SORT =
-      buildConf(CarbonCommonConstants.ENABLE_UNSAFE_SORT)
-        .doc("To enable/ disable unsafe sort.")
-        .booleanConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-          CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    val CARBON_CUSTOM_BLOCK_DISTRIBUTION =
-      buildConf(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION)
-        .doc("To set carbon task distribution.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-            CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    val BAD_RECORDS_LOGGER_ENABLE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE)
-        .doc("To enable/ disable carbon bad record logger.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants
-          .CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    val BAD_RECORDS_ACTION =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-        .doc("To configure the bad records action.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-            CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    val IS_EMPTY_DATA_BAD_RECORD =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
-        .doc("Property to decide weather empty data to be considered bad/ good record.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT
-          .toBoolean)
-    val SORT_SCOPE =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE)
-        .doc("Property to specify sort scope.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    val BATCH_SORT_SIZE_INMB =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB)
-        .doc("Property to specify batch sort size in MB.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    val SINGLE_PASS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS)
-        .doc("Property to enable/disable single_pass.")
-        .booleanConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    val BAD_RECORD_PATH =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH)
-        .doc("Property to configure the bad record location.")
-        .stringConf
-        .createWithDefault(carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    val GLOBAL_SORT_PARTITIONS =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS)
-        .doc("Property to configure the global sort partitions.")
-        .stringConf
-        .createWithDefault(carbonProperties
-          .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-            CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    val DATEFORMAT =
-      buildConf(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-        .doc("Property to configure data format for date type columns.")
-        .stringConf
-        .createWithDefault(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val CARBON_INPUT_SEGMENTS = buildConf(
-      "carbon.input.segments.<database_name>.<table_name>")
-      .doc("Property to configure the list of segments to query.").stringConf
-      .createWithDefault(carbonProperties
-        .getProperty("carbon.input.segments.<database_name>.<table_name>", "*"))
-  }
-  /**
-   * to set the dynamic properties default values
-   */
-  def addDefaultCarbonSessionParams(): Unit = {
-    sparkSession.conf.set(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-      carbonProperties.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
-        CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT).toBoolean)
-    sparkSession.conf.set(CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION,
-      carbonProperties
-        .getProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION,
-          CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-        CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT.toBoolean)
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-      carbonProperties.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-        CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-      carbonProperties.getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
-        CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT))
-    sparkSession.conf.set(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/4d3f3989/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
new file mode 100644
index 0000000..3c151f0
--- /dev/null
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.parser.ParserUtils.string
+import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
+import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _}
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder}
+import org.apache.spark.sql.hive.client.HiveClient
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser}
+import org.apache.spark.sql.types.DecimalType
+import org.apache.spark.util.CarbonReflectionUtils
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * This class holds the Carbon session catalog and refreshes a relation from the cache
+ * whenever the CarbonTable in the Carbon catalog is no longer the same as the cached
+ * Carbon relation's table.
+ *
+ * @param externalCatalog
+ * @param globalTempViewManager
+ * @param functionRegistry
+ * @param sparkSession
+ * @param conf
+ * @param hadoopConf
+ * @param parser
+ * @param functionResourceLoader
+ */
+class CarbonSessionCatalog(
+    externalCatalog: HiveExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends HiveSessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    new HiveMetastoreCatalog(sparkSession),
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) {
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv(): CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    var toRefreshRelation = false
+    rtnRelation match {
+      case SubqueryAlias(_,
+          LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) =>
+        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) =>
+        toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession)
+      case SubqueryAlias(_, relation) if
+          relation.getClass.getName.equals(
+            "org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+          relation.getClass.getName.equals(
+            "org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+          relation.getClass.getName.equals(
+            "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable(
+            "tableMeta",
+            relation).asInstanceOf[CatalogTable]
+        toRefreshRelation =
+          CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession)
+      case _ =>
+    }
+
+    if (toRefreshRelation) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * Returns the hive client from HiveExternalCatalog.
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
+      .asInstanceOf[HiveExternalCatalog].client
+  }
+
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      // Get the properties from thread local
+      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+      if (carbonSessionInfo != null) {
+        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
+        super.createPartitions(tableName, updatedParts, ignoreIfExists)
+      } else {
+        super.createPartitions(tableName, parts, ignoreIfExists)
+      }
+    } catch {
+      // Fall back to the default behaviour if the Carbon table or session info
+      // cannot be resolved.
+      case e: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all the
+   * partitions from hive and then applies the filter, instead of querying hive with the filters.
+   * @param partitionFilters
+   * @param sparkSession
+   * @param identifier
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+  }
+}
+
+
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    var logicalPlan = analyzer.execute(plan)
+    logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan)
+    CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan)
+  }
+}
+
+
+/**
+ * Session state implementation that overrides the SQL parser and adds the Carbon strategies.
+ *
+ * @param sparkSession
+ */
+class CarbonSessionStateBuilder(sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends HiveSessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Create the internal [[CarbonSessionCatalog]] for managing table and database states.
+   */
+  override protected lazy val catalog: CarbonSessionCatalog = {
+    val catalog = new CarbonSessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: HiveExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]
+
+  /**
+   * Create a Hive aware resource loader.
+   */
+  override protected lazy val resourceLoader: HiveSessionResourceLoader = {
+    val client: HiveClient = externalCatalog.client.newSession()
+    new HiveSessionResourceLoader(session, client)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new ResolveHiveSerdeTable(session) +:
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        new DetermineTableStats(session) +:
+        RelationConversions(conf, catalog) +:
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        HiveAnalysis +:
+        customPostHocResolutionRules
+    }
+  )
+
+  override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _)
+
+}
+
+
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    // For scalar and predicate subqueries, set a flag on the Carbon relation so that the
+    // decoder plan is skipped in the optimizer rule, and then optimize the whole plan at once.
+    val transformedPlan = plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            val tPlan = s.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            ScalarSubquery(tPlan, s.children, s.exprId)
+          case e: Exists =>
+            val tPlan = e.plan.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            Exists(tPlan, e.children.map(_.canonicalized), e.exprId)
+
+          case In(value, Seq(l@ListQuery(sub, _, exprId))) =>
+            val tPlan = sub.transform {
+              case lr: LogicalRelation
+                if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+                lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+                lr
+            }
+            In(value, Seq(ListQuery(tPlan, l.children, exprId)))
+        }
+    }
+    super.execute(transformedPlan)
+  }
+}
+
+class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
+  extends SparkSqlAstBuilder(conf) {
+
+  val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)
+
+  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
+    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
+
+    // Route CREATE TABLE statements stored as carbondata to the Carbon table builder;
+    // everything else falls through to the default Hive handling.
+    if (fileStorage.equalsIgnoreCase("'carbondata'") ||
+        fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
+      helper.createCarbonTable(
+        tableHeader = ctx.createTableHeader,
+        skewSpecContext = ctx.skewSpec,
+        bucketSpecContext = ctx.bucketSpec,
+        partitionColumns = ctx.partitionColumns,
+        columns = ctx.columns,
+        tablePropertyList = ctx.tablePropertyList,
+        locationSpecContext = ctx.locationSpec(),
+        tableComment = Option(ctx.STRING()).map(string),
+        ctas = ctx.AS,
+        query = ctx.query)
+    } else {
+      super.visitCreateHiveTable(ctx)
+    }
+  }
+
+  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
+
+    val newColumn = visitColType(ctx.colType)
+    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
+      throw new MalformedCarbonCommandException(
+        "The column names provided are different. Both column names should be the same")
+    }
+
+    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
+      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
+      case _ => (newColumn.dataType.typeName.toLowerCase, None)
+    }
+
+    val alterTableChangeDataTypeModel =
+      AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values),
+        new CarbonSpark2SqlParser()
+          .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)),
+        ctx.tableIdentifier().table.getText.toLowerCase,
+        ctx.identifier.getText.toLowerCase,
+        newColumn.name.toLowerCase)
+
+    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
+  }
+
+  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
+
+    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
+    val fields = parser.getFields(cols)
+    val tblProperties = scala.collection.mutable.Map.empty[String, String]
+    val tableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db)
+        .map(_.getText)),
+      ctx.tableIdentifier.table.getText.toLowerCase,
+      fields,
+      Seq.empty,
+      tblProperties,
+      None,
+      true)
+
+    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+      Option(ctx.tableIdentifier().db).map(_.getText),
+      ctx.tableIdentifier.table.getText,
+      tblProperties.toMap,
+      tableModel.dimCols,
+      tableModel.msrCols,
+      tableModel.highcardinalitydims.getOrElse(Seq.empty))
+
+    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
+  }
+
+  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+    super.visitCreateTable(ctx)
+  }
+}
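
----------------------------------------------------------------------

Note on the optimizer pass above: CarbonOptimizer.execute walks Filter
expressions and flags every CarbonDatasourceHadoopRelation found inside a
ScalarSubquery, Exists or IN ListQuery so that a later decode rule can skip
those relations. The following minimal Scala sketch mirrors that
flag-and-transform pattern; MiniPlan, MiniScan and MiniSubquery are
hypothetical stand-ins for the Catalyst and CarbonData classes, not the real
API.

// A tiny plan tree with a post-order transform, analogous to
// Catalyst's LogicalPlan.transform.
sealed trait MiniPlan {
  def transform(rule: PartialFunction[MiniPlan, MiniPlan]): MiniPlan = {
    val withNewChildren = this match {
      case MiniSubquery(child) => MiniSubquery(child.transform(rule))
      case leaf => leaf
    }
    rule.applyOrElse(withNewChildren, identity[MiniPlan])
  }
}
case class MiniScan(table: String, var isSubquery: Boolean = false) extends MiniPlan
case class MiniSubquery(child: MiniPlan) extends MiniPlan

object FlagSubqueryScans {
  // Mark every scan under a subquery so a later rule can treat it specially,
  // mirroring how the optimizer sets isSubquery on the Carbon relation.
  def flag(plan: MiniPlan): MiniPlan = plan.transform {
    case MiniSubquery(inner) =>
      MiniSubquery(inner.transform {
        case scan: MiniScan =>
          scan.isSubquery = true
          scan
      })
  }

  def main(args: Array[String]): Unit = {
    println(flag(MiniSubquery(MiniScan("carbon_table"))))
    // prints: MiniSubquery(MiniScan(carbon_table,true))
  }
}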
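
----------------------------------------------------------------------

Note on the conf defaults: CarbonSQLConf registers each load option with a
default taken from CarbonProperties, so an explicit session-level SET wins,
the cluster-wide carbon property comes next, and the shipped default is the
last resort. A self-contained sketch of that layered lookup follows; the key
"enable.unsafe.sort" is only a representative example and LayeredConfDemo is
hypothetical, not CarbonData code.

object LayeredConfDemo {
  // Stand-in for the global CarbonProperties store.
  private val carbonProperties = scala.collection.mutable.Map[String, String]()

  // Session value first, then the global property, then the hard default.
  def lookup(sessionConf: Map[String, String], key: String, default: String): String =
    sessionConf.getOrElse(key, carbonProperties.getOrElse(key, default))

  def main(args: Array[String]): Unit = {
    carbonProperties("enable.unsafe.sort") = "true"            // cluster-wide property
    val session = Map("enable.unsafe.sort" -> "false")         // SET in this session
    println(lookup(session, "enable.unsafe.sort", "false"))    // false: session value wins
    println(lookup(Map.empty, "enable.unsafe.sort", "false"))  // true: falls back to the property
  }
}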
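
----------------------------------------------------------------------

Note on getPartitionsAlternate: it deliberately lists all partitions once and
prunes them on the driver instead of pushing the filters down to the Hive
metastore. A hypothetical, self-contained sketch of that list-then-prune
strategy follows; MiniPartition and listPartitions stand in for the catalog
types and the metastore call.

object PrunePartitionsDemo {
  case class MiniPartition(spec: Map[String, String])

  // Stand-in for catalog.listPartitions(identifier): fetch everything once.
  def listPartitions(): Seq[MiniPartition] = Seq(
    MiniPartition(Map("dt" -> "2018-01-01")),
    MiniPartition(Map("dt" -> "2018-01-02")))

  // Apply the predicate locally, as prunePartitionsByFilter does.
  def prune(all: Seq[MiniPartition], keep: MiniPartition => Boolean): Seq[MiniPartition] =
    all.filter(keep)

  def main(args: Array[String]): Unit = {
    println(prune(listPartitions(), _.spec("dt") == "2018-01-02"))
    // prints: List(MiniPartition(Map(dt -> 2018-01-02)))
  }
}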
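
----------------------------------------------------------------------

Note on visitChangeColumn: before handing the new type to the Carbon parser it
splits the Spark type into a lower-case type name plus an optional
(precision, scale) pair, which only decimals carry. A standalone sketch of
that decomposition; MiniDataType, MiniDecimal and MiniInt are illustrative
stand-ins for Spark's DataType hierarchy.

sealed trait MiniDataType { def typeName: String }
case class MiniDecimal(precision: Int, scale: Int) extends MiniDataType {
  val typeName = "decimal"
}
case object MiniInt extends MiniDataType { val typeName = "INT" }

object TypeStringDemo {
  // Decimal keeps its (precision, scale); every other type is just a name.
  def decompose(dt: MiniDataType): (String, Option[List[(Int, Int)]]) = dt match {
    case d: MiniDecimal => ("decimal", Some(List((d.precision, d.scale))))
    case other => (other.typeName.toLowerCase, None)
  }

  def main(args: Array[String]): Unit = {
    println(decompose(MiniDecimal(10, 2))) // (decimal,Some(List((10,2))))
    println(decompose(MiniInt))            // (int,None)
  }
}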