[CARBONDATA-1656][Streaming] Reject alter table command for streaming table
This closes #1448 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/87892522 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/87892522 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/87892522 Branch: refs/heads/pre-aggregate Commit: 87892522bfa73a4876e5cfe68bbf9d460a9a1f52 Parents: 4c41f86 Author: Jacky Li <jacky.li...@qq.com> Authored: Wed Nov 8 10:37:04 2017 +0800 Committer: QiangCai <qiang...@qq.com> Committed: Wed Nov 8 16:04:04 2017 +0800 ---------------------------------------------------------------------- .../schema/AlterTableAddColumnCommand.scala | 115 ------------ .../AlterTableDataTypeChangeCommand.scala | 116 ------------- .../schema/AlterTableDropColumnCommand.scala | 148 ---------------- .../schema/AlterTableRenameTableCommand.scala | 174 ------------------- .../CarbonAlterTableAddColumnCommand.scala | 115 ++++++++++++ .../CarbonAlterTableDataTypeChangeCommand.scala | 116 +++++++++++++ .../CarbonAlterTableDropColumnCommand.scala | 148 ++++++++++++++++ .../schema/CarbonAlterTableRenameCommand.scala | 174 +++++++++++++++++++ .../sql/execution/strategy/DDLStrategy.scala | 10 +- .../strategy/StreamingTableStrategy.scala | 39 ++++- .../sql/parser/CarbonSpark2SqlParser.scala | 8 +- .../TestStreamingTableOperation.scala | 15 ++ .../restructure/AlterTableRevertTestCase.scala | 2 +- 13 files changed, 610 insertions(+), 570 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala deleted file mode 100644 index 6e6a4b1..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableAddColumnCommand.scala +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.command.schema - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand} -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} -import org.apache.spark.util.AlterTableUtil - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.format.TableInfo -import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD} - -private[sql] case class AlterTableAddColumnCommand( - alterTableAddColumnsModel: AlterTableAddColumnsModel) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val tableName = alterTableAddColumnsModel.tableName - val dbName = alterTableAddColumnsModel.databaseName - .getOrElse(sparkSession.catalog.currentDatabase) - LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName") - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) - var locks = List.empty[ICarbonLock] - var timeStamp = 0L - var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() - var carbonTable: CarbonTable = null - try { - locks = AlterTableUtil - .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - // Consider a concurrent scenario where 2 alter operations are executed in parallel. 1st - // operation is success and updates the schema file. 
2nd operation will get the lock after - // completion of 1st operation but as look up relation is called before it will have the - // older carbon table and this can lead to inconsistent state in the system. Therefor look - // up relation should be called after acquiring the lock - val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - .tableMeta.carbonTable - // get the latest carbon table and check for column existence - // read the latest schema file - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, - carbonTable.getCarbonTableIdentifier) - val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - val schemaConverter = new ThriftWrapperSchemaConverterImpl() - val wrapperTableInfo = schemaConverter - .fromExternalToWrapperTableInfo(thriftTableInfo, - dbName, - tableName, - carbonTable.getStorePath) - newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel, - dbName, - wrapperTableInfo, - carbonTablePath, - carbonTable.getCarbonTableIdentifier, - carbonTable.getStorePath, sparkSession.sparkContext).process - // generate dictionary files for the newly added columns - new AlterTableAddColumnRDD(sparkSession.sparkContext, - newCols, - carbonTable.getCarbonTableIdentifier, - carbonTable.getStorePath).collect() - timeStamp = System.currentTimeMillis - val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry - schemaEvolutionEntry.setTimeStamp(timeStamp) - schemaEvolutionEntry.setAdded(newCols.toList.asJava) - val thriftTable = schemaConverter - .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) - AlterTableUtil - .updateSchemaInfo(carbonTable, - schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), - thriftTable)(sparkSession, - 
sparkSession.sessionState.asInstanceOf[CarbonSessionState]) - LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") - LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName") - } catch { - case e: Exception => - LOGGER.error(e, "Alter table add columns failed") - if (newCols.nonEmpty) { - LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") - new AlterTableDropColumnRDD(sparkSession.sparkContext, - newCols, - carbonTable.getCarbonTableIdentifier, - carbonTable.getStorePath).collect() - AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession) - } - sys.error(s"Alter table add operation failed: ${e.getMessage}") - } finally { - // release lock after command execution completion - AlterTableUtil.releaseLocks(locks) - } - Seq.empty - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala deleted file mode 100644 index be87bbb..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDataTypeChangeCommand.scala +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command.schema - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand} -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} -import org.apache.spark.util.AlterTableUtil - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil} - -private[sql] case class AlterTableDataTypeChangeCommand( - alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val tableName = alterTableDataTypeChangeModel.tableName - val dbName = alterTableDataTypeChangeModel.databaseName - .getOrElse(sparkSession.catalog.currentDatabase) - LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName") - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) - var locks = List.empty[ICarbonLock] - // get the latest carbon table and check for column existence - 
var carbonTable: CarbonTable = null - var timeStamp = 0L - try { - locks = AlterTableUtil - .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - .tableMeta.carbonTable - val columnName = alterTableDataTypeChangeModel.columnName - val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible) - if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) { - LOGGER.audit(s"Alter table change data type request has failed. " + - s"Column $columnName does not exist") - sys.error(s"Column does not exist: $columnName") - } - val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName)) - if (carbonColumn.size == 1) { - CarbonScalaUtil - .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head) - } else { - LOGGER.audit(s"Alter table change data type request has failed. 
" + - s"Column $columnName is invalid") - sys.error(s"Invalid Column: $columnName") - } - // read the latest schema file - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, - carbonTable.getCarbonTableIdentifier) - val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - // maintain the added column for schema evolution history - var addColumnSchema: ColumnSchema = null - var deletedColumnSchema: ColumnSchema = null - val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible) - columnSchemaList.foreach { columnSchema => - if (columnSchema.column_name.equalsIgnoreCase(columnName)) { - deletedColumnSchema = columnSchema.deepCopy - columnSchema.setData_type(DataTypeConverterUtil - .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType)) - columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision) - columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale) - addColumnSchema = columnSchema - } - } - timeStamp = System.currentTimeMillis - val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp) - schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava) - schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava) - tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) - .setTime_stamp(System.currentTimeMillis) - AlterTableUtil - .updateSchemaInfo(carbonTable, - schemaEvolutionEntry, - tableInfo)(sparkSession, - sparkSession.sessionState.asInstanceOf[CarbonSessionState]) - LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName") - LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName") - } catch { - case e: Exception => LOGGER - .error("Alter table change datatype failed : " + e.getMessage) - if (carbonTable != null) { - AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession) - } - 
sys.error(s"Alter table data type change operation failed: ${e.getMessage}") - } finally { - // release lock after command execution completion - AlterTableUtil.releaseLocks(locks) - } - Seq.empty - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala deleted file mode 100644 index 2f1e3d9..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableDropColumnCommand.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.execution.command.schema - -import scala.collection.JavaConverters._ -import scala.collection.mutable.ListBuffer - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand} -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} -import org.apache.spark.util.AlterTableUtil - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.encoder.Encoding -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.format.SchemaEvolutionEntry -import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD - -private[sql] case class AlterTableDropColumnCommand( - alterTableDropColumnModel: AlterTableDropColumnModel) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val tableName = alterTableDropColumnModel.tableName - val dbName = alterTableDropColumnModel.databaseName - .getOrElse(sparkSession.catalog.currentDatabase) - LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName") - var locks = List.empty[ICarbonLock] - var timeStamp = 0L - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) - // get the latest carbon table and check for column existence - var carbonTable: CarbonTable = null - try { - locks = AlterTableUtil - .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) - val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - carbonTable = metastore - .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] - .tableMeta.carbonTable - val 
partitionInfo = carbonTable.getPartitionInfo(tableName) - if (partitionInfo != null) { - val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala - .map(_.getColumnName) - // check each column existence in the table - val partitionColumns = alterTableDropColumnModel.columns.filter { - tableColumn => partitionColumnSchemaList.contains(tableColumn) - } - if (partitionColumns.nonEmpty) { - throw new UnsupportedOperationException("Partition columns cannot be dropped: " + - s"$partitionColumns") - } - } - val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala - var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column - .ColumnSchema]() - var keyColumnCountToBeDeleted = 0 - // TODO: if deleted column list includes bucketted column throw an error - alterTableDropColumnModel.columns.foreach { column => - var columnExist = false - tableColumns.foreach { tableColumn => - // column should not be already deleted and should exist in the table - if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) { - if (tableColumn.isDimension) { - keyColumnCountToBeDeleted += 1 - if (tableColumn.hasEncoding(Encoding.DICTIONARY)) { - dictionaryColumns ++= Seq(tableColumn.getColumnSchema) - } - } - columnExist = true - } - } - if (!columnExist) { - sys.error(s"Column $column does not exists in the table $dbName.$tableName") - } - } - // take the total key column count. key column to be deleted should not - // be >= key columns in schema - val totalKeyColumnInSchema = tableColumns.count { - tableColumn => !tableColumn.isInvisible && tableColumn.isDimension - } - if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) { - sys.error(s"Alter drop operation failed. 
AtLeast one key column should exist after drop.") - } - // read the latest schema file - val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, - carbonTable.getCarbonTableIdentifier) - val tableInfo: org.apache.carbondata.format.TableInfo = - metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - // maintain the deleted columns for schema evolution history - var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]() - val columnSchemaList = tableInfo.fact_table.table_columns.asScala - alterTableDropColumnModel.columns.foreach { column => - columnSchemaList.foreach { columnSchema => - if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) { - deletedColumnSchema += columnSchema.deepCopy - columnSchema.invisible = true - } - } - } - // add deleted columns to schema evolution history and update the schema - timeStamp = System.currentTimeMillis - val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp) - schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava) - AlterTableUtil - .updateSchemaInfo(carbonTable, - schemaEvolutionEntry, - tableInfo)(sparkSession, - sparkSession.sessionState.asInstanceOf[CarbonSessionState]) - // TODO: 1. 
add check for deletion of index tables - // delete dictionary files for dictionary column and clear dictionary cache from memory - new AlterTableDropColumnRDD(sparkSession.sparkContext, - dictionaryColumns, - carbonTable.getCarbonTableIdentifier, - carbonTable.getStorePath).collect() - LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName") - LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName") - } catch { - case e: Exception => LOGGER - .error("Alter table drop columns failed : " + e.getMessage) - if (carbonTable != null) { - AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession) - } - sys.error(s"Alter table drop column operation failed: ${e.getMessage}") - } finally { - // release lock after command execution completion - AlterTableUtil.releaseLocks(locks) - } - Seq.empty - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala deleted file mode 100644 index af361d5..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableRenameTableCommand.scala +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.command.schema - -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} -import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand} -import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} -import org.apache.spark.util.AlterTableUtil - -import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} -import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonStorePath -import org.apache.carbondata.format.SchemaEvolutionEntry -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException - -private[sql] case class AlterTableRenameTableCommand( - alterTableRenameModel: AlterTableRenameModel) - extends RunnableCommand { - - override def run(sparkSession: SparkSession): Seq[Row] = { - val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier - val newTableIdentifier = alterTableRenameModel.newTableIdentifier - val oldDatabaseName = oldTableIdentifier.database - 
.getOrElse(sparkSession.catalog.currentDatabase) - val newDatabaseName = newTableIdentifier.database - .getOrElse(sparkSession.catalog.currentDatabase) - if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) { - throw new MalformedCarbonCommandException("Database name should be same for both tables") - } - val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table) - if (tableExists) { - throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " + - s"already exists") - } - val oldTableName = oldTableIdentifier.table.toLowerCase - val newTableName = newTableIdentifier.table.toLowerCase - LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName") - LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName") - val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore - val relation: CarbonRelation = - metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession) - .asInstanceOf[CarbonRelation] - if (relation == null) { - LOGGER.audit(s"Rename table request has failed. " + - s"Table $oldDatabaseName.$oldTableName does not exist") - sys.error(s"Table $oldDatabaseName.$oldTableName does not exist") - } - val locksToBeAcquired = List(LockUsage.METADATA_LOCK, - LockUsage.COMPACTION_LOCK, - LockUsage.DELETE_SEGMENT_LOCK, - LockUsage.CLEAN_FILES_LOCK, - LockUsage.DROP_TABLE_LOCK) - var locks = List.empty[ICarbonLock] - var timeStamp = 0L - var carbonTable: CarbonTable = null - try { - locks = AlterTableUtil - .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)( - sparkSession) - val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) - .asInstanceOf[CarbonRelation].tableMeta - carbonTable = tableMeta.carbonTable - // get the latest carbon table and check for column existence - val carbonTablePath = CarbonStorePath. 
- getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)) - val tableMetadataFile = carbonTablePath.getPath - val tableInfo: org.apache.carbondata.format.TableInfo = - metastore.getThriftTableInfo(carbonTablePath)(sparkSession) - val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) - schemaEvolutionEntry.setTableName(newTableName) - timeStamp = System.currentTimeMillis() - schemaEvolutionEntry.setTime_stamp(timeStamp) - renameBadRecords(oldTableName, newTableName, oldDatabaseName) - val fileType = FileFactory.getFileType(tableMetadataFile) - if (FileFactory.isFileExist(tableMetadataFile, fileType)) { - val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType) - .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + - newTableName) - if (!rename) { - renameBadRecords(newTableName, oldTableName, oldDatabaseName) - sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName") - } - } - val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, - newTableName, carbonTable.getCarbonTableIdentifier.getTableId) - val newTablePath = metastore.updateTableSchema(newTableIdentifier, - carbonTable.getCarbonTableIdentifier, - tableInfo, - schemaEvolutionEntry, - tableMeta.tablePath)(sparkSession) - metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) - sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive - .runSqlHive( - s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") - sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive - .runSqlHive( - s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + - s"('tableName'='$newTableName', " + - s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')") - sparkSession.catalog.refreshTable(TableIdentifier(newTableName, - Some(oldDatabaseName)).quotedString) - LOGGER.audit(s"Table $oldTableName has been successfully 
renamed to $newTableName") - LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") - } catch { - case e: Exception => - LOGGER.error(e, "Rename table failed: " + e.getMessage) - if (carbonTable != null) { - AlterTableUtil - .revertRenameTableChanges(oldTableIdentifier, - newTableName, - carbonTable.getStorePath, - carbonTable.getCarbonTableIdentifier.getTableId, - timeStamp)( - sparkSession) - renameBadRecords(newTableName, oldTableName, oldDatabaseName) - } - sys.error(s"Alter table rename table operation failed: ${e.getMessage}") - } finally { - // release lock after command execution completion - AlterTableUtil.releaseLocks(locks) - // case specific to rename table as after table rename old table path will not be found - if (carbonTable != null) { - AlterTableUtil - .releaseLocksManually(locks, - locksToBeAcquired, - oldDatabaseName, - newTableName, - carbonTable.getStorePath) - } - } - Seq.empty - } - - private def renameBadRecords( - oldTableName: String, - newTableName: String, - dataBaseName: String): Unit = { - val oldPath = CarbonUtil - .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName) - val newPath = CarbonUtil - .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName) - val fileType = FileFactory.getFileType(oldPath) - if (FileFactory.isFileExist(oldPath, fileType)) { - val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType) - .renameForce(newPath) - if (!renameSuccess) { - sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName") - } - } - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala new file mode 100644 index 0000000..8737464 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.schema + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, RunnableCommand} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.format.TableInfo +import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD} + +private[sql] case class CarbonAlterTableAddColumnCommand( + alterTableAddColumnsModel: AlterTableAddColumnsModel) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableAddColumnsModel.tableName + val dbName = alterTableAddColumnsModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName") + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + var locks = List.empty[ICarbonLock] + var timeStamp = 0L + var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() + var carbonTable: CarbonTable = null + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) + // Consider a concurrent scenario where 2 alter operations are executed in parallel. 1st + // operation is success and updates the schema file. 
2nd operation will get the lock after + // completion of 1st operation but as look up relation is called before it will have the + // older carbon table and this can lead to inconsistent state in the system. Therefore look + // up relation should be called after acquiring the lock + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore + .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + .tableMeta.carbonTable + // get the latest carbon table and check for column existence + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val schemaConverter = new ThriftWrapperSchemaConverterImpl() + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, + dbName, + tableName, + carbonTable.getStorePath) + newCols = new AlterTableColumnSchemaGenerator(alterTableAddColumnsModel, + dbName, + wrapperTableInfo, + carbonTablePath, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath, sparkSession.sparkContext).process + // generate dictionary files for the newly added columns + new AlterTableAddColumnRDD(sparkSession.sparkContext, + newCols, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath).collect() + timeStamp = System.currentTimeMillis + val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(timeStamp) + schemaEvolutionEntry.setAdded(newCols.toList.asJava) + val thriftTable = schemaConverter + .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + AlterTableUtil + .updateSchemaInfo(carbonTable, + schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), + thriftTable)(sparkSession, + 
sparkSession.sessionState.asInstanceOf[CarbonSessionState]) + LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName") + } catch { + case e: Exception => + LOGGER.error(e, "Alter table add columns failed") + if (newCols.nonEmpty) { + LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") + new AlterTableDropColumnRDD(sparkSession.sparkContext, + newCols, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath).collect() + AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession) + } + sys.error(s"Alter table add operation failed: ${e.getMessage}") + } finally { + // release lock after command execution completion + AlterTableUtil.releaseLocks(locks) + } + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala new file mode 100644 index 0000000..4e180c8 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.schema + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, RunnableCommand} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil} + +private[sql] case class CarbonAlterTableDataTypeChangeCommand( + alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableDataTypeChangeModel.tableName + val dbName = alterTableDataTypeChangeModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName") + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + var locks = List.empty[ICarbonLock] + // get the latest carbon table and check for column 
existence + var carbonTable: CarbonTable = null + var timeStamp = 0L + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore + .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + .tableMeta.carbonTable + val columnName = alterTableDataTypeChangeModel.columnName + val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible) + if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) { + LOGGER.audit(s"Alter table change data type request has failed. " + + s"Column $columnName does not exist") + sys.error(s"Column does not exist: $columnName") + } + val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName)) + if (carbonColumn.size == 1) { + CarbonScalaUtil + .validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head) + } else { + LOGGER.audit(s"Alter table change data type request has failed. 
" + + s"Column $columnName is invalid") + sys.error(s"Invalid Column: $columnName") + } + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + // maintain the added column for schema evolution history + var addColumnSchema: ColumnSchema = null + var deletedColumnSchema: ColumnSchema = null + val columnSchemaList = tableInfo.fact_table.table_columns.asScala.filter(!_.isInvisible) + columnSchemaList.foreach { columnSchema => + if (columnSchema.column_name.equalsIgnoreCase(columnName)) { + deletedColumnSchema = columnSchema.deepCopy + columnSchema.setData_type(DataTypeConverterUtil + .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType)) + columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision) + columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale) + addColumnSchema = columnSchema + } + } + timeStamp = System.currentTimeMillis + val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp) + schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava) + schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava) + tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) + .setTime_stamp(System.currentTimeMillis) + AlterTableUtil + .updateSchemaInfo(carbonTable, + schemaEvolutionEntry, + tableInfo)(sparkSession, + sparkSession.sessionState.asInstanceOf[CarbonSessionState]) + LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName") + } catch { + case e: Exception => LOGGER + .error("Alter table change datatype failed : " + e.getMessage) + if (carbonTable != null) { + AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession) + } + 
sys.error(s"Alter table data type change operation failed: ${e.getMessage}") + } finally { + // release lock after command execution completion + AlterTableUtil.releaseLocks(locks) + } + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala new file mode 100644 index 0000000..3ac23f7 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.schema + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ListBuffer + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, RunnableCommand} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.format.SchemaEvolutionEntry +import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD + +private[sql] case class CarbonAlterTableDropColumnCommand( + alterTableDropColumnModel: AlterTableDropColumnModel) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val tableName = alterTableDropColumnModel.tableName + val dbName = alterTableDropColumnModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName") + var locks = List.empty[ICarbonLock] + var timeStamp = 0L + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + // get the latest carbon table and check for column existence + var carbonTable: CarbonTable = null + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore + .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + .tableMeta.carbonTable + 
val partitionInfo = carbonTable.getPartitionInfo(tableName) + if (partitionInfo != null) { + val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala + .map(_.getColumnName) + // check each column existence in the table + val partitionColumns = alterTableDropColumnModel.columns.filter { + tableColumn => partitionColumnSchemaList.contains(tableColumn) + } + if (partitionColumns.nonEmpty) { + throw new UnsupportedOperationException("Partition columns cannot be dropped: " + + s"$partitionColumns") + } + } + val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala + var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column + .ColumnSchema]() + var keyColumnCountToBeDeleted = 0 + // TODO: if deleted column list includes bucketted column throw an error + alterTableDropColumnModel.columns.foreach { column => + var columnExist = false + tableColumns.foreach { tableColumn => + // column should not be already deleted and should exist in the table + if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) { + if (tableColumn.isDimension) { + keyColumnCountToBeDeleted += 1 + if (tableColumn.hasEncoding(Encoding.DICTIONARY)) { + dictionaryColumns ++= Seq(tableColumn.getColumnSchema) + } + } + columnExist = true + } + } + if (!columnExist) { + sys.error(s"Column $column does not exists in the table $dbName.$tableName") + } + } + // take the total key column count. key column to be deleted should not + // be >= key columns in schema + val totalKeyColumnInSchema = tableColumns.count { + tableColumn => !tableColumn.isInvisible && tableColumn.isDimension + } + if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) { + sys.error(s"Alter drop operation failed. 
AtLeast one key column should exist after drop.") + } + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val tableInfo: org.apache.carbondata.format.TableInfo = + metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + // maintain the deleted columns for schema evolution history + var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]() + val columnSchemaList = tableInfo.fact_table.table_columns.asScala + alterTableDropColumnModel.columns.foreach { column => + columnSchemaList.foreach { columnSchema => + if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) { + deletedColumnSchema += columnSchema.deepCopy + columnSchema.invisible = true + } + } + } + // add deleted columns to schema evolution history and update the schema + timeStamp = System.currentTimeMillis + val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp) + schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava) + AlterTableUtil + .updateSchemaInfo(carbonTable, + schemaEvolutionEntry, + tableInfo)(sparkSession, + sparkSession.sessionState.asInstanceOf[CarbonSessionState]) + // TODO: 1. 
add check for deletion of index tables + // delete dictionary files for dictionary column and clear dictionary cache from memory + new AlterTableDropColumnRDD(sparkSession.sparkContext, + dictionaryColumns, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath).collect() + LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName") + } catch { + case e: Exception => LOGGER + .error("Alter table drop columns failed : " + e.getMessage) + if (carbonTable != null) { + AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession) + } + sys.error(s"Alter table drop column operation failed: ${e.getMessage}") + } finally { + // release lock after command execution completion + AlterTableUtil.releaseLocks(locks) + } + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala new file mode 100644 index 0000000..88cf212 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.schema + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command.{AlterTableRenameModel, RunnableCommand} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonUtil +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.format.SchemaEvolutionEntry +import org.apache.carbondata.spark.exception.MalformedCarbonCommandException + +private[sql] case class CarbonAlterTableRenameCommand( + alterTableRenameModel: AlterTableRenameModel) + extends RunnableCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val oldTableIdentifier = alterTableRenameModel.oldTableIdentifier + val newTableIdentifier = alterTableRenameModel.newTableIdentifier + val oldDatabaseName = oldTableIdentifier.database + 
.getOrElse(sparkSession.catalog.currentDatabase) + val newDatabaseName = newTableIdentifier.database + .getOrElse(sparkSession.catalog.currentDatabase) + if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) { + throw new MalformedCarbonCommandException("Database name should be same for both tables") + } + val tableExists = sparkSession.catalog.tableExists(oldDatabaseName, newTableIdentifier.table) + if (tableExists) { + throw new MalformedCarbonCommandException(s"Table with name $newTableIdentifier " + + s"already exists") + } + val oldTableName = oldTableIdentifier.table.toLowerCase + val newTableName = newTableIdentifier.table.toLowerCase + LOGGER.audit(s"Rename table request has been received for $oldDatabaseName.$oldTableName") + LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName") + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + val relation: CarbonRelation = + metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession) + .asInstanceOf[CarbonRelation] + if (relation == null) { + LOGGER.audit(s"Rename table request has failed. " + + s"Table $oldDatabaseName.$oldTableName does not exist") + sys.error(s"Table $oldDatabaseName.$oldTableName does not exist") + } + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, + LockUsage.COMPACTION_LOCK, + LockUsage.DELETE_SEGMENT_LOCK, + LockUsage.CLEAN_FILES_LOCK, + LockUsage.DROP_TABLE_LOCK) + var locks = List.empty[ICarbonLock] + var timeStamp = 0L + var carbonTable: CarbonTable = null + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)( + sparkSession) + val tableMeta = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession) + .asInstanceOf[CarbonRelation].tableMeta + carbonTable = tableMeta.carbonTable + // get the latest carbon table and check for column existence + val carbonTablePath = CarbonStorePath. 
+ getCarbonTablePath(AbsoluteTableIdentifier.fromTablePath(tableMeta.tablePath)) + val tableMetadataFile = carbonTablePath.getPath + val tableInfo: org.apache.carbondata.format.TableInfo = + metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) + schemaEvolutionEntry.setTableName(newTableName) + timeStamp = System.currentTimeMillis() + schemaEvolutionEntry.setTime_stamp(timeStamp) + renameBadRecords(oldTableName, newTableName, oldDatabaseName) + val fileType = FileFactory.getFileType(tableMetadataFile) + if (FileFactory.isFileExist(tableMetadataFile, fileType)) { + val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType) + .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR + + newTableName) + if (!rename) { + renameBadRecords(newTableName, oldTableName, oldDatabaseName) + sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName") + } + } + val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName, + newTableName, carbonTable.getCarbonTableIdentifier.getTableId) + val newTablePath = metastore.updateTableSchema(newTableIdentifier, + carbonTable.getCarbonTableIdentifier, + tableInfo, + schemaEvolutionEntry, + tableMeta.tablePath)(sparkSession) + metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) + sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive + .runSqlHive( + s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") + sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive + .runSqlHive( + s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" + + s"('tableName'='$newTableName', " + + s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')") + sparkSession.catalog.refreshTable(TableIdentifier(newTableName, + Some(oldDatabaseName)).quotedString) + LOGGER.audit(s"Table $oldTableName has been successfully 
renamed to $newTableName") + LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") + } catch { + case e: Exception => + LOGGER.error(e, "Rename table failed: " + e.getMessage) + if (carbonTable != null) { + AlterTableUtil + .revertRenameTableChanges(oldTableIdentifier, + newTableName, + carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier.getTableId, + timeStamp)( + sparkSession) + renameBadRecords(newTableName, oldTableName, oldDatabaseName) + } + sys.error(s"Alter table rename table operation failed: ${e.getMessage}") + } finally { + // release lock after command execution completion + AlterTableUtil.releaseLocks(locks) + // case specific to rename table as after table rename old table path will not be found + if (carbonTable != null) { + AlterTableUtil + .releaseLocksManually(locks, + locksToBeAcquired, + oldDatabaseName, + newTableName, + carbonTable.getStorePath) + } + } + Seq.empty + } + + private def renameBadRecords( + oldTableName: String, + newTableName: String, + dataBaseName: String): Unit = { + val oldPath = CarbonUtil + .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + oldTableName) + val newPath = CarbonUtil + .getBadLogPath(dataBaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName) + val fileType = FileFactory.getFileType(oldPath) + if (FileFactory.isFileExist(oldPath, fileType)) { + val renameSuccess = FileFactory.getCarbonFile(oldPath, fileType) + .renameForce(newPath) + if (!renameSuccess) { + sys.error(s"BadRecords Folder Rename Failed for table $dataBaseName.$oldTableName") + } + } + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index bdfaa5a..bf13e41 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand} import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand -import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand, AlterTableRenameTableCommand} +import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand, CarbonAlterTableRenameCommand} import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} import org.apache.carbondata.core.util.CarbonUtil @@ -56,7 +56,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { sparkSession) if (isCarbonTable) { val renameModel = AlterTableRenameModel(tableIdentifier, newTableIdentifier) - ExecutedCommandExec(AlterTableRenameTableCommand(renameModel)) :: Nil + ExecutedCommandExec(CarbonAlterTableRenameCommand(renameModel)) :: Nil } else { ExecutedCommandExec(alter) :: Nil } @@ -98,7 +98,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { throw new MalformedCarbonCommandException( "Operation not allowed : " + altertablemodel.alterSql) } - case dataTypeChange@AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) => + case dataTypeChange@CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) => val 
isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(TableIdentifier(alterTableChangeDataTypeModel.tableName, alterTableChangeDataTypeModel.databaseName))(sparkSession) @@ -107,7 +107,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { } else { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") } - case addColumn@AlterTableAddColumnCommand(alterTableAddColumnsModel) => + case addColumn@CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) => val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(TableIdentifier(alterTableAddColumnsModel.tableName, alterTableAddColumnsModel.databaseName))(sparkSession) @@ -116,7 +116,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { } else { throw new MalformedCarbonCommandException("Unsupported alter operation on hive table") } - case dropColumn@AlterTableDropColumnCommand(alterTableDropColumnModel) => + case dropColumn@CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) => val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(TableIdentifier(alterTableDropColumnModel.tableName, alterTableDropColumnModel.databaseName))(sparkSession) http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala index 0f0bc24..9ebf47e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala @@ -21,8 +21,9 @@ 
import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} -import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.command.{AlterTableRenameCommand, ExecutedCommandExec} import org.apache.spark.sql.execution.command.mutation.{DeleteExecution, ProjectForDeleteCommand, ProjectForUpdateCommand} +import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand} import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -34,12 +35,36 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp override def apply(plan: LogicalPlan): Seq[SparkPlan] = { plan match { - case update@ProjectForUpdateCommand(_, tableIdentifier) => - rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Data update") - ExecutedCommandExec(update) :: Nil - case delete@ProjectForDeleteCommand(_, tableIdentifier, _) => - rejectIfStreamingTable(DeleteExecution.getTableIdentifier(tableIdentifier), "Date delete") - ExecutedCommandExec(delete) :: Nil + case ProjectForUpdateCommand(_, tableIdentifier) => + rejectIfStreamingTable( + DeleteExecution.getTableIdentifier(tableIdentifier), + "Data update") + Nil + case ProjectForDeleteCommand(_, tableIdentifier, _) => + rejectIfStreamingTable( + DeleteExecution.getTableIdentifier(tableIdentifier), + "Date delete") + Nil + case CarbonAlterTableAddColumnCommand(model) => + rejectIfStreamingTable( + new TableIdentifier(model.tableName, model.databaseName), + "Alter table add column") + Nil + case CarbonAlterTableDropColumnCommand(model) => + rejectIfStreamingTable( + new TableIdentifier(model.tableName, model.databaseName), + "Alter 
table drop column") + Nil + case CarbonAlterTableDataTypeChangeCommand(model) => + rejectIfStreamingTable( + new TableIdentifier(model.tableName, model.databaseName), + "Alter table change datatype") + Nil + case AlterTableRenameCommand(oldTableIdentifier, _, _) => + rejectIfStreamingTable( + oldTableIdentifier, + "Alter rename table") + Nil case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 9c87b8b..fc2ed41 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CleanFilesCommand, DeleteLoadByIdCommand, DeleteLoadByLoadDateCommand, LoadTableCommand} import org.apache.spark.sql.execution.command.partition.{AlterTableDropCarbonPartitionCommand, AlterTableSplitCarbonPartitionCommand} -import org.apache.spark.sql.execution.command.schema.{AlterTableAddColumnCommand, AlterTableDataTypeChangeCommand, AlterTableDropColumnCommand} +import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand} import org.apache.spark.sql.types.StructField import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -326,7 +326,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { table.toLowerCase, columnName.toLowerCase, 
columnNameCopy.toLowerCase) - AlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) + CarbonAlterTableDataTypeChangeCommand(alterTableChangeDataTypeModel) } protected lazy val alterTableAddColumns: Parser[LogicalPlan] = @@ -395,7 +395,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { tableModel.dimCols, tableModel.msrCols, tableModel.highcardinalitydims.getOrElse(Seq.empty)) - AlterTableAddColumnCommand(alterTableAddColumnsModel) + CarbonAlterTableAddColumnCommand(alterTableAddColumnsModel) } private def checkFieldDefaultValue(fieldName: String, defaultValueColumnName: String): Boolean = { @@ -419,7 +419,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { val alterTableDropColumnModel = AlterTableDropColumnModel(convertDbNameToLowerCase(dbName), table.toLowerCase, values.map(_.toLowerCase)) - AlterTableDropColumnCommand(alterTableDropColumnModel) + CarbonAlterTableDropColumnCommand(alterTableDropColumnModel) } def getFields(schema: Seq[StructField]): Seq[Field] = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index b733d4f..d5f9426 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -80,6 +80,21 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } } + test("test blocking alter table operation on streaming table") { + intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source ADD COLUMNS (c6 string)""").show() + } + 
intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source DROP COLUMNS (c1)""").show() + } + intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source RENAME to t""").show() + } + intercept[MalformedCarbonCommandException] { + sql("""ALTER TABLE source CHANGE c1 c1 int""").show() + } + } + override def afterAll { sql("USE default") sql("DROP DATABASE IF EXISTS streaming CASCADE") http://git-wip-us.apache.org/repos/asf/carbondata/blob/87892522/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala index 29de05b..00170e2 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala @@ -51,7 +51,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } test("test to revert table name on failure") { - intercept[RuntimeException] { + val exception = intercept[RuntimeException] { new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir() sql("alter table reverttest rename to reverttest_fail") new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()