Repository: carbondata Updated Branches: refs/heads/master fd28b1561 -> 467311375
[CARBONDATA-1618]Table comment support for alter table This closes #1472 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/46731137 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/46731137 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/46731137 Branch: refs/heads/master Commit: 46731137579750d8389f3f9c4ec58547457fda2d Parents: fd28b15 Author: Pawan Malwal <pmalwal1...@gmail.com> Authored: Mon Nov 6 18:10:14 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Wed Nov 8 17:56:54 2017 +0530 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 5 + .../core/metadata/schema/table/TableInfo.java | 14 -- .../TestAlterTableWithTableComment.scala | 130 +++++++++++++++++++ .../command/carbonTableSchemaCommon.scala | 1 - .../CarbonDescribeFormattedCommand.scala | 4 +- .../command/schema/AlterTableSetCommand.scala | 44 +++++++ .../command/schema/AlterTableUnsetCommand.scala | 45 +++++++ .../sql/execution/strategy/DDLStrategy.scala | 12 +- .../org/apache/spark/util/AlterTableUtil.scala | 82 ++++++++++++ 9 files changed, 320 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 3fed18f..711b237 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1374,6 +1374,11 @@ public final class CarbonCommonConstants { @CarbonProperty public static final String BITSET_PIPE_LINE = "carbon.use.bitset.pipe.line"; + /** + * this will be used to provide comment for table + */ + public static final String TABLE_COMMENT = "comment"; + public static final String BITSET_PIPE_LINE_DEFAULT = "true"; /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java index 717eada..d1a7e5b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java @@ -79,12 +79,6 @@ public class TableInfo implements Serializable, Writable { // this idenifier is a lazy field which will be created when it is used first time private AbsoluteTableIdentifier identifier; - // table comment - private String tableComment; - - public TableInfo() { - } - /** * @return the factTable */ @@ -163,14 +157,6 @@ public class TableInfo implements Serializable, Writable { this.storePath = storePath; } - public String getTableComment() { - return tableComment; - } - - public void setTableComment(String tableComment) { - this.tableComment = tableComment; - } - /** * to generate the hash code */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestAlterTableWithTableComment.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestAlterTableWithTableComment.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestAlterTableWithTableComment.scala new file mode 100644 index 0000000..b01fe4f --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestAlterTableWithTableComment.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * test functionality for alter table with table comment + */ +class TestAlterTableWithTableComment extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + sql("use default") + sql("drop table if exists alterTableWithTableComment") + sql("drop table if exists alterTableWithoutTableComment") + sql("drop table if exists alterTableUnsetTableComment") + } + + test("test add table comment using alter table set ") { + sql( + s""" + | create table alterTableWithTableComment( + | id int, + | name string + | ) + | STORED BY 'carbondata' + """.stripMargin + ) + + val create_result = sql("describe formatted alterTableWithTableComment") + + checkExistence(create_result, true, "Comment:") + checkExistence(create_result, false, "This is table comment") + + sql( + s""" + | alter table alterTableWithTableComment + | SET TBLPROPERTIES ( + | 'comment'='This table comment is added by alter table' + | ) + """.stripMargin + ) + + val alter_result = sql("describe formatted alterTableWithTableComment") + + checkExistence(alter_result, true, "Comment:") + checkExistence(alter_result, true, "This table comment is added by alter table") + } + + test("test modifiy table comment using alter table set ") { + sql( + s""" + | create table alterTableWithoutTableComment( + | id int, + | name string + | comment "This is table comment" + | ) + | STORED BY 'carbondata' + """.stripMargin + ) + + sql( + s""" + | alter table alterTableWithoutTableComment + | SET TBLPROPERTIES ( + | 'comment'='This table comment is modified by alter table set' + | ) + """.stripMargin + ) + + val alter_result = sql("describe formatted alterTableWithoutTableComment") + + checkExistence(alter_result, true, "Comment:") + checkExistence(alter_result, true, "This table comment is modified by alter table set") + } + + test("test remove table comment using alter table unset ") { + sql( + s""" + | create table alterTableUnsetTableComment( + | id int, + | name string + | ) + | comment "This is table comment" + | STORED BY 'carbondata' + """.stripMargin + ) + + val create_result = sql("describe formatted alterTableUnsetTableComment") + + checkExistence(create_result, true, "Comment:") + checkExistence(create_result, true, "This is table comment") + + sql( + s""" + | alter table alterTableUnsetTableComment + | UNSET TBLPROPERTIES ('comment') + """.stripMargin + ) + + val alter_result = sql("describe formatted alterTableUnsetTableComment") + + checkExistence(alter_result, true, "Comment:") + checkExistence(alter_result, false, "This is table comment") + } + + override def afterAll: Unit = { + sql("use default") + sql("drop table if exists alterTableWithTableComment") + sql("drop table if exists alterTableWithoutTableComment") + sql("drop table if exists alterTableUnsetTableComment") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index fba3085..1188b59 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -551,7 +551,6 @@ class TableNewProcessor(cm: TableModel) { tableInfo.setTableUniqueName(cm.databaseName + "_" + cm.tableName) tableInfo.setLastUpdatedTime(System.currentTimeMillis()) tableInfo.setFactTable(tableSchema) - tableInfo.setTableComment(cm.tableComment.getOrElse("")) tableInfo } http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala index e57f490..519fbea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDescribeFormattedCommand.scala @@ -106,7 +106,9 @@ private[sql] case class CarbonDescribeFormattedCommand( results ++= Seq(("CARBON Store Path: ", relation.tableMeta.storePath, "")) val carbonTable = relation.tableMeta.carbonTable // Carbon table support table comment - results ++= Seq(("Comment: ", carbonTable.getTableInfo.getTableComment, "")) + val tableComment = carbonTable.getTableInfo.getFactTable.getTableProperties + .getOrDefault(CarbonCommonConstants.TABLE_COMMENT, "") + results ++= Seq(("Comment: ", tableComment, "")) results ++= Seq(("Table Block Size : ", carbonTable.getBlockSizeInMB + " MB", "")) results ++= Seq(("SORT_SCOPE", carbonTable.getTableInfo.getFactTable .getTableProperties.getOrDefault("sort_scope", CarbonCommonConstants http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala new file mode 100644 index 0000000..afbf8f6 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableSetCommand.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.schema + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.format.TableInfo + +private[sql] case class AlterTableSetCommand(val tableIdentifier: TableIdentifier, + val properties: Map[String, String], + val isView: Boolean) + extends RunnableCommand with SchemaProcessCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + AlterTableUtil.modifyTableComment(tableIdentifier, properties, Nil, + true)(sparkSession, sparkSession.sessionState.asInstanceOf[CarbonSessionState]) + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala new file mode 100644 index 0000000..0bcae1e --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/AlterTableUnsetCommand.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.command.schema + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionState} +import org.apache.spark.util.AlterTableUtil + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.format.TableInfo + +private[sql] case class AlterTableUnsetCommand(val tableIdentifier: TableIdentifier, + val propKeys: Seq[String], + val ifExists: Boolean, + val isView: Boolean) + extends RunnableCommand with SchemaProcessCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + processSchema(sparkSession) + } + + override def processSchema(sparkSession: SparkSession): Seq[Row] = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + AlterTableUtil.modifyTableComment(tableIdentifier, Map.empty[String, String], + propKeys, false)(sparkSession, sparkSession.sessionState.asInstanceOf[CarbonSessionState]) + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index bf13e41..e39ba73 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy} import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.command.management.{AlterTableCompactionCommand, CarbonShowLoadsCommand, LoadTableByInsertCommand, LoadTableCommand} import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsCommand -import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand, CarbonAlterTableRenameCommand} +import org.apache.spark.sql.execution.command.schema._ import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand} import org.apache.carbondata.core.util.CarbonUtil @@ -156,6 +156,16 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { val cmd = CreateDataSourceTableCommand(updatedCatalog, ignoreIfExists = mode == SaveMode.Ignore) ExecutedCommandExec(cmd) :: Nil + case AlterTableSetPropertiesCommand(tableName, properties, isView) + if (CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(tableName)(sparkSession)) => { + ExecutedCommandExec(AlterTableSetCommand(tableName, properties, isView)) :: Nil + } + case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView) + if (CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(tableName)(sparkSession)) => { + ExecutedCommandExec(AlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil + } case _ => Nil } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/46731137/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 74f4dd0..44f5a36 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.util import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ListBuffer import org.apache.spark.SparkConf @@ -30,10 +31,13 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock} +import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier +import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo} +import org.apache.carbondata.format.TableInfo object AlterTableUtil { @@ -317,4 +321,82 @@ object AlterTableUtil { } } + /** + * This method add/modify the table comments. + * + * @param tableIdentifier + * @param properties + * @param propKeys + * @param set + * @param sparkSession + * @param sessionState + */ + def modifyTableComment(tableIdentifier: TableIdentifier, properties: Map[String, String], + propKeys: Seq[String], set: Boolean) + (sparkSession: SparkSession, sessionState: CarbonSessionState): Unit = { + val tableName = tableIdentifier.table + val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) + LOGGER.audit(s"Alter table comment request has been received for $dbName.$tableName") + val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK) + var locks = List.empty[ICarbonLock] + var timeStamp = 0L + var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]() + var carbonTable: CarbonTable = null + try { + locks = AlterTableUtil + .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession) + val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore + carbonTable = metastore + .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation] + .tableMeta.carbonTable + // get the latest carbon table + // read the latest schema file + val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath, + carbonTable.getCarbonTableIdentifier) + val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession) + val schemaConverter = new ThriftWrapperSchemaConverterImpl() + val wrapperTableInfo = schemaConverter + .fromExternalToWrapperTableInfo(thriftTableInfo, + dbName, + tableName, + carbonTable.getStorePath) + val schemaEvolutionEntry = new org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry + schemaEvolutionEntry.setTimeStamp(timeStamp) + val thriftTable = schemaConverter + .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName) + val tblPropertiesMap: mutable.Map[String, String] = + thriftTable.fact_table.getTableProperties.asScala + if (set) { + // This overrides old properties and update the comment parameter of thriftTable + // with the newly added/modified comment since thriftTable also holds comment as its + // direct property. + + properties.foreach { x => + if (x._1.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) { + tblPropertiesMap.put(x._1, x._2) + } + } + } else { + // This removes the comment parameter from thriftTable + // since thriftTable also holds comment as its property. + propKeys.foreach { x => + if (x.equalsIgnoreCase(CarbonCommonConstants.TABLE_COMMENT)) { + tblPropertiesMap.remove(x) + } + } + } + updateSchemaInfo(carbonTable, + schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), + thriftTable)(sparkSession, sessionState) + LOGGER.info(s"Alter table comment is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table comment is successful for table $dbName.$tableName") + } catch { + case e: Exception => + LOGGER.error(e, "Alter table comment failed") + sys.error(s"Alter table comment operation failed: ${e.getMessage}") + } finally { + // release lock after command execution completion + AlterTableUtil.releaseLocks(locks) + } + } }