Github user gvramana commented on a diff in the pull request:

    
https://github.com/apache/incubator-carbondata/pull/641#discussion_r106095253
  
    --- Diff: 
integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 ---
    @@ -136,6 +140,298 @@ case class AlterTableCompaction(alterTableModel: 
AlterTableModel) extends Runnab
       }
     }
     
    +private[sql] case class AlterTableDataTypeChange(
    +    alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel) extends 
RunnableCommand {
    +
    +  val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableDataTypeChangeModel.tableName
    +    val dbName = alterTableDataTypeChangeModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table change data type request has been received 
for $dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table change data type request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    // acquire the lock first
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    try {
    +      // get the latest carbon table and check for column existence
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + 
"_" + tableName)
    +      val columnName = alterTableDataTypeChangeModel.columnName
    +      var carbonColumnToBeModified: CarbonColumn = null
    +      val carbonColumns = 
carbonTable.getCreateOrderColumn(tableName).asScala
    +      // read the latest schema file
    +      val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val tableInfo: org.apache.carbondata.format.TableInfo = 
CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      // maintain the added column for schema evolution history
    +      var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
    +      var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = 
null
    +      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
    +      columnSchemaList.foreach { columnSchema =>
    +        if (columnSchema.column_name.equalsIgnoreCase(columnName)) {
    +          deletedColumnSchema = 
CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
    +          columnSchema.setData_type(DataTypeConverterUtil
    +            
.convertToThriftDataType(alterTableDataTypeChangeModel.dataTypeInfo.dataType))
    +          
columnSchema.setPrecision(alterTableDataTypeChangeModel.dataTypeInfo.precision)
    +          
columnSchema.setScale(alterTableDataTypeChangeModel.dataTypeInfo.scale)
    +          addColumnSchema = columnSchema
    +        }
    +      }
    +      val schemaEvolutionEntry = new 
SchemaEvolutionEntry(System.currentTimeMillis)
    +      schemaEvolutionEntry.setAdded(List(addColumnSchema).asJava)
    +      schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava)
    +      
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(System.currentTimeMillis)
    +      CarbonEnv.get.carbonMetastore
    +        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
    +          tableInfo,
    +          schemaEvolutionEntry,
    +          carbonTable.getStorePath)(sparkSession)
    +
    +      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
    +      val schema = CarbonEnv.get.carbonMetastore
    +        .lookupRelation(tableIdentifier)(sparkSession).schema.json
    +      
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
    +        s"ALTER TABLE $dbName.$tableName SET 
TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
    +      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
    +      LOGGER.info(s"Alter table for data type change is successful for 
table $dbName.$tableName")
    +      LOGGER.audit(s"Alter table for data type change is successful for 
table $dbName.$tableName")
    +    } catch {
    +      case e: Exception =>
    +        LOGGER.error("Alter table change datatype failed : " + 
e.getMessage)
    +        throw e
    +    } finally {
    +      // release lock after command execution completion
    +      if (carbonLock != null) {
    +        if (carbonLock.unlock()) {
    +          LOGGER.info("Alter table change data type lock released 
successfully")
    +        } else {
    +          LOGGER.error("Unable to release lock during alter table change 
data type operation")
    +        }
    +      }
    +    }
    +    Seq.empty
    +  }
    +}
    +
    +private[sql] case class AlterTableAddColumns(
    +    alterTableAddColumnsModel: AlterTableAddColumnsModel) extends 
RunnableCommand {
    +
    +  val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableAddColumnsModel.tableName
    +    val dbName = alterTableAddColumnsModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table add columns request has been received for 
$dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table add columns request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    // acquire the lock first
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    try {
    +      // get the latest carbon table and check for column existence
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + 
"_" + tableName)
    +      // read the latest schema file
    +      val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val thriftTableInfo: org.apache.carbondata.format.TableInfo = 
CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      val schemaConverter = new ThriftWrapperSchemaConverterImpl()
    +      val wrapperTableInfo = schemaConverter
    +        .fromExternalToWrapperTableInfo(thriftTableInfo,
    +          dbName,
    +          tableName,
    +          carbonTable.getStorePath)
    +      val newCols = new AlterTableProcessor(alterTableAddColumnsModel,
    +        dbName,
    +        wrapperTableInfo,
    +        carbonTablePath,
    +        carbonTable.getCarbonTableIdentifier,
    +        carbonTable.getStorePath).process
    +      val schemaEvolutionEntry = new org.apache.carbondata.core.metadata
    +      .schema.SchemaEvolutionEntry()
    +      schemaEvolutionEntry.setTimeStamp(System.currentTimeMillis)
    +      schemaEvolutionEntry.setAdded(newCols.toList.asJava)
    +
    +      val thriftTable = schemaConverter
    +        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, 
tableName)
    +      
thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(System.currentTimeMillis)
    +      CarbonEnv.get.carbonMetastore
    +        .updateTableSchema(carbonTable.getCarbonTableIdentifier,
    +          thriftTable,
    +          
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
    +          carbonTable.getStorePath)(sparkSession)
    +
    +      val tableIdentifier = TableIdentifier(tableName, Some(dbName))
    +      val schema = CarbonEnv.get.carbonMetastore
    +        .lookupRelation(tableIdentifier)(sparkSession).schema.json
    +      
sparkSession.sharedState.externalCatalog.asInstanceOf[HiveExternalCatalog].client.runSqlHive(
    +        s"ALTER TABLE $dbName.$tableName SET 
TBLPROPERTIES('spark.sql.sources.schema'='$schema')")
    +      sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
    +      LOGGER.info(s"Alter table for add columns is successful for table 
$dbName.$tableName")
    +      LOGGER.audit(s"Alter table for add columns is successful for table 
$dbName.$tableName")
    +    } catch {
    +      case e: Exception =>
    +        LOGGER.error("Alter table add columns failed : " + e.getMessage)
    +        throw e
    +    } finally {
    +      // release lock after command execution completion
    +      if (carbonLock != null) {
    +        if (carbonLock.unlock()) {
    +          LOGGER.info("Alter table add columns lock released successfully")
    +        } else {
    +          LOGGER.error("Unable to release lock during alter table add 
columns operation")
    +        }
    +      }
    +    }
    +    Seq.empty
    +  }
    +}
    +
    +private[sql] case class AlterTableDropColumns(
    +    alterTableDropColumnModel: AlterTableDropColumnModel) extends 
RunnableCommand {
    +
    +  val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
    +
    +  def run(sparkSession: SparkSession): Seq[Row] = {
    +    val tableName = alterTableDropColumnModel.tableName
    +    val dbName = alterTableDropColumnModel.databaseName
    +      .getOrElse(sparkSession.catalog.currentDatabase)
    +    LOGGER.audit(s"Alter table drop columns request has been received for 
$dbName.$tableName")
    +    val relation =
    +      CarbonEnv.get.carbonMetastore
    +        .lookupRelation(Option(dbName), tableName)(sparkSession)
    +        .asInstanceOf[CarbonRelation]
    +    if (relation == null) {
    +      LOGGER.audit(s"Alter table drop columns request has failed. " +
    +                   s"Table $dbName.$tableName does not exist")
    +      sys.error(s"Table $dbName.$tableName does not exist")
    +    }
    +    // acquire the lock first
    +    val table = relation.tableMeta.carbonTable
    +    val carbonLock = CarbonLockFactory
    +      
.getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
    +        LockUsage.METADATA_LOCK)
    +    try {
    +      // get the latest carbon table and check for column existence
    +      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + 
"_" + tableName)
    +      // check each column existence in the table
    +      val tableColumns = 
carbonTable.getCreateOrderColumn(tableName).asScala
    +      var dictionaryColumns = ListBuffer[CarbonColumn]()
    +      var keyColumnCountToBeDeleted = 0
    +      // TODO: if deleted column list includes shared dictionary/bucketted 
column throw an error
    +      alterTableDropColumnModel.columns.foreach { column =>
    +        var columnExist = false
    +        tableColumns.foreach { tableColumn =>
    +          // column should not be already deleted and should exist in the 
table
    +          if (!tableColumn.isInvisible && 
column.equalsIgnoreCase(tableColumn.getColName)) {
    +            if (tableColumn.isDimesion) {
    +              keyColumnCountToBeDeleted += 1
    +              if (tableColumn.hasEncoding(Encoding.DICTIONARY)) {
    +                dictionaryColumns += tableColumn
    +              }
    +            }
    +            columnExist = true
    +          }
    +        }
    +        if (!columnExist) {
    +          sys.error(s"Column $column does not exists in the table 
$dbName.$tableName")
    +        }
    +      }
    +      // take the total key column count. key column to be deleted should 
not
    +      // be >= key columns in schema
    +      var totalKeyColumnInSchema = 0
    +      tableColumns.foreach { tableColumn =>
    +        // column should not be already deleted and should exist in the 
table
    +        if (!tableColumn.isInvisible && tableColumn.isDimesion) {
    +          totalKeyColumnInSchema += 1
    +        }
    +      }
    +      if (keyColumnCountToBeDeleted >= totalKeyColumnInSchema) {
    +        sys.error(s"Alter drop operation failed. AtLeast one key column 
should exist after drop.")
    +      }
    +      // read the latest schema file
    +      val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
    +        carbonTable.getCarbonTableIdentifier)
    +      val tableMetadataFile = carbonTablePath.getSchemaFilePath
    +      val tableInfo: org.apache.carbondata.format.TableInfo = 
CarbonEnv.get.carbonMetastore
    +        .readSchemaFile(tableMetadataFile)
    +      // maintain the deleted columns for schema evolution history
    +      var deletedColumnSchema = 
ListBuffer[org.apache.carbondata.format.ColumnSchema]()
    +      val columnSchemaList = tableInfo.fact_table.table_columns.asScala
    +      alterTableDropColumnModel.columns.foreach { column =>
    +        columnSchemaList.foreach { columnSchema =>
    +          if (!columnSchema.invisible && 
column.equalsIgnoreCase(columnSchema.column_name)) {
    +            deletedColumnSchema += 
CarbonScalaUtil.createColumnSchemaCopyObject(columnSchema)
    +            columnSchema.invisible = true
    +          }
    +        }
    +      }
    +      // add deleted columns to schema evolution history and update the 
schema
    +      
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
    +        .setTime_stamp(System.currentTimeMillis)
    +      val schemaEvolutionEntry = new 
SchemaEvolutionEntry(System.currentTimeMillis)
    +      schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava)
    --- End diff --
    
    Add function addEvolutionEntry


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to