Indhumathi27 commented on code in PR #4276:
URL: https://github.com/apache/carbondata/pull/4276#discussion_r907039904
##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand(
ifExists,
purge,
retainData).run(sparkSession)
+
+ // move the partition files to trash folder which are dropped
+ val droppedPartitionNames = partitions.map { partition =>
+ partition.spec.map { specs => specs._1 + "=" + specs._2}
+ }
+ val droppedPartitionName = droppedPartitionNames
+ .head
+ .headOption
+ .get
+ TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped,
+ TrashUtil.getCompleteTrashFolderPath(
Review Comment:
`getCompleteTrashFolderPath` is used to create the path for a non-partition
table (it appends the Segment prefix). I think that, for a partition table, we
should create the path with the partition folder name instead.
##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand(
ifExists,
purge,
retainData).run(sparkSession)
+
+ // move the partition files to trash folder which are dropped
+ val droppedPartitionNames = partitions.map { partition =>
+ partition.spec.map { specs => specs._1 + "=" + specs._2}
+ }
+ val droppedPartitionName = droppedPartitionNames
+ .head
+ .headOption
+ .get
+ TrashUtil.copyFilesToTrash(carbonPartitionsTobeDropped,
+ TrashUtil.getCompleteTrashFolderPath(
+ table.getTablePath,
+ CarbonUpdateUtil.readCurrentTime(),
+ droppedPartitionName))
+ // Delete partition folder after copy to trash
+ carbonPartitionsTobeDropped.asScala.foreach(delPartition => {
Review Comment:
Please format the new code changes
##########
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala:
##########
@@ -218,6 +222,129 @@ class StandardPartitionTableDropTestCase extends
QueryTest with BeforeAndAfterAl
Seq(Row(0)))
}
+ test("dropping partition with moving data to trash") {
+ sql("drop table if exists dropPartition1")
+ sql(
+ """
+ | CREATE TABLE dropPartition1 (empno int, empname String, designation
String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int,
+ | projectjoindate Timestamp, projectenddate Date,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
dropPartition1 OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
dropPartition1 OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
dropPartition1 OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ sql(
+ s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
dropPartition1 OPTIONS
+ |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+ sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='Learning')""")
+ sql(s"""ALTER TABLE dropPartition1 DROP
PARTITION(deptname='configManagement')""")
+ sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='network')""")
+ sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='protocol')""")
+ sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='security')""")
+ val table = CarbonEnv.getCarbonTable(Option("default"),
"dropPartition1")(sqlContext
+ .sparkSession)
+ val tablePath = table.getTablePath
+ val deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+ file => file.getName.equalsIgnoreCase("deptname=Learning")
+ }
+ assert(deptname.length == 0)
+ val configManagement =
FileFactory.getCarbonFile(tablePath).listFiles().filter{
+ file => file.getName.equalsIgnoreCase("deptname=configManagement")
+ }
+ assert(configManagement.length == 0)
+ val network = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+ file => file.getName.equalsIgnoreCase("deptname=network")
+ }
+ assert(network.length == 0)
+ val protocol = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+ file => file.getName.equalsIgnoreCase("deptname=protocol")
+ }
+ assert(protocol.length == 0)
+ val security = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+ file => file.getName.equalsIgnoreCase("deptname=protocol")
+ }
+ assert(security.length == 0)
+ sql("drop table if exists dropPartition1")
Review Comment:
Please add a validation to check the data count after the drop partition operation.
##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -30,13 +30,17 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
Review Comment:
Please remove unused imports
##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala:
##########
@@ -105,6 +109,28 @@ case class CarbonAlterTableDropHivePartitionCommand(
ifExists,
purge,
retainData).run(sparkSession)
+
+ // move the partition files to trash folder which are dropped
+ val droppedPartitionNames = partitions.map { partition =>
+ partition.spec.map { specs => specs._1 + "=" + specs._2}
+ }
+ val droppedPartitionName = droppedPartitionNames
Review Comment:
Taking the head option to create the partition folder name in Trash is wrong
here. For example, consider a case where the user performs a multi-level drop
partition (i.e., `DROP PARTITION(deptname='Learning',doj='2015-12-01 00:00:00')`).
In this case, the Trash folder should also contain the multi-level partition
folders, so that the drop partition can be rolled back if the user wants. So,
in Trash, please copy the files under the respective multi-level partition
folders.
##########
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala:
##########
@@ -494,9 +494,8 @@ test("Creation of partition table should fail if the
colname in table schema and
sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP
PARTITION(empname='ravi')""")
checkAnswer(sql(s"select count(deptname) from
staticpartitionlocloadother_new"), Seq(Row(10)))
sql(s"""alter table staticpartitionlocloadother_new add partition
(empname='ravi') location '$location'""")
- checkAnswer(sql(s"select count(deptname) from
staticpartitionlocloadother_new"), Seq(Row(20)))
sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE
staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'=
',', 'QUOTECHAR'= '"')""")
- checkAnswer(sql(s"select count(deptname) from
staticpartitionlocloadother_new"), Seq(Row(30)))
+ checkAnswer(sql(s"select count(deptname) from
staticpartitionlocloadother_new"), Seq(Row(20)))
Review Comment:
Why is this change needed?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]