Repository: carbondata Updated Branches: refs/heads/master a9d5e9dec -> e6d03d112
[CARBONDATA-2312] Support In-Memory Catalog. Support storing the catalog in memory (not in Hive) for each session; after a session restart, the user can create an external table and run select queries. This closes #2103 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e6d03d11 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e6d03d11 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e6d03d11 Branch: refs/heads/master Commit: e6d03d11212f8c5c66052b0f6e97144cf327bea6 Parents: a9d5e9d Author: kumarvishal <kumarvishal.1...@gmail.com> Authored: Wed Apr 4 12:03:47 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Fri Apr 20 23:09:09 2018 +0800 ---------------------------------------------------------------------- .../spark/sql/common/util/QueryTest.scala | 2 +- .../spark/util/CarbonReflectionUtils.scala | 25 +- .../scala/org/apache/spark/sql/CarbonEnv.scala | 5 +- .../org/apache/spark/sql/CarbonSession.scala | 23 +- .../CarbonAlterTableCompactionCommand.scala | 5 +- .../CarbonAlterTableAddColumnCommand.scala | 11 +- .../CarbonAlterTableDataTypeChangeCommand.scala | 11 +- .../CarbonAlterTableDropColumnCommand.scala | 17 +- .../schema/CarbonAlterTableRenameCommand.scala | 40 +-- .../schema/CarbonAlterTableSetCommand.scala | 4 +- .../schema/CarbonAlterTableUnsetCommand.scala | 4 +- .../spark/sql/hive/CarbonHiveMetaStore.scala | 9 +- .../spark/sql/hive/CarbonSessionCatalog.scala | 56 +++- .../org/apache/spark/util/AlterTableUtil.scala | 27 +- .../spark/sql/hive/CarbonSessionState.scala | 50 +++- .../apache/spark/sql/hive/CarbonAnalyzer.scala | 34 +++ .../sql/hive/CarbonInMemorySessionState.scala | 276 +++++++++++++++++++ .../apache/spark/sql/hive/CarbonOptimizer.scala | 77 ++++++ .../spark/sql/hive/CarbonSessionState.scala | 231 ++++------------ .../spark/sql/hive/CarbonSqlAstBuilder.scala | 124 +++++++++ 20 files changed, 777 insertions(+), 254 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index 9c5bc38..d45e759 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -28,7 +28,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.command.LoadDataCommand -import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonSessionCatalog} import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor} import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext} import org.scalatest.Suite http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala index 69eb021..4264aa1 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala @@ -194,19 +194,30 @@ object CarbonReflectionUtils { } } - 
def getSessionState(sparkContext: SparkContext, carbonSession: Object): Any = { + def getSessionState(sparkContext: SparkContext, + carbonSession: Object, + useHiveMetaStore: Boolean): Any = { if (SPARK_VERSION.startsWith("2.1")) { val className = sparkContext.conf.get( CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, "org.apache.spark.sql.hive.CarbonSessionState") createObject(className, carbonSession)._1 } else if (SPARK_VERSION.startsWith("2.2")) { - val className = sparkContext.conf.get( - CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, - "org.apache.spark.sql.hive.CarbonSessionStateBuilder") - val tuple = createObject(className, carbonSession, None) - val method = tuple._2.getMethod("build") - method.invoke(tuple._1) + if (useHiveMetaStore) { + val className = sparkContext.conf.get( + CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, + "org.apache.spark.sql.hive.CarbonSessionStateBuilder") + val tuple = createObject(className, carbonSession, None) + val method = tuple._2.getMethod("build") + method.invoke(tuple._1) + } else { + val className = sparkContext.conf.get( + CarbonCommonConstants.CARBON_SESSIONSTATE_CLASSNAME, + "org.apache.spark.sql.hive.CarbonInMemorySessionStateBuilder") + val tuple = createObject(className, carbonSession, None) + val method = tuple._2.getMethod("build") + method.invoke(tuple._1) + } } else { throw new UnsupportedOperationException("Spark version not supported") } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 593ecce..00e0aed 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -23,6 +23,7 @@ 
import scala.util.Try import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction import org.apache.spark.sql.hive.{HiveSessionCatalog, _} @@ -123,7 +124,7 @@ object CarbonEnv { def getInstance(sparkSession: SparkSession): CarbonEnv = { if (sparkSession.isInstanceOf[CarbonSession]) { - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv() + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].getCarbonEnv } else { var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession) if (carbonEnv == null) { @@ -249,7 +250,7 @@ object CarbonEnv { */ def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = { var databaseLocation = - sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName) + sparkSession.sessionState.catalog.asInstanceOf[SessionCatalog].getDatabaseMetadata(dbName) .locationUri.toString // for default database and db ends with .db // check whether the carbon store and hive store is same or different. 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 7ee3038..1d5a82d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -43,7 +43,8 @@ import org.apache.carbondata.streaming.CarbonStreamingQueryListener * User needs to use {CarbonSession.getOrCreateCarbon} to create Carbon session. */ class CarbonSession(@transient val sc: SparkContext, - @transient private val existingSharedState: Option[SharedState] + @transient private val existingSharedState: Option[SharedState], + @transient useHiveMetaStore: Boolean = true ) extends SparkSession(sc) { self => def this(sc: SparkContext) { @@ -51,8 +52,10 @@ class CarbonSession(@transient val sc: SparkContext, } @transient - override lazy val sessionState: SessionState = - CarbonReflectionUtils.getSessionState(sparkContext, this).asInstanceOf[SessionState] + override lazy val sessionState: SessionState = { + CarbonReflectionUtils.getSessionState(sparkContext, this, useHiveMetaStore) + .asInstanceOf[SessionState] + } /** * State shared across sessions, including the `SparkContext`, cached data, listener, @@ -74,7 +77,7 @@ class CarbonSession(@transient val sc: SparkContext, } override def newSession(): SparkSession = { - new CarbonSession(sparkContext, Some(sharedState)) + new CarbonSession(sparkContext, Some(sharedState), useHiveMetaStore) } override def sql(sqlText: String): DataFrame = { @@ -116,10 +119,16 @@ object CarbonSession { private val statementId = new AtomicLong(0) + private var enableInMemCatlog: Boolean = false + private[sql] val threadStatementId = new ThreadLocal[Long]() 
implicit class CarbonBuilder(builder: Builder) { + def enableInMemoryCatalog(): Builder = { + enableInMemCatlog = true + builder + } def getOrCreateCarbonSession(): SparkSession = { getOrCreateCarbonSession(null, null) } @@ -132,7 +141,9 @@ object CarbonSession { def getOrCreateCarbonSession(storePath: String, metaStorePath: String): SparkSession = synchronized { - builder.enableHiveSupport() + if (!enableInMemCatlog) { + builder.enableHiveSupport() + } val options = getValue("options", builder).asInstanceOf[scala.collection.mutable.HashMap[String, String]] val userSuppliedContext: Option[SparkContext] = @@ -205,7 +216,7 @@ object CarbonSession { sc } - session = new CarbonSession(sparkContext) + session = new CarbonSession(sparkContext, None, !enableInMemCatlog) val carbonProperties = CarbonProperties.getInstance() if (storePath != null) { carbonProperties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index a7b5f7e..c462c9e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier import 
org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel} -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} +import org.apache.spark.sql.hive.{CarbonRelation} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.AlterTableUtil @@ -299,8 +299,7 @@ case class CarbonAlterTableCompactionCommand( tableIdentifier, Map("streaming" -> "false"), Seq.empty, - true)(sparkSession, - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + true)(sparkSession) // 5. remove checkpoint FileFactory.deleteAllFilesOfDir( new File(CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath))) http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index 4962c3a..d33fc6d 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -85,11 +85,14 @@ private[sql] case class CarbonAlterTableAddColumnCommand( schemaEvolutionEntry.setAdded(newCols.toList.asJava) val thriftTable = schemaConverter .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - AlterTableUtil - .updateSchemaInfo(carbonTable, + val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo( + carbonTable, 
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), - thriftTable)(sparkSession, - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + thriftTable, + Some(newCols))(sparkSession) + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns( + tableIdentifier, schemaParts, cols) + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent = new AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala index ff17cfd..accaa27 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala @@ -26,6 +26,7 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus} @@ -96,9 +97,13 @@ private[sql] 
case class CarbonAlterTableDataTypeChangeCommand( schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava) tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) .setTime_stamp(System.currentTimeMillis) - AlterTableUtil.updateSchemaInfo( - carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession, - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val a = List(schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema)) + val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo( + carbonTable, schemaEvolutionEntry, tableInfo, Some(a))(sparkSession) + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .alterColumnChangeDataType(tableIdentifier, schemaParts, cols) + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent = new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable, alterTableDataTypeChangeModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index a64bdb9..ff1541b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -27,9 +27,9 @@ import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} 
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{AlterTableDropColumnPostEvent, AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.SchemaEvolutionEntry import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD @@ -116,9 +116,18 @@ private[sql] case class CarbonAlterTableDropColumnCommand( timeStamp = System.currentTimeMillis val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp) schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava) - AlterTableUtil - .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession, - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + val schemaConverter = new ThriftWrapperSchemaConverterImpl + val delCols = deletedColumnSchema.map { deleteCols => + schemaConverter.fromExternalToWrapperColumnSchema(deleteCols) + } + val (tableIdentifier, schemaParts, cols) = AlterTableUtil.updateSchemaInfo( + carbonTable, + schemaEvolutionEntry, + tableInfo, + Some(delCols))(sparkSession) + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .alterDropColumns(tableIdentifier, schemaParts, cols) + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) // TODO: 1. 
add check for deletion of index tables // delete dictionary files for dictionary column and clear dictionary cache from memory new AlterTableDropColumnRDD(sparkSession.sparkContext, http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index e349e93..af52d6b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -125,26 +125,21 @@ private[sql] case class CarbonAlterTableRenameCommand( val fileType = FileFactory.getFileType(tableMetadataFile) val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, newTableName, carbonTable.getCarbonTableIdentifier.getTableId) + val oldIdentifier = TableIdentifier(oldTableName, Some(oldDatabaseName)) + val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName)) var newTablePath = CarbonTablePath.getNewTablePath( oldTableIdentifier.getTablePath, newTableIdentifier.getTableName) metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) - val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] - .getClient() var partitions: Seq[CatalogTablePartition] = Seq.empty if (carbonTable.isHivePartitionTable) { partitions = - sparkSession.sessionState.catalog.listPartitions( - TableIdentifier(oldTableName, Some(oldDatabaseName))) + sparkSession.sessionState.catalog.listPartitions(oldIdentifier) } - 
sparkSession.catalog.refreshTable(TableIdentifier(oldTableName, - Some(oldDatabaseName)).quotedString) - hiveClient.runSqlHive( - s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") - hiveClient.runSqlHive( - s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + - s"('tableName'='$newTableName', " + - s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')") - + sparkSession.catalog.refreshTable(oldIdentifier.quotedString) + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterTableRename( + oldIdentifier, + newIdentifier, + newTablePath) // changed the rename order to deal with situation when carbon table and hive table // will point to the same tablePath if (FileFactory.isFileExist(tableMetadataFile, fileType)) { @@ -160,16 +155,19 @@ private[sql] case class CarbonAlterTableRenameCommand( partitions, oldTableIdentifier.getTablePath, newTablePath, - sparkSession) + sparkSession, + newIdentifier.table, + oldDatabaseName) - val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName)) val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier) // Update the storage location with new path sparkSession.sessionState.catalog.alterTable( catalogTable.copy(storage = sparkSession.sessionState.catalog. asInstanceOf[CarbonSessionCatalog].updateStorageLocation( new Path(newTablePath), - catalogTable.storage))) + catalogTable.storage, + newIdentifier.table, + oldDatabaseName))) if (updatedParts.nonEmpty) { // Update the new updated partitions specs with new location. 
sparkSession.sessionState.catalog.alterPartitions( @@ -232,14 +230,20 @@ private[sql] case class CarbonAlterTableRenameCommand( partitions: Seq[CatalogTablePartition], oldTablePath: String, newTablePath: String, - sparkSession: SparkSession): Seq[CatalogTablePartition] = { + sparkSession: SparkSession, + newTableName: String, + dbName: String): Seq[CatalogTablePartition] = { partitions.map{ part => if (part.storage.locationUri.isDefined) { val path = new Path(part.location) if (path.toString.contains(oldTablePath)) { val newPath = new Path(path.toString.replace(oldTablePath, newTablePath)) part.copy(storage = sparkSession.sessionState.catalog. - asInstanceOf[CarbonSessionCatalog].updateStorageLocation(newPath, part.storage)) + asInstanceOf[CarbonSessionCatalog].updateStorageLocation( + newPath, + part.storage, + newTableName, + dbName)) } else { part } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala index 51c0e6e..ffd69df 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.schema import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil private[sql] case class CarbonAlterTableSetCommand( @@ 
-38,8 +37,7 @@ private[sql] case class CarbonAlterTableSetCommand( tableIdentifier, properties, Nil, - set = true)(sparkSession, - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + set = true)(sparkSession) Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala index 2490f9e..d5bdd80 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.schema import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil @@ -37,8 +36,7 @@ private[sql] case class CarbonAlterTableUnsetCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String], - propKeys, false)(sparkSession, - sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + propKeys, false)(sparkSession) Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 96ef473..76aa73e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -18,8 +18,7 @@ package org.apache.spark.sql.hive import scala.collection.JavaConverters._ -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{CarbonSession, SparkSession} +import org.apache.spark.sql.{SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree @@ -168,10 +167,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { val dbName = newTableIdentifier.getDatabaseName val tableName = newTableIdentifier.getTableName val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") - val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] - .getClient() - hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") - + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .alterTable(TableIdentifier(tableName, Some(dbName)), schemaParts, None) sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) removeTableFromMetadata(dbName, tableName) CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala index 1c69309..f00739e 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala @@ -63,5 +63,59 @@ trait CarbonSessionCatalog { /** * Update the storageformat with new location information */ - def updateStorageLocation(path: Path, storage: CatalogStorageFormat): CatalogStorageFormat + def updateStorageLocation( + path: Path, + storage: CatalogStorageFormat, + newTableName: String, + dbName: String): CatalogStorageFormat + + /** + * Method used to update the table name + * @param oldTableIdentifier old table identifier + * @param newTableIdentifier new table identifier + * @param newTablePath new table path + */ + def alterTableRename(oldTableIdentifier: TableIdentifier, + newTableIdentifier: TableIdentifier, + newTablePath: String): Unit + + /** + * Below method will be used to update serd properties + * @param tableIdentifier table identifier + * @param schemaParts schema parts + * @param cols cols + */ + def alterTable(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + + /** + * Below method will be used to add new column + * @param tableIdentifier table identifier + * @param schemaParts schema parts + * @param cols cols + */ + def alterAddColumns(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + + /** + * Below method will be used to drop column + * @param tableIdentifier table identifier + * @param schemaParts schema parts + * @param cols cols + */ + def alterDropColumns(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit + + /** + * Below method will be used to alter data type of column in schema + * @param tableIdentifier table identifier + * @param 
schemaParts schema parts + * @param cols cols + */ + def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]): Unit } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 1c6756b..ac2bf9f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -21,7 +21,6 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ListBuffer -import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier @@ -36,9 +35,7 @@ import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonTablePath -import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} object AlterTableUtil { @@ -127,11 +124,16 @@ object AlterTableUtil { * @param schemaEvolutionEntry * @param thriftTable * @param sparkSession - * @param catalog */ def updateSchemaInfo(carbonTable: CarbonTable, schemaEvolutionEntry: SchemaEvolutionEntry, 
- thriftTable: TableInfo)(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = { + thriftTable: TableInfo, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]] = + None) + (sparkSession: SparkSession): + (TableIdentifier, + String, + Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) = { val dbName = carbonTable.getDatabaseName val tableName = carbonTable.getTableName CarbonEnv.getInstance(sparkSession).carbonMetastore @@ -145,9 +147,7 @@ object AlterTableUtil { val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableIdentifier)(sparkSession).schema.json val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema) - val hiveClient = catalog.getClient(); - hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)") - sparkSession.catalog.refreshTable(tableIdentifier.quotedString) + (tableIdentifier, schemaParts, cols) } /** @@ -306,11 +306,10 @@ object AlterTableUtil { * @param propKeys * @param set * @param sparkSession - * @param catalog */ def modifyTableProperties(tableIdentifier: TableIdentifier, properties: Map[String, String], propKeys: Seq[String], set: Boolean) - (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = { + (sparkSession: SparkSession): Unit = { val tableName = tableIdentifier.table val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName") @@ -360,10 +359,12 @@ object AlterTableUtil { } } } - - updateSchemaInfo(carbonTable, + val (tableIdentifier, schemParts, cols) = updateSchemaInfo(carbonTable, schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), - thriftTable)(sparkSession, catalog) + thriftTable)(sparkSession) + sparkSession.asInstanceOf[CarbonSession].sessionState.catalog + 
+ .asInstanceOf[CarbonSessionCatalog].alterTable(tableIdentifier, schemParts, cols) + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName") } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala index c37e6fa..9fe7241 100644 --- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -72,7 +72,7 @@ class CarbonHiveSessionCatalog( conf, hadoopConf) with CarbonSessionCatalog { - lazy val carbonEnv = { + private lazy val carbonEnv = { val env = new CarbonEnv env.init(sparkSession) env @@ -84,6 +84,50 @@ class CarbonHiveSessionCatalog( override def getCarbonEnv() : CarbonEnv = { carbonEnv } + + /** + * Renames the table in the Hive metastore (it stays in the database of + * `oldTableIdentifier`) and then rewrites its 'tableName', 'dbName' and 'tablePath' + * SERDEPROPERTIES. NOTE(review): `database.get` throws if `oldTableIdentifier.database` + * is None, so callers are expected to pass a database-qualified identifier. + */ + def alterTableRename(oldTableIdentifier: TableIdentifier, + newTableIdentifier: TableIdentifier, + newTablePath: String): Unit = { + getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table }" + + s" RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }") + getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }" + + s" SET SERDEPROPERTIES" + + s"('tableName'='${ newTableIdentifier.table }', " + + s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')") + } + + /** + * Writes `schemaParts` verbatim into ALTER TABLE ... SET TBLPROPERTIES(...) on the Hive + * metastore. `cols` is not used by this Hive-backed implementation. + */ + def alterTable(tableIdentifier: TableIdentifier, + schemaParts: String, + cols:
+ Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + getClient() + .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${tableIdentifier.table } " + + s"SET TBLPROPERTIES(${ schemaParts })") + } + + /** Delegates to `alterTable`; `cols` is forwarded but not used there. */ + def alterAddColumns(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + alterTable(tableIdentifier, schemaParts, cols) + } + + /** Delegates to `alterTable`; `cols` is forwarded but not used there. */ + def alterDropColumns(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + alterTable(tableIdentifier, schemaParts, cols) + } + + /** Delegates to `alterTable`; `cols` is forwarded but not used there. */ + def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + alterTable(tableIdentifier, schemaParts, cols) + } + // Initialize all listeners to the Operation bus.
CarbonEnv.init(sparkSession) @@ -195,7 +239,9 @@ class CarbonHiveSessionCatalog( */ override def updateStorageLocation( path: Path, - storage: CatalogStorageFormat): CatalogStorageFormat = { + storage: CatalogStorageFormat, + newTableName: String, + dbName: String): CatalogStorageFormat = { storage.copy(locationUri = Some(path.toString)) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala new file mode 100644 index 0000000..88beb68 --- /dev/null +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonAnalyzer.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Analyzer that delegates resolution to a wrapped [[Analyzer]] and then applies the Carbon
+ * pre-aggregate rules to the analyzed plan.
+ *
+ * @param catalog      session catalog handed to the parent [[Analyzer]]
+ * @param conf         SQL configuration handed to the parent [[Analyzer]]
+ * @param sparkSession session used to build the Carbon pre-aggregate rules
+ * @param analyzer     analyzer that performs the actual resolution before the Carbon rules run
+ */
+class CarbonAnalyzer(catalog: SessionCatalog,
+    conf: SQLConf,
+    sparkSession: SparkSession,
+    analyzer: Analyzer) extends Analyzer(catalog, conf) {
+
+  /**
+   * Runs the wrapped analyzer, then the pre-aggregate data-loading rules, then the
+   * pre-aggregate query rules, and returns the result of the last step.
+   */
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val analyzedPlan = analyzer.execute(plan)
+    // Data-loading rules must see the analyzed plan before the query rules rewrite it.
+    val loadRewrittenPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(analyzedPlan)
+    CarbonPreAggregateQueryRules(sparkSession).apply(loadRewrittenPlan)
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala new file mode 100644 index 0000000..e8ab84a --- /dev/null +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.hive
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.optimizer.Optimizer
+import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
+import org.apache.spark.sql.internal.{SQLConf, SessionResourceLoader, SessionState, SessionStateBuilder}
+import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule}
+import org.apache.spark.sql.parser.CarbonSparkSqlParser
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Carbon session catalog used by [[CarbonInMemorySessionStateBuilder]]. Schema changes are
+ * applied through Spark's `SessionCatalog` APIs instead of Hive SQL, and there is no Hive client.
+ * It refreshes the relation from cache if the carbontable in the carbon catalog is not the same
+ * as the cached carbon relation's carbon table.
+ *
+ * @param externalCatalog        external catalog backing this session catalog
+ * @param globalTempViewManager  manager for global temporary views
+ * @param functionRegistry       function registry of the session
+ * @param sparkSession           session this catalog belongs to
+ * @param conf                   SQL configuration
+ * @param hadoopConf             Hadoop configuration
+ * @param parser                 SQL parser handed to [[SessionCatalog]]
+ * @param functionResourceLoader loader for function resources
+ */
+class InMemorySessionCatalog(
+    externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
+    functionRegistry: FunctionRegistry,
+    sparkSession: SparkSession,
+    conf: SQLConf,
+    hadoopConf: Configuration,
+    parser: ParserInterface,
+    functionResourceLoader: FunctionResourceLoader)
+  extends SessionCatalog(
+    externalCatalog,
+    globalTempViewManager,
+    functionRegistry,
+    conf,
+    hadoopConf,
+    parser,
+    functionResourceLoader
+  ) with CarbonSessionCatalog {
+
+  /** Renames the table through the session catalog; `newTablePath` is not used here. */
+  override def alterTableRename(oldTableIdentifier: TableIdentifier,
+      newTableIdentifier: TableIdentifier,
+      newTablePath: String): Unit = {
+    sparkSession.sessionState.catalog.renameTable(oldTableIdentifier, newTableIdentifier)
+  }
+
+  /**
+   * Intentionally a no-op: this catalog does not persist `schemaParts` as table properties.
+   * Column-level changes are handled by the alterXxxColumns methods below.
+   */
+  override def alterTable(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    // Not required in case of the in-memory catalog.
+  }
+
+  /**
+   * Appends every column of `newColumns`, in order, to the end of the table's Spark schema.
+   * `None` is treated as "no columns to add".
+   */
+  override def alterAddColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      newColumns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    // Fold so each added column builds on the schema produced by the previous one; rebuilding
+    // from the original schema on every iteration would keep only the last column.
+    val newStructType = newColumns.getOrElse(Seq.empty).foldLeft(catalogTable.schema) {
+      (schema, column) =>
+        schema.add(column.getColumnName,
+          CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType))
+    }
+    alterSchema(newStructType, catalogTable, tableIdentifier)
+  }
+
+  /**
+   * Removes from the table's Spark schema every field whose name matches (case-insensitively)
+   * a column in `dropCols`. `None` leaves the field list unchanged.
+   */
+  override def alterDropColumns(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      dropCols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val droppedColumns = dropCols.getOrElse(Seq.empty)
+    val remainingFields = catalogTable.schema.fields.filterNot { field =>
+      droppedColumns.exists(_.getColumnName.equalsIgnoreCase(field.name))
+    }
+    alterSchema(new StructType(remainingFields), catalogTable, tableIdentifier)
+  }
+
+  /**
+   * Replaces the data type of every schema field whose name matches (case-insensitively) one of
+   * `columns`, keeping field order, nullability and metadata; other fields are left untouched.
+   * `None` leaves the field list unchanged.
+   */
+  override def alterColumnChangeDataType(tableIdentifier: TableIdentifier,
+      schemaParts: String,
+      columns: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]])
+  : Unit = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableIdentifier)
+    val changedColumns = columns.getOrElse(Seq.empty)
+    // A 1:1 map over the fields: each field appears exactly once in the new schema, whatever
+    // the number of changed columns.
+    val updatedFields = catalogTable.schema.fields.map { field =>
+      changedColumns.find(_.getColumnName.equalsIgnoreCase(field.name)) match {
+        case Some(column) =>
+          StructField(column.getColumnName,
+            CarbonScalaUtil.convertCarbonToSparkDataType(column.getDataType),
+            field.nullable,
+            field.metadata)
+        case None => field
+      }
+    }
+    alterSchema(new StructType(updatedFields), catalogTable, tableIdentifier)
+  }
+
+  /** Stores `structType` as the schema of `catalogTable` and refreshes the cached relation. */
+  private def alterSchema(structType: StructType,
+      catalogTable: CatalogTable,
+      tableIdentifier: TableIdentifier): Unit = {
+    val updatedTable = catalogTable.copy(schema = structType)
+    sparkSession.sessionState.catalog.alterTable(updatedTable)
+    sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
+  }
+
+  lazy val carbonEnv = {
+    val env = new CarbonEnv
+    env.init(sparkSession)
+    env
+  }
+
+  def getCarbonEnv() : CarbonEnv = {
+    carbonEnv
+  }
+
+  // Initialize all listeners to the Operation bus.
+  CarbonEnv.initListeners()
+
+  /** Reads the thrift [[TableInfo]] from the schema file located under `tablePath`. */
+  def getThriftTableInfo(tablePath: String): TableInfo = {
+    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
+    CarbonUtil.readSchemaFile(tableMetadataFile)
+  }
+
+  /**
+   * Looks the relation up; if `CarbonSessionUtil.refreshRelation` reports that it refreshed the
+   * relation, looks it up again so the refreshed relation is returned.
+   */
+  override def lookupRelation(name: TableIdentifier): LogicalPlan = {
+    val rtnRelation = super.lookupRelation(name)
+    val isRelationRefreshed =
+      CarbonSessionUtil.refreshRelation(rtnRelation, name)(sparkSession)
+    if (isRelationRefreshed) {
+      super.lookupRelation(name)
+    } else {
+      rtnRelation
+    }
+  }
+
+  /**
+   * This catalog has no Hive client, so this always returns null.
+   * NOTE(review): callers must not dereference the result for this catalog.
+   *
+   * @return null
+   */
+  def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
+    null
+  }
+
+  /**
+   * Creates the partitions after passing them through `CarbonScalaUtil.updatePartitions`; if any
+   * step of that fails with an exception, retries with the partitions exactly as given.
+   */
+  override def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    try {
+      val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
+    } catch {
+      case _: Exception =>
+        super.createPartitions(tableName, parts, ignoreIfExists)
+    }
+  }
+
+  /**
+   * This is an alternate way of getting partition information. It first fetches all partitions
+   * and then applies the filter, instead of querying the catalog along with the filters.
+   *
+   * @param partitionFilters filters to apply to the partitions
+   * @param sparkSession     current session
+   * @param identifier       table whose partitions are pruned
+   * @return the partitions selected by `CarbonSessionUtil.prunePartitionsByFilter`
+   */
+  override def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+  }
+
+  /**
+   * Returns `storage` with its location set to `path`; `newTableName` and `dbName` are not
+   * used by this implementation.
+   */
+  override def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat,
+      newTableName: String,
+      dbName: String): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
+}
+
+/**
+ * Session state builder, based on Spark's plain `SessionStateBuilder` rather than the Hive one.
+ * It wires the Carbon SQL parser, planner strategies, optimizer rules, analyzer and
+ * [[InMemorySessionCatalog]] into the session.
+ */
+class CarbonInMemorySessionStateBuilder (sparkSession: SparkSession,
+    parentState: Option[SessionState] = None)
+  extends SessionStateBuilder(sparkSession, parentState) {
+
+  override lazy val sqlParser: ParserInterface = new CarbonSparkSqlParser(conf, sparkSession)
+
+  experimentalMethods.extraStrategies =
+    Seq(new StreamingTableStrategy(sparkSession),
+      new CarbonLateDecodeStrategy,
+      new DDLStrategy(sparkSession)
+    )
+  experimentalMethods.extraOptimizations = Seq(new CarbonIUDRule,
+    new CarbonUDFTransformRule,
+    new CarbonLateDecodeRule)
+
+  /**
+   * Internal catalog for managing table and database states. When a parent state is given,
+   * its catalog state is copied into the new catalog.
+   */
+  override protected lazy val catalog: InMemorySessionCatalog = {
+    val catalog = new InMemorySessionCatalog(
+      externalCatalog,
+      session.sharedState.globalTempViewManager,
+      functionRegistry,
+      sparkSession,
+      conf,
+      SessionState.newHadoopConf(session.sparkContext.hadoopConfiguration, conf),
+      sqlParser,
+      resourceLoader)
+    parentState.foreach(_.catalog.copyStateTo(catalog))
+    catalog
+  }
+
+  private def externalCatalog: ExternalCatalog =
+    session.sharedState.externalCatalog.asInstanceOf[ExternalCatalog]
+
+  override protected lazy val resourceLoader: SessionResourceLoader = {
+    new SessionResourceLoader(session)
+  }
+
+  override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
+
+  override protected def analyzer: Analyzer = new CarbonAnalyzer(catalog, conf, sparkSession,
+    new Analyzer(catalog, conf) {
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
+        new FindDataSourceTable(session) +:
+        new ResolveSQLOnFile(session) +:
+        new CarbonIUDAnalysisRule(sparkSession) +:
+        new CarbonPreInsertionCasts(sparkSession) +: customResolutionRules
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] =
+        PreWriteCheck :: HiveOnlyCheck :: Nil
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
+        PreprocessTableCreation(session) +:
+        PreprocessTableInsertion(conf) +:
+        DataSourceAnalysis(conf) +:
+        customPostHocResolutionRules
+    }
+  )
+  override protected def newBuilder: NewBuilder = new CarbonInMemorySessionStateBuilder(_, _)
+}
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala new file mode 100644 index 0000000..a046763 --- /dev/null +++
b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonOptimizer.scala @@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, ExperimentalMethods}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Spark optimizer that first flags Carbon relations used inside subqueries
+ * ([[CarbonOptimizerUtil.transformForScalarSubQuery]]) and then runs the regular
+ * [[SparkOptimizer]] on the transformed plan.
+ */
+class CarbonOptimizer(
+    catalog: SessionCatalog,
+    conf: SQLConf,
+    experimentalMethods: ExperimentalMethods)
+  extends SparkOptimizer(catalog, conf, experimentalMethods) {
+
+  override def execute(plan: LogicalPlan): LogicalPlan = {
+    val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan)
+    super.execute(transFormedPlan)
+  }
+}
+
+object CarbonOptimizerUtil {
+
+  /**
+   * For each scalar subquery, EXISTS subquery and IN (subquery) found inside a Filter, flags the
+   * Carbon relations of the subquery plan. The flag lets the optimizer rule skip the decoder plan
+   * for those relations, so the whole plan can be optimized at once.
+   */
+  def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = {
+    plan.transform {
+      case filter: Filter =>
+        filter.transformExpressions {
+          case s: ScalarSubquery =>
+            ScalarSubquery(markCarbonRelationsAsSubquery(s.plan), s.children, s.exprId)
+          case e: Exists =>
+            Exists(markCarbonRelationsAsSubquery(e.plan),
+              e.children.map(_.canonicalized),
+              e.exprId)
+          case In(value, Seq(l @ ListQuery(sub, _, exprId))) =>
+            In(value, Seq(ListQuery(markCarbonRelationsAsSubquery(sub), l.children, exprId)))
+        }
+    }
+  }
+
+  /**
+   * Records the subquery flag (`isSubquery += true`) on every [[CarbonDatasourceHadoopRelation]]
+   * in `subqueryPlan`. The relations are updated in place; the plan nodes are returned as is.
+   */
+  private def markCarbonRelationsAsSubquery(subqueryPlan: LogicalPlan): LogicalPlan = {
+    subqueryPlan.transform {
+      case lr: LogicalRelation if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true
+        lr
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala index 479c9ce..de37a35 100644 --- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -22,29 +22,18 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog._ -import
org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin} -import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext} -import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand} -import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand -import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, LogicalRelation, PreWriteCheck, ResolveSQLOnFile, _} +import org.apache.spark.sql.execution.datasources.{FindDataSourceTable, PreWriteCheck, ResolveSQLOnFile, _} import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy} -import org.apache.spark.sql.execution.{SparkOptimizer, SparkSqlAstBuilder} import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.{SQLConf, SessionState} import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonLateDecodeRule, CarbonUDFTransformRule} -import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser, CarbonSparkSqlParser} -import org.apache.spark.sql.types.DecimalType -import org.apache.spark.util.CarbonReflectionUtils +import org.apache.spark.sql.parser.CarbonSparkSqlParser -import 
org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.util.CarbonScalaUtil /** @@ -79,7 +68,7 @@ class CarbonHiveSessionCatalog( functionResourceLoader ) with CarbonSessionCatalog { - lazy val carbonEnv = { + private lazy val carbonEnv = { val env = new CarbonEnv env.init(sparkSession) env @@ -95,9 +84,6 @@ class CarbonHiveSessionCatalog( // Initialize all listeners to the Operation bus. CarbonEnv.initListeners() - - - override def lookupRelation(name: TableIdentifier): LogicalPlan = { val rtnRelation = super.lookupRelation(name) val isRelationRefreshed = @@ -119,6 +105,49 @@ class CarbonHiveSessionCatalog( .asInstanceOf[HiveExternalCatalog].client } + /** + * Renames the table in the Hive metastore (it stays in the database of + * `oldTableIdentifier`) and then rewrites its 'tableName', 'dbName' and 'tablePath' + * SERDEPROPERTIES. NOTE(review): `database.get` throws if `oldTableIdentifier.database` + * is None, so callers are expected to pass a database-qualified identifier. + */ + def alterTableRename(oldTableIdentifier: TableIdentifier, + newTableIdentifier: TableIdentifier, + newTablePath: String): Unit = { + getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ oldTableIdentifier.table } " + + s"RENAME TO ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table }") + getClient().runSqlHive( + s"ALTER TABLE ${ oldTableIdentifier.database.get }.${ newTableIdentifier.table} " + + s"SET SERDEPROPERTIES" + + s"('tableName'='${ newTableIdentifier.table }', " + + s"'dbName'='${ oldTableIdentifier.database.get }', 'tablePath'='${ newTablePath }')") + } + + /** + * Writes `schemaParts` verbatim into ALTER TABLE ... SET TBLPROPERTIES(...) on the Hive + * metastore. `cols` is not used by this Hive-backed implementation. + */ + override def alterTable(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + getClient() + .runSqlHive(s"ALTER TABLE ${tableIdentifier.database.get}.${ tableIdentifier.table } " + + s"SET TBLPROPERTIES(${ schemaParts })") + } + + /** Delegates to `alterTable`; `cols` is forwarded but not used there. */ + override def alterAddColumns(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { +
alterTable(tableIdentifier, schemaParts, cols) + } + + /** Delegates to `alterTable`; `cols` is forwarded but not used there. */ + override def alterDropColumns(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + alterTable(tableIdentifier, schemaParts, cols) + } + + /** Delegates to `alterTable`; `cols` is forwarded but not used there. */ + override def alterColumnChangeDataType(tableIdentifier: TableIdentifier, + schemaParts: String, + cols: Option[Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]]) + : Unit = { + alterTable(tableIdentifier, schemaParts, cols) + } + override def createPartitions( tableName: TableIdentifier, parts: Seq[CatalogTablePartition], @@ -152,24 +181,13 @@ class CarbonHiveSessionCatalog( */ override def updateStorageLocation( path: Path, - storage: CatalogStorageFormat): CatalogStorageFormat = { + storage: CatalogStorageFormat, + newTableName: String, + dbName: String): CatalogStorageFormat = { storage.copy(locationUri = Some(path.toUri)) } } - -class CarbonAnalyzer(catalog: SessionCatalog, - conf: SQLConf, - sparkSession: SparkSession, - analyzer: Analyzer) extends Analyzer(catalog, conf) { - override def execute(plan: LogicalPlan): LogicalPlan = { - var logicalPlan = analyzer.execute(plan) - logicalPlan = CarbonPreAggregateDataLoadingRules(sparkSession).apply(logicalPlan) - CarbonPreAggregateQueryRules(sparkSession).apply(logicalPlan) - } -} - - /** * Session state implementation to override sql parser and adding strategies * @@ -194,7 +212,7 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession, * Internal catalog for managing table and database states. */ /** - * Create a [[CarbonSessionCatalogBuild]]. + * Create a [[CarbonHiveSessionCatalog]].
*/ override protected lazy val catalog: CarbonHiveSessionCatalog = { val catalog = new CarbonHiveSessionCatalog( @@ -248,147 +266,4 @@ class CarbonSessionStateBuilder(sparkSession: SparkSession, ) override protected def newBuilder: NewBuilder = new CarbonSessionStateBuilder(_, _) - -} - - -class CarbonOptimizer( - catalog: SessionCatalog, - conf: SQLConf, - experimentalMethods: ExperimentalMethods) - extends SparkOptimizer(catalog, conf, experimentalMethods) { - - override def execute(plan: LogicalPlan): LogicalPlan = { - val transFormedPlan: LogicalPlan = CarbonOptimizerUtil.transformForScalarSubQuery(plan) - super.execute(transFormedPlan) - } -} - -object CarbonOptimizerUtil { - def transformForScalarSubQuery(plan: LogicalPlan): LogicalPlan = { - // In case scalar subquery add flag in relation to skip the decoder plan in optimizer rule, And - // optimize whole plan at once. - val transFormedPlan = plan.transform { - case filter: Filter => - filter.transformExpressions { - case s: ScalarSubquery => - val tPlan = s.plan.transform { - case lr: LogicalRelation - if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true - lr - } - ScalarSubquery(tPlan, s.children, s.exprId) - case e: Exists => - val tPlan = e.plan.transform { - case lr: LogicalRelation - if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true - lr - } - Exists(tPlan, e.children.map(_.canonicalized), e.exprId) - - case In(value, Seq(l@ListQuery(sub, _, exprId))) => - val tPlan = sub.transform { - case lr: LogicalRelation - if lr.relation.isInstanceOf[CarbonDatasourceHadoopRelation] => - lr.relation.asInstanceOf[CarbonDatasourceHadoopRelation].isSubquery += true - lr - } - In(value, Seq(ListQuery(tPlan, l.children, exprId))) - } - } - transFormedPlan - } -} - -class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, 
sparkSession: SparkSession) - extends SparkSqlAstBuilder(conf) { - - val helper = new CarbonHelperSqlAstBuilder(conf, parser, sparkSession) - - override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = { - val fileStorage = helper.getFileStorage(ctx.createFileFormat) - - if (fileStorage.equalsIgnoreCase("'carbondata'") || - fileStorage.equalsIgnoreCase("carbondata") || - fileStorage.equalsIgnoreCase("'carbonfile'") || - fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) { - val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, - ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(), - Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage) - helper.createCarbonTable(createTableTuple) - } else { - super.visitCreateHiveTable(ctx) - } - } - - override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = { - - val newColumn = visitColType(ctx.colType) - if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) { - throw new MalformedCarbonCommandException( - "Column names provided are different. 
Both the column names should be same") - } - - val (typeString, values) : (String, Option[List[(Int, Int)]]) = newColumn.dataType match { - case d:DecimalType => ("decimal", Some(List((d.precision, d.scale)))) - case _ => (newColumn.dataType.typeName.toLowerCase, None) - } - - val alterTableChangeDataTypeModel = - AlterTableDataTypeChangeModel(new CarbonSpark2SqlParser().parseDataType(typeString, values), - new CarbonSpark2SqlParser() - .convertDbNameToLowerCase(Option(ctx.tableIdentifier().db).map(_.getText)), - ctx.tableIdentifier().table.getText.toLowerCase, - ctx.identifier.getText.toLowerCase, - newColumn.name.toLowerCase) - - CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) - } - - - override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = { - - val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList) - val fields = parser.getFields(cols) - val tblProperties = scala.collection.mutable.Map.empty[String, String] - val tableModel = new CarbonSpark2SqlParser().prepareTableModel (false, - new CarbonSpark2SqlParser().convertDbNameToLowerCase(Option(ctx.tableIdentifier().db) - .map(_.getText)), - ctx.tableIdentifier.table.getText.toLowerCase, - fields, - Seq.empty, - tblProperties, - None, - true) - - val alterTableAddColumnsModel = AlterTableAddColumnsModel( - Option(ctx.tableIdentifier().db).map(_.getText), - ctx.tableIdentifier.table.getText, - tblProperties.toMap, - tableModel.dimCols, - tableModel.msrCols, - tableModel.highcardinalitydims.getOrElse(Seq.empty)) - - CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) - } - - override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = { - super.visitCreateTable(ctx) - } - - override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = { - withOrigin(ctx) { - if (CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS, - CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean) { - super.visitShowTables(ctx) 
- } else { - CarbonShowTablesCommand( - Option(ctx.db).map(_.getText), - Option(ctx.pattern).map(string)) - } - } - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/carbondata/blob/e6d03d11/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala new file mode 100644 index 0000000..b0702ae --- /dev/null +++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSqlAstBuilder.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/

package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.parser.ParserUtils.{string, withOrigin}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{AddTableColumnsContext, ChangeColumnContext, CreateHiveTableContext, CreateTableContext, ShowTablesContext}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkSqlAstBuilder
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand}
import org.apache.spark.sql.execution.command.table.CarbonShowTablesCommand
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlParser}
import org.apache.spark.sql.types.DecimalType

import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties

/**
 * SQL AST builder that turns a few DDL statements (CREATE TABLE with a Carbon storage,
 * ALTER TABLE CHANGE COLUMN, ALTER TABLE ADD COLUMNS, SHOW TABLES) into Carbon plans and
 * leaves every other statement to [[SparkSqlAstBuilder]].
 *
 * @param conf         SQL configuration, forwarded to [[SparkSqlAstBuilder]] and the helper
 * @param parser       Carbon parser used for field extraction, data-type parsing and table models
 * @param sparkSession session forwarded to [[CarbonHelperSqlAstBuilder]]
 */
class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSession: SparkSession)
  extends SparkSqlAstBuilder(conf) {

  /** Carbon helper used to read the file storage and to build Carbon CREATE TABLE plans. */
  val helper: CarbonHelperSqlAstBuilder =
    new CarbonHelperSqlAstBuilder(conf, parser, sparkSession)

  /**
   * Builds a Carbon CREATE TABLE plan when the table's file storage is one of the Carbon
   * storage names; otherwise defers to Spark's Hive CREATE TABLE handling.
   */
  override def visitCreateHiveTable(ctx: CreateHiveTableContext): LogicalPlan = {
    val fileStorage = helper.getFileStorage(ctx.createFileFormat)
    if (isCarbonFileStorage(fileStorage)) {
      val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
        ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
        Option(ctx.STRING()).map(string), ctx.AS, ctx.query, fileStorage)
      helper.createCarbonTable(createTableTuple)
    } else {
      super.visitCreateHiveTable(ctx)
    }
  }

  /** True when `fileStorage` equals (ignoring case) one of the Carbon storage names. */
  private def isCarbonFileStorage(fileStorage: String): Boolean =
    CarbonSqlAstBuilder.CarbonFileStorages.exists(fileStorage.equalsIgnoreCase)

  /**
   * Converts `ALTER TABLE ... CHANGE old new type` into a Carbon data-type change command.
   *
   * The old and new column names must match (ignoring case); only the type may differ.
   * Table and column names are lowercased before they go into the model. For decimals,
   * precision and scale are passed alongside the "decimal" type string.
   *
   * @throws MalformedCarbonCommandException if the two column names differ
   */
  override def visitChangeColumn(ctx: ChangeColumnContext): LogicalPlan = {
    val newColumn = visitColType(ctx.colType)
    if (!ctx.identifier.getText.equalsIgnoreCase(newColumn.name)) {
      throw new MalformedCarbonCommandException(
        "Column names provided are different. Both the column names should be same")
    }

    val (typeString, values): (String, Option[List[(Int, Int)]]) = newColumn.dataType match {
      case d: DecimalType => ("decimal", Some(List((d.precision, d.scale))))
      case _ => (newColumn.dataType.typeName.toLowerCase, None)
    }

    // Reuse the injected parser rather than constructing a fresh one per call.
    val alterTableChangeDataTypeModel = AlterTableDataTypeChangeModel(
      parser.parseDataType(typeString, values),
      parser.convertDbNameToLowerCase(Option(ctx.tableIdentifier.db).map(_.getText)),
      ctx.tableIdentifier.table.getText.toLowerCase,
      ctx.identifier.getText.toLowerCase,
      newColumn.name.toLowerCase)

    CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel)
  }

  /**
   * Converts `ALTER TABLE ... ADD COLUMNS (...)` into a Carbon add-column command.
   *
   * The new columns are turned into a table model via the Carbon parser. The model's
   * dimension, measure and high-cardinality columns are then copied into the command model.
   */
  override def visitAddTableColumns(ctx: AddTableColumnsContext): LogicalPlan = {
    val cols = Option(ctx.columns).toSeq.flatMap(visitColTypeList)
    val fields = parser.getFields(cols)
    // Mutable because it is handed to prepareTableModel before being frozen with toMap below.
    val tblProperties = scala.collection.mutable.Map.empty[String, String]
    val dbName = Option(ctx.tableIdentifier.db).map(_.getText)
    val tableName = ctx.tableIdentifier.table.getText

    val tableModel = parser.prepareTableModel(false,
      parser.convertDbNameToLowerCase(dbName),
      tableName.toLowerCase,
      fields,
      Seq.empty,
      tblProperties,
      None,
      true)

    // NOTE(review): unlike tableModel above and visitChangeColumn, db/table names are passed
    // here without lowercasing -- confirm CarbonAlterTableAddColumnCommand normalizes them.
    val alterTableAddColumnsModel = AlterTableAddColumnsModel(
      dbName,
      tableName,
      tblProperties.toMap,
      tableModel.dimCols,
      tableModel.msrCols,
      tableModel.highcardinalitydims.getOrElse(Seq.empty))

    CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel)
  }

  /** Delegates unchanged to [[SparkSqlAstBuilder]]'s CREATE TABLE handling. */
  override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
    super.visitCreateTable(ctx)
  }

  /**
   * Plans SHOW TABLES.
   *
   * When the `CARBON_SHOW_DATAMAPS` property (read through CarbonProperties with its default)
   * is "true", Spark's own SHOW TABLES plan is used. Otherwise [[CarbonShowTablesCommand]] is
   * used, presumably so that datamap tables are hidden -- confirm. A property value that is
   * not a boolean string makes `toBoolean` throw IllegalArgumentException.
   */
  override def visitShowTables(ctx: ShowTablesContext): LogicalPlan = {
    withOrigin(ctx) {
      val showDataMaps = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.CARBON_SHOW_DATAMAPS,
          CarbonCommonConstants.CARBON_SHOW_DATAMAPS_DEFAULT).toBoolean
      if (showDataMaps) {
        super.visitShowTables(ctx)
      } else {
        CarbonShowTablesCommand(
          Option(ctx.db).map(_.getText),
          Option(ctx.pattern).map(string))
      }
    }
  }
}

object CarbonSqlAstBuilder {

  /**
   * File-storage names (compared ignoring case) for which CREATE TABLE is routed to Carbon.
   * The quoted entries presumably match string-literal storage clauses exactly as returned
   * by `CarbonHelperSqlAstBuilder.getFileStorage` -- confirm against that helper.
   */
  private val CarbonFileStorages: Seq[String] =
    Seq("'carbondata'", "carbondata", "'carbonfile'", "'org.apache.carbondata.format'")
}