[CARBONDATA-2097] Restriction added to partition table on alter command (add,rename on partition table and drop partition on preaggregate table)
Restriction added to partition table on alter command (add and rename on partition table and drop partition on preaggregate table) This closes #1885 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/033870da Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/033870da Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/033870da Branch: refs/heads/fgdatamap Commit: 033870dab137ba99f5273c934741e96884b3247d Parents: 099a047 Author: kushalsaha <kushalsaha1...@gmail.com> Authored: Tue Jan 30 16:38:47 2018 +0530 Committer: manishgupta88 <tomanishgupt...@gmail.com> Committed: Wed Jan 31 14:46:54 2018 +0530 ---------------------------------------------------------------------- .../StandardPartitionTableQueryTestCase.scala | 42 ++++++++++++++++++++ ...rbonAlterTableDropHivePartitionCommand.scala | 7 +++- .../sql/execution/strategy/DDLStrategy.scala | 21 ++++++++++ 3 files changed, 69 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/033870da/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index d1ef94c..b1fc0a7 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -248,6
+248,46 @@ test("Creation of partition table should fail if the colname in table schema and } } + test("Renaming a partition table should fail"){ + sql("drop table if exists partitionTable") + sql( + """create table partitionTable (id int,name String) partitioned by(email string) stored by 'carbondata' + """.stripMargin) + sql("insert into partitionTable select 1,'huawei','abc'") + checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"))) + intercept[Exception]{ + sql("alter table partitionTable PARTITION (email='abc') rename to PARTITION (email='def)") + } + } + + test("add partition based on location on partition table should fail"){ + sql("drop table if exists partitionTable") + sql( + """create table partitionTable (id int,name String) partitioned by(email string) stored by 'carbondata' + """.stripMargin) + sql("insert into partitionTable select 1,'huawei','abc'") + checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc"))) + intercept[Exception]{ + sql("alter table partitionTable add partition (email='def') location 'abc/part1'") + } + } + + test("drop partition on preAggregate table should fail"){ + sql("drop table if exists partitionTable") + sql("drop datamap if exists preaggTable on table partitionTable") + sql("create table partitionTable (id int,city string,age int) partitioned by(name string) stored by 'carbondata'".stripMargin) + sql( + s"""create datamap preaggTable on table partitionTable using 'preaggregate' as select id,sum(age) from partitionTable group by id""" + .stripMargin) + sql("insert into partitionTable select 1,'Bangalore',30,'John'") + sql("insert into partitionTable select 2,'Chennai',20,'Huawei'") + checkAnswer(sql("show partitions partitionTable"), Seq(Row("name=John"),Row("name=Huawei"))) + intercept[Exception]{ + sql("alter table partitionTable drop PARTITION(name='John')") + } + } + + private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = { val plan = 
frame.queryExecution.sparkPlan val scanRDD = plan collect { @@ -277,6 +317,8 @@ test("Creation of partition table should fail if the colname in table schema and sql("drop table if exists badrecordsignore") sql("drop table if exists badrecordsPartitionintnull") sql("drop table if exists badrecordsPartitionintnullalt") + sql("drop table if exists partitionTable") + sql("drop datamap if exists preaggTable on table partitionTable") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/033870da/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index c3509a3..0158a32 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -21,7 +21,7 @@ import java.util import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} @@ -34,6 +34,7 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.PartitionMapFileStore import org.apache.carbondata.core.mutate.CarbonUpdateUtil import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager +import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} /** @@ -61,6 +62,10 @@ case class CarbonAlterTableDropHivePartitionCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val table = CarbonEnv.getCarbonTable(tableName)(sparkSession) + if (CarbonUtil.hasAggregationDataMap(table)) { + throw new AnalysisException( + "Partition can not be dropped as it is mapped to Pre Aggregate table") + } if (table.isHivePartitionTable) { try { specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f))) http://git-wip-us.apache.org/repos/asf/carbondata/blob/033870da/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index db8c6a2..b174b94 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -254,6 +254,27 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { ExecutedCommandExec( CarbonAlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil } + case rename@AlterTableRenamePartitionCommand(tableName, oldPartition, newPartition) => + val dbOption = tableName.database.map(_.toLowerCase) + val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption) + val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(tableIdentifier)(sparkSession) + if (isCarbonTable) { + throw new UnsupportedOperationException("Renaming partition on table is not 
supported") + } else { + ExecutedCommandExec(rename) :: Nil + } + case addPartition@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, _) => + val dbOption = tableName.database.map(_.toLowerCase) + val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption) + val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .tableExists(tableIdentifier)(sparkSession) + if (isCarbonTable && partitionSpecsAndLocs.exists(_._2.isDefined)) { + throw new UnsupportedOperationException( + "add partition with location is not supported") + } else { + ExecutedCommandExec(addPartition) :: Nil + } case RefreshTable(tableIdentifier) => RefreshCarbonTableCommand(tableIdentifier.database, tableIdentifier.table).run(sparkSession)