Github user gvramana commented on a diff in the pull request: https://github.com/apache/incubator-carbondata/pull/641#discussion_r106095253 --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala --- @@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab } } +private[sql] case class AlterTableDataTypeChange( + alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends RunnableCommand { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def run(sparkSession: SparkSession): Seq[Row] = { + val tableName = alterTableDataTypeChangeModel.tableName + val dbName = alterTableDataTypeChangeModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName") + val relation = + CarbonEnv.get.carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + if (relation == null) { + LOGGER.audit(s"Alter table change data type request has failed. 
" + + s"Table $dbName.$tableName does not exist") + sys.error(s"Table $dbName.$tableName does not exist") + } + // acquire the lock first + val table = relation.tableMeta.carbonTable + val carbonLock = CarbonLockFactory + .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) + try { + // get the latest carbon table and check for column existence + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) + val columnName = alterTableDataTypeChangeModel.columnName + var carbonColumnToBeModified: CarbonColumn = null + val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore + .readSchemaFile(tableMetadataFile) + // maintain the added column for schema evolution history + var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null + var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null + val columnSchemaList = tableInfo.fact_table.table_columns.asScala + columnSchemaList.foreach { columnSchema => + if (columnSchema.column_name.equalsIgnoreCase(columnName)) { + deletedColumnSchema = CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema) + columnSchema.setData_type(DataTypeConverterUtil + .convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType)) + columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision) + columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale) + addColumnSchema = columnSchema + } + } + val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) + schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava) + 
schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava) + tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) + .setTime_stamp(System.currentTimeMillis) + CarbonEnv.get.carbonMetastore + .updateTableSchema(carbonTable.getCarbonTableIdentifier, + tableInfo, + schemaEvolutionEntry, + carbonTable.getStorePath)(sparkSession) + + val tableIdentifier = TableIdentifier(tableName, Some(dbName)) + val schema = CarbonEnv.get.carbonMetastore + .lookupRelation(tableIdentifier)(sparkSession).schema.json + sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive( + s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')") + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) + LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName") + } catch { + case e: Exception => + LOGGER.error("Alter table change datatype failed : " + e.getMessage) + throw e + } finally { + // release lock after command execution completion + if (carbonLock != null) { + if (carbonLock.unlock()) { + LOGGER.info("Alter table change data type lock released successfully") + } else { + LOGGER.error("Unable to release lock during alter table change data type operation") + } + } + } + Seq.empty + } +} + +private[sql] case class AlterTableAddColumns( + alterTableAddColumnsModel: AlterTableAddColumnsModel) extends RunnableCommand { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def run(sparkSession: SparkSession): Seq[Row] = { + val tableName = alterTableAddColumnsModel.tableName + val dbName = alterTableAddColumnsModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName") + val relation = + CarbonEnv.get.carbonMetastore + 
.lookupRelation(Option(dbName), tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + if (relation == null) { + LOGGER.audit(s"Alter table add columns request has failed. " + + s"Table $dbName.$tableName does not exist") + sys.error(s"Table $dbName.$tableName does not exist") + } + // acquire the lock first + val table = relation.tableMeta.carbonTable + val carbonLock = CarbonLockFactory + .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) + try { + // get the latest carbon table and check for column existence + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + val thriftTableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore + .readSchemaFile(tableMetadataFile) + val schemaConverter = new ThriftWrapperSchemaConverterImpl() + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, + dbName, + tableName, + carbonTable.getStorePath) + val newCols = new AlterTableProcessor(alterTableAddColumnsModel, + dbName, + wrapperTableInfo, + carbonTablePath, + carbonTable.getCarbonTableIdentifier, + carbonTable.getStorePath).process + val schemaEvolutionEntry = new org.apache.carbondata.core.metadata + .schema.SchemaEvolutionEntry() + schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis) + schemaEvolutionEntry.setAdded(newCols.toList.asJava) + + val thriftTable = schemaConverter + .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) + .setTime_stamp(System.currentTimeMillis) + CarbonEnv.get.carbonMetastore + .updateTableSchema(carbonTable.getCarbonTableIdentifier, + thriftTable, + 
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), + carbonTable.getStorePath)(sparkSession) + + val tableIdentifier = TableIdentifier(tableName, Some(dbName)) + val schema = CarbonEnv.get.carbonMetastore + .lookupRelation(tableIdentifier)(sparkSession).schema.json + sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive( + s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES('spark.sql.sources.schema'='$schema')") + sparkSession.catalog.refreshTable(tableIdentifier.quotedString) + LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName") + } catch { + case e: Exception => + LOGGER.error("Alter table add columns failed : " + e.getMessage) + throw e + } finally { + // release lock after command execution completion + if (carbonLock != null) { + if (carbonLock.unlock()) { + LOGGER.info("Alter table add columns lock released successfully") + } else { + LOGGER.error("Unable to release lock during alter table add columns operation") + } + } + } + Seq.empty + } +} + +private[sql] case class AlterTableDropColumns( + alterTableDropColumnModel: AlterTableDropColumnModel) extends RunnableCommand { + + val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + def run(sparkSession: SparkSession): Seq[Row] = { + val tableName = alterTableDropColumnModel.tableName + val dbName = alterTableDropColumnModel.databaseName + .getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName") + val relation = + CarbonEnv.get.carbonMetastore + .lookupRelation(Option(dbName), tableName)(sparkSession) + .asInstanceOf[CarbonRelation] + if (relation == null) { + LOGGER.audit(s"Alter table drop columns request has failed. 
" + + s"Table $dbName.$tableName does not exist") + sys.error(s"Table $dbName.$tableName does not exist") + } + // acquire the lock first + val table = relation.tableMeta.carbonTable + val carbonLock = CarbonLockFactory + .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, + LockUsage.METADATA_LOCK) + try { + // get the latest carbon table and check for column existence + val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName) + // check each column existence in the table + val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala + var dictionaryColumns = ListBuffer[CarbonColumn]() + var keyColumnCountToBeDeleted = 0 + // TODO: if deleted column list includes shared dictionary/bucketted column throw an error + alterTableDropColumnModel.columns.foreach { column => + var columnExist = false + tableColumns.foreach { tableColumn => + // column should not be already deleted and should exist in the table + if (!tableColumn.isInvisible && column.equalsIgnoreCase(tableColumn.getColName)) { + if (tableColumn.isDimesion) { + keyColumnCountToBeDeleted += 1 + if (tableColumn.hasEncoding(Encoding.DICTIONARY)) { + dictionaryColumns += tableColumn + } + } + columnExist = true + } + } + if (!columnExist) { + sys.error(s"Column $column does not exists in the table $dbName.$tableName") + } + } + // take the total key column count. key column to be deleted should not + // be >= key columns in schema + var totalKeyColumnInSchema = 0 + tableColumns.foreach { tableColumn => + // column should not be already deleted and should exist in the table + if (!tableColumn.isInvisible && tableColumn.isDimesion) { + totalKeyColumnInSchema += 1 + } + } + if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) { + sys.error(s"Alter drop operation failed. 
AtLeast one key column should exist after drop.") + } + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val tableMetadataFile = carbonTablePath.getSchemaFilePath + val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.get.carbonMetastore + .readSchemaFile(tableMetadataFile) + // maintain the deleted columns for schema evolution history + var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]() + val columnSchemaList = tableInfo.fact_table.table_columns.asScala + alterTableDropColumnModel.columns.foreach { column => + columnSchemaList.foreach { columnSchema => + if (!columnSchema.invisible && column.equalsIgnoreCase(columnSchema.column_name)) { + deletedColumnSchema += CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema) + columnSchema.invisible = true + } + } + } + // add deleted columns to schema evolution history and update the schema + tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) + .setTime_stamp(System.currentTimeMillis) + val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis) + schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava) --- End diff -- Please add a helper function, e.g. `addEvolutionEntry`, for this schema-evolution bookkeeping (updating the time stamp of the schema evolution history and building the `SchemaEvolutionEntry` with the added/removed columns). The same steps are repeated in the data type change, add columns and drop columns commands.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact the Apache Infrastructure team at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---