SessionState Hive client to be used for all the SQL statements run on the Hive metastore.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/917152a7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/917152a7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/917152a7 Branch: refs/heads/branch-1.1 Commit: 917152a79f0ecdb49afda952da616f80f7865793 Parents: 0a0b7b1 Author: nareshpr <prnaresh.nar...@gmail.com> Authored: Mon Jun 5 15:56:25 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Jun 15 13:17:19 2017 +0530 ---------------------------------------------------------------------- .../sql/execution/command/AlterTableCommands.scala | 12 ++++++------ .../scala/org/apache/spark/util/AlterTableUtil.scala | 9 +++++---- 2 files changed, 11 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/917152a7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index 4ac3ea2..7969df4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.LogServiceFactory @@ -100,7 +100,7 @@ 
private[sql] case class AlterTableAddColumns( .updateSchemaInfo(carbonTable, schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), thriftTable)(sparkSession, - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]) + sparkSession.sessionState.asInstanceOf[CarbonSessionState]) LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName") } catch { @@ -202,10 +202,10 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR carbonTable.getStorePath)(sparkSession) CarbonEnv.getInstance(sparkSession).carbonMetastore .removeTableFromMetadata(oldDatabaseName, oldTableName) - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive .runSqlHive( s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client + sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive .runSqlHive( s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + s"('tableName'='$newTableName', " + @@ -339,7 +339,7 @@ private[sql] case class AlterTableDropColumns( .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession, - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]) + sparkSession.sessionState.asInstanceOf[CarbonSessionState]) // TODO: 1. 
add check for deletion of index tables // delete dictionary files for dictionary column and clear dictionary cache from memory new AlterTableDropColumnRDD(sparkSession.sparkContext, @@ -430,7 +430,7 @@ private[sql] case class AlterTableDataTypeChange( .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession, - sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]) + sparkSession.sessionState.asInstanceOf[CarbonSessionState]) LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName") } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/917152a7/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index d7b1422..9e402cd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.SparkConf import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.carbondata.common.logging.LogServiceFactory @@ -144,11 +144,12 @@ object AlterTableUtil { * @param schemaEvolutionEntry * @param thriftTable * @param sparkSession - * @param catalog + * @param sessionState */ def updateSchemaInfo(carbonTable: CarbonTable, schemaEvolutionEntry: SchemaEvolutionEntry, - thriftTable: TableInfo)(sparkSession: 
SparkSession, catalog: HiveExternalCatalog): Unit = { + thriftTable: TableInfo)(sparkSession: SparkSession, + sessionState: CarbonSessionState): Unit = { val dbName = carbonTable.getDatabaseName val tableName = carbonTable.getFactTableName CarbonEnv.getInstance(sparkSession).carbonMetastore @@ -160,7 +161,7 @@ object AlterTableUtil { val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableIdentifier)(sparkSession).schema.json val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema) - catalog.client.runSqlHive( + sessionState.metadataHive.runSqlHive( s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)") sparkSession.catalog.refreshTable(tableIdentifier.quotedString) }