Changes done: 1. Support creation and deletion of dictionary files during ALTER TABLE add column and drop column operations through an RDD, so that the work is parallelized and performance improves. 2. Support cleanup of the newly created dictionary files if any failure occurs during the ALTER TABLE add columns operation.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/b5ba4c6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/b5ba4c6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/b5ba4c6e Branch: refs/heads/12-dev Commit: b5ba4c6ea2d864f099bd4112e2cd5260e615a0a8 Parents: 4a7adfa Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Tue Apr 4 19:59:18 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 6 10:53:51 2017 +0530 ---------------------------------------------------------------------- .../core/cache/dictionary/ManageDictionary.java | 85 +++++++------- .../spark/rdd/AlterTableAddColumnRDD.scala | 110 +++++++++++++++++++ .../spark/rdd/AlterTableDropColumnRDD.scala | 96 ++++++++++++++++ .../execution/command/carbonTableSchema.scala | 21 ++-- .../execution/command/AlterTableCommands.scala | 48 +++++--- 5 files changed, 286 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java index 706bc20..0a38890 100644 --- a/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java +++ b/core/src/main/java/org/apache/carbondata/core/cache/dictionary/ManageDictionary.java @@ -18,7 +18,6 @@ package org.apache.carbondata.core.cache.dictionary; import java.io.IOException; -import java.util.List; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -30,8 +29,7 @@ import 
org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -51,62 +49,59 @@ public class ManageDictionary { * This method will delete the dictionary files for the given column IDs and * clear the dictionary cache * - * @param dictionaryColumns - * @param carbonTable + * @param columnSchema + * @param carbonTableIdentifier + * @param storePath */ - public static void deleteDictionaryFileAndCache(List<CarbonColumn> dictionaryColumns, - CarbonTable carbonTable) { - if (!dictionaryColumns.isEmpty()) { - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath(), carbonTableIdentifier); - String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath(); - CarbonFile metadataDir = FileFactory - .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath)); - for (final CarbonColumn column : dictionaryColumns) { - // sort index file is created with dictionary size appended to it. 
So all the files - // with a given column ID need to be listed - CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile path) { - if (path.getName().startsWith(column.getColumnId())) { - return true; - } - return false; - } - }); - for (CarbonFile file : listFiles) { - // try catch is inside for loop because even if one deletion fails, other files - // still need to be deleted - try { - FileFactory.deleteFile(file.getCanonicalPath(), - FileFactory.getFileType(file.getCanonicalPath())); - } catch (IOException e) { - LOGGER.error( - "Failed to delete dictionary or sortIndex file for column " + column.getColName() - + "with column ID " + column.getColumnId()); - } + public static void deleteDictionaryFileAndCache(final ColumnSchema columnSchema, + CarbonTableIdentifier carbonTableIdentifier, String storePath) { + CarbonTablePath carbonTablePath = + CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier); + String metadataDirectoryPath = carbonTablePath.getMetadataDirectoryPath(); + CarbonFile metadataDir = FileFactory + .getCarbonFile(metadataDirectoryPath, FileFactory.getFileType(metadataDirectoryPath)); + // sort index file is created with dictionary size appended to it. 
So all the files + // with a given column ID need to be listed + CarbonFile[] listFiles = metadataDir.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile path) { + if (path.getName().startsWith(columnSchema.getColumnUniqueId())) { + return true; } - // remove dictionary cache - removeDictionaryColumnFromCache(carbonTable, column.getColumnId()); + return false; + } + }); + for (CarbonFile file : listFiles) { + // try catch is inside for loop because even if one deletion fails, other files + // still need to be deleted + try { + FileFactory + .deleteFile(file.getCanonicalPath(), FileFactory.getFileType(file.getCanonicalPath())); + } catch (IOException e) { + LOGGER.error("Failed to delete dictionary or sortIndex file for column " + columnSchema + .getColumnName() + "with column ID " + columnSchema.getColumnUniqueId()); } } + // remove dictionary cache + removeDictionaryColumnFromCache(carbonTableIdentifier, storePath, + columnSchema.getColumnUniqueId()); } /** * This method will remove dictionary cache from driver for both reverse and forward dictionary * - * @param carbonTable + * @param carbonTableIdentifier + * @param storePath * @param columnId */ - public static void removeDictionaryColumnFromCache(CarbonTable carbonTable, String columnId) { - Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = CacheProvider.getInstance() - .createCache(CacheType.REVERSE_DICTIONARY, carbonTable.getStorePath()); + public static void removeDictionaryColumnFromCache(CarbonTableIdentifier carbonTableIdentifier, + String storePath, String columnId) { + Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache = + CacheProvider.getInstance().createCache(CacheType.REVERSE_DICTIONARY, storePath); DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier = - new DictionaryColumnUniqueIdentifier(carbonTable.getCarbonTableIdentifier(), + new DictionaryColumnUniqueIdentifier(carbonTableIdentifier, new ColumnIdentifier(columnId, null, 
null)); dictCache.invalidate(dictionaryColumnUniqueIdentifier); - dictCache = CacheProvider.getInstance() - .createCache(CacheType.FORWARD_DICTIONARY, carbonTable.getStorePath()); + dictCache = CacheProvider.getInstance().createCache(CacheType.FORWARD_DICTIONARY, storePath); dictCache.invalidate(dictionaryColumnUniqueIdentifier); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala new file mode 100644 index 0000000..bb65b0b --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.AlterTableAddColumnsModel + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema +import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.spark.util.GlobalDictionaryUtil + +/** + * This is a partitioner class for dividing the newly added columns into partitions + * + * @param rddId + * @param idx + * @param schema + */ +class AddColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Partition { + override def index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx + + val columnSchema = schema +} + +/** + * This class is aimed at generating dictionary file for the newly added columns + */ +class AlterTableAddColumnRDD[K, V](sc: SparkContext, + @transient newColumns: Seq[ColumnSchema], + alterTableModel: AlterTableAddColumnsModel, + carbonTableIdentifier: CarbonTableIdentifier, + carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) { + + override def getPartitions: Array[Partition] = { + newColumns.zipWithIndex.map { column => + new DropColumnPartition(id, column._2, column._1) + }.toArray + } + + override def compute(split: Partition, + context: TaskContext): Iterator[(Int, String)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS + val iter = new Iterator[(Int, String)] { + try { + val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema + // create dictionary file if it is a dictionary column + if 
(columnSchema.hasEncoding(Encoding.DICTIONARY) && + !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + val carbonTablePath = CarbonStorePath + .getCarbonTablePath(carbonStorePath, carbonTableIdentifier) + var rawData: String = null + if (null != columnSchema.getDefaultValue) { + rawData = new String(columnSchema.getDefaultValue, + CarbonCommonConstants.DEFAULT_CHARSET_CLASS) + } + GlobalDictionaryUtil + .loadDefaultDictionaryValueForNewColumn(carbonTablePath, + columnSchema, + carbonTableIdentifier, + carbonStorePath, + rawData) + } + } catch { + case ex: Exception => + throw ex + } + + var finished = false + + override def hasNext: Boolean = { + + if (!finished) { + finished = true + finished + } else { + !finished + } + } + + override def next(): (Int, String) = { + (split.index, status) + } + } + iter + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala new file mode 100644 index 0000000..49dadd3 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.spark.rdd.RDD + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.cache.dictionary.ManageDictionary +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.encoder.Encoding +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema + +/** + * This is a partitioner class for dividing the newly added columns into partitions + * + * @param rddId + * @param idx + * @param schema + */ +class DropColumnPartition(rddId: Int, idx: Int, schema: ColumnSchema) extends Partition { + override def index: Int = idx + + override def hashCode(): Int = 41 * (41 + rddId) + idx + + val columnSchema = schema +} + +/** + * This class is aimed at generating dictionary file for the newly added columns + */ +class AlterTableDropColumnRDD[K, V](sc: SparkContext, + @transient newColumns: Seq[ColumnSchema], + carbonTableIdentifier: CarbonTableIdentifier, + carbonStorePath: String) extends RDD[(Int, String)](sc, Nil) { + + override def getPartitions: Array[Partition] = { + newColumns.zipWithIndex.map { column => + new DropColumnPartition(id, column._2, column._1) + }.toArray + } + + override def compute(split: Partition, + context: TaskContext): Iterator[(Int, String)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val status = 
CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS + val iter = new Iterator[(Int, String)] { + try { + val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema + if (columnSchema.hasEncoding(Encoding.DICTIONARY) && + !columnSchema.hasEncoding(Encoding.DIRECT_DICTIONARY)) { + ManageDictionary + .deleteDictionaryFileAndCache(columnSchema, carbonTableIdentifier, carbonStorePath) + } + } catch { + case ex: Exception => + LOGGER.error(ex, ex.getMessage) + throw ex + } + + var finished = false + + override def hasNext: Boolean = { + + if (!finished) { + finished = true + finished + } else { + !finished + } + } + + override def next(): (Int, String) = { + (split.index, status) + } + } + iter + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 6c44264..dadd03e 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -23,6 +23,7 @@ import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.mutable.Map +import org.apache.spark.SparkContext import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.TableIdentifier @@ -42,6 +43,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.CarbonSparkFactory import org.apache.carbondata.spark.load.FailureCauses import org.apache.carbondata.spark.merger.CompactionType +import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD import 
org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil} case class TableModel( @@ -156,7 +158,7 @@ class AlterTableProcessor( tableInfo: TableInfo, carbonTablePath: CarbonTablePath, tableIdentifier: CarbonTableIdentifier, - storePath: String) { + storePath: String, sc: SparkContext) { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -227,7 +229,6 @@ class AlterTableProcessor( tablePropertiesMap.put(x._1, x._2) } } - // This part will create dictionary file for all newly added dictionary columns // if valid default value is provided, // then that value will be included while creating dictionary file @@ -251,17 +252,13 @@ class AlterTableProcessor( } } } - if (col.getEncodingList.contains(Encoding.DICTIONARY) && - !col.getEncodingList.contains(Encoding.DIRECT_DICTIONARY)) { - GlobalDictionaryUtil - .loadDefaultDictionaryValueForNewColumn(carbonTablePath, - col, - tableIdentifier, - storePath, - rawData) - } } - + // generate dictionary files for the newly added columns + new AlterTableAddColumnRDD(sc, + newCols, + alterTableModel, + tableIdentifier, + storePath).collect() tableSchema.setListOfColumns(allColumns.asJava) tableInfo.setLastUpdatedTime(System.currentTimeMillis()) tableInfo.setFactTable(tableSchema) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/b5ba4c6e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala index efa2cd5..93a5912 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala @@ -26,7 +26,6 @@ import 
org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog} import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.dictionary.ManageDictionary import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} @@ -38,6 +37,7 @@ import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException +import org.apache.carbondata.spark.rdd.AlterTableDropColumnRDD import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil} private[sql] case class AlterTableAddColumns( @@ -52,9 +52,10 @@ private[sql] case class AlterTableAddColumns( LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName") val carbonLock = AlterTableUtil .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession) + // get the latest carbon table and check for column existence + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) + var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() try { - // get the latest carbon table and check for column existence - val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) // read the latest schema file val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, carbonTable.getCarbonTableIdentifier) @@ -67,12 +68,12 @@ private[sql] case class AlterTableAddColumns( dbName, tableName, carbonTable.getStorePath) - val newCols = new AlterTableProcessor(alterTableAddColumnsModel, + newCols = new AlterTableProcessor(alterTableAddColumnsModel, dbName, wrapperTableInfo, 
carbonTablePath, carbonTable.getCarbonTableIdentifier, - carbonTable.getStorePath).process + carbonTable.getStorePath, sparkSession.sparkContext).process val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis) schemaEvolutionEntry.setAdded(newCols.toList.asJava) @@ -89,8 +90,16 @@ private[sql] case class AlterTableAddColumns( LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table add columns failed : " + e.getMessage) - throw e + LOGGER.error(e, s"Alter table add columns failed : ${e.getMessage}") + // clean up the dictionary files in case of any failure + if (!newCols.isEmpty) { + LOGGER.info("Cleaning up the dictionary files as alter table add operation failed") + new AlterTableDropColumnRDD(sparkSession.sparkContext, + newCols, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath).collect() + } + sys.error("Alter table add column operation failed. Please check the logs") } finally { // release lock after command execution completion if (carbonLock != null) { @@ -186,8 +195,9 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName") LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName") } catch { - case e: Exception => LOGGER.error("Rename table failed: " + e.getMessage) - throw e + case e: Exception => + LOGGER.error(e, s"Rename table failed: ${e.getMessage}") + sys.error("Alter table rename table operation failed. 
Please check the logs") } finally { // release lock after command execution completion if (carbonLock != null) { @@ -237,9 +247,10 @@ private[sql] case class AlterTableDropColumns( val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) // check each column existence in the table val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala - var dictionaryColumns = ListBuffer[CarbonColumn]() + var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column + .ColumnSchema]() var keyColumnCountToBeDeleted = 0 - // TODO: if deleted column list includes shared dictionary/bucketted column throw an error + // TODO: if deleted column list includes bucketted column throw an error alterTableDropColumnModel.columns.foreach { column => var columnExist = false tableColumns.foreach { tableColumn => @@ -248,7 +259,7 @@ private[sql] case class AlterTableDropColumns( if (tableColumn.isDimesion) { keyColumnCountToBeDeleted += 1 if (tableColumn.hasEncoding(Encoding.DICTIONARY)) { - dictionaryColumns += tableColumn + dictionaryColumns ++= Seq(tableColumn.getColumnSchema) } } columnExist = true @@ -299,13 +310,16 @@ private[sql] case class AlterTableDropColumns( sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog]) // TODO: 1. 
add check for deletion of index tables // delete dictionary files for dictionary column and clear dictionary cache from memory - ManageDictionary.deleteDictionaryFileAndCache(dictionaryColumns.toList.asJava, carbonTable) + new AlterTableDropColumnRDD(sparkSession.sparkContext, + dictionaryColumns, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath).collect() LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table drop columns failed : " + e.getMessage) - throw e + LOGGER.error(e, s"Alter table drop columns failed : ${e.getMessage}") + sys.error("Alter table drop column operation failed. Please check the logs") } finally { // release lock after command execution completion if (carbonLock != null) { @@ -387,8 +401,8 @@ private[sql] case class AlterTableDataTypeChange( LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName") } catch { case e: Exception => - LOGGER.error("Alter table change datatype failed : " + e.getMessage) - throw e + LOGGER.error(e, s"Alter table change datatype failed : ${e.getMessage}") + sys.error("Alter table data type change operation failed. Please check the logs") } finally { // release lock after command execution completion if (carbonLock != null) {