vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r513204008



##########
File path: 
integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/cleanfiles/TestCleanFileCommand.scala
##########
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.cleanfiles
+
+import java.io.{File, PrintWriter}
+
+import scala.io.Source
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
+class TestCleanFileCommand extends QueryTest with BeforeAndAfterAll {
+
+  var count = 0
+
+  test("clean up table and test trash folder with In Progress segments") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    // run a select query before deletion
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(3)))
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val tableStatusFilePath = path + CarbonCommonConstants.FILE_SEPARATOR + 
"Metadata" +
+      CarbonCommonConstants.FILE_SEPARATOR + "tableStatus"
+    editTableStatusFile(path)
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun == 3)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(0)))
+    assert(FileFactory.isFileExist(trashFolderPath))
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 6)
+
+
+    val timeStamp = getTimestampFolderName(trashFolderPath)
+
+    // recovering data from trash folder
+    sql(
+      """
+        | CREATE TABLE cleantest1 (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val segment0Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '0'
+    val segment1Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '1'
+    val segment2Path = trashFolderPath + CarbonCommonConstants.FILE_SEPARATOR 
+ timeStamp +
+      CarbonCommonConstants.FILE_SEPARATOR + CarbonCommonConstants.LOAD_FOLDER 
+ '2'
+
+    sql(s"alter table cleantest1 add segment options('path'='$segment0Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment1Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment2Path'," +
+      s"'format'='carbon')").show()
+    sql(s"""INSERT INTO CLEANTEST SELECT * from cleantest1""")
+
+    // test after recovering data from trash
+    checkAnswer(sql(s"""select count(*) from cleantest"""),
+      Seq(Row(3)))
+
+    sql(s"CLEAN FILES FOR TABLE cleantest options('force'='true')").show
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up maintable table and test trash folder with SI with IN 
PROGRESS segments") {
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST_WITHSI""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE CLEANTEST_WITHSI (id Int, name String, add String )
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 1,"abc","def"""")
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 2, "abc","def"""")
+    sql(s"""INSERT INTO CLEANTEST_WITHSI SELECT 3, "abc","def"""")
+
+    sql(s"""CREATE INDEX SI_CLEANTEST on cleantest_withSI(add) as 'carbondata' 
""")
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""),
+      Seq(Row(3)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""),
+      Seq(Row(3)))
+
+    val mainTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest_withsi")(sqlContext
+      .sparkSession).getTablePath
+    editTableStatusFile(mainTablePath)
+    val mainTableTrashFolderPath = mainTablePath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    val siTablePath = CarbonEnv.getCarbonTable(Some("default"), 
"si_cleantest")(sqlContext
+      .sparkSession).getTablePath
+    editTableStatusFile(siTablePath)
+    val siTableTrashFolderPath = siTablePath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+
+    assert(!FileFactory.isFileExist(mainTableTrashFolderPath))
+    assert(!FileFactory.isFileExist(siTableTrashFolderPath))
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest_withsi 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 6 segments to move to trash. 3 for main table, 3 for si 
table
+    assert(dryRun == 6)
+
+    sql(s"CLEAN FILES FOR TABLE CLEANTEST_WITHSI").show()
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""), Seq(Row(0)))
+    checkAnswer(sql(s"""select count(*) from si_cleantest"""), Seq(Row(0)))
+
+    assert(FileFactory.isFileExist(mainTableTrashFolderPath))
+    assert(FileFactory.isFileExist(siTableTrashFolderPath))
+
+    count = 0
+    var listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 6)
+
+    count = 0
+    var listSITable = getFileCountInTrashFolder(siTableTrashFolderPath)
+    assert(listSITable == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest_withsi 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 6 segments to move to trash. 3 for main table, 3 for si 
table
+    assert(dryRun1 == 6)
+    // recovering data from trash folder
+
+    val timeStamp = getTimestampFolderName(mainTableTrashFolderPath)
+
+
+    sql(
+      """
+        | CREATE TABLE cleantest1 (id Int, name String, add String )
+        | STORED AS carbondata
+      """.stripMargin)
+
+    val segment0Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '0'
+    val segment1Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '1'
+    val segment2Path = mainTableTrashFolderPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+      timeStamp + CarbonCommonConstants.FILE_SEPARATOR + 
CarbonCommonConstants.LOAD_FOLDER + '2'
+
+    sql(s"alter table cleantest1 add segment options('path'='$segment0Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment1Path'," +
+      s"'format'='carbon')").show()
+    sql(s"alter table cleantest1 add segment options('path'='$segment2Path'," +
+      s"'format'='carbon')").show()
+    sql(s"""INSERT INTO CLEANTEST_withsi SELECT * from cleantest1""")
+
+    checkAnswer(sql(s"""select count(*) from cleantest_withSI"""),
+      Seq(Row(3)))
+
+
+    sql(s"CLEAN FILES FOR TABLE cleantest_withsi options('force'='true')").show
+
+    // no files in trash anymore
+    count = 0
+    listMainTable = getFileCountInTrashFolder(mainTableTrashFolderPath)
+    assert(listMainTable == 0)
+
+    count = 0
+    listSITable = getFileCountInTrashFolder(siTableTrashFolderPath)
+    assert(listSITable == 0)
+
+    sql("show segments for table cleantest_withsi").show()
+    sql("show segments for table si_cleantest").show()
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST_WITHSI""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+
+  }
+
+
+  test("clean up table and test trash folder with Marked For Delete segments") 
{
+    // do not send MFD folders to trash
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"""Delete from table cleantest where segment.id in(1)""")
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 1 Marked Fro DElete segments to be deleted
+    assert(dryRun == 1)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with compaction") {
+    // do not send compacted folders to trash
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""ALTER TABLE CLEANTEST COMPACT "MINOR" """)
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 compacted segments to be deleted
+    assert(dryRun == 4)
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with stale segments") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+    sql(
+      """
+        | CREATE TABLE cleantest (name String, id Int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 1""")
+    sql(s"""INSERT INTO CLEANTEST SELECT "abc", 2""")
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // First 3 segments are made as stale segments, they should be moved to 
the trash folder
+    createStaleSegments(path)
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+   // sql(s"""INSERT INTO CLEANTEST SELECT "abc", 3""")
+    assert(dryRun == 3)
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 6)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    assert(dryRun1 == 3)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+
+  test("clean up table and test trash folder with partition table") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT ) PARTITIONED BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+
+    editTableStatusFile(path)
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun == 4)
+
+
+    sql(s"CLEAN FILES FOR TABLE cleantest").show
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 4 segments to move to trash
+    assert(dryRun1 == 4)
+
+    // try recovering data and do select count(*)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    // no carbondata file is in the trash now, everything has been deleted
+    list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 0)
+
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+  }
+
+
+
+
+  test("clean up table and test trash folder with stale segments in partition 
table") {
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+
+    sql(
+      """
+        | CREATE TABLE CLEANTEST (id Int, id1 INT ) PARTITIONED BY (add String)
+        | STORED AS carbondata
+      """.stripMargin)
+
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"abc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+    sql(s"""INSERT INTO CLEANTEST SELECT 1, 2,"adc"""")
+
+    val path = CarbonEnv.getCarbonTable(Some("default"), 
"cleantest")(sqlContext.sparkSession)
+      .getTablePath
+    val trashFolderPath = path + CarbonCommonConstants.FILE_SEPARATOR +
+      CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    // All 4  segments are made as stale segments, they should be moved to the 
trash folder
+    createStaleSegments(path)
+
+    val dryRun = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun == 3)
+
+    assert(!FileFactory.isFileExist(trashFolderPath))
+    sql(s"CLEAN FILES FOR TABLE cleantest").show()
+    count = 0
+    var list = getFileCountInTrashFolder(trashFolderPath)
+    assert(list == 8)
+
+    val dryRun1 = sql(s"CLEAN FILES FOR TABLE cleantest 
OPTIONS('isDryRun'='true')").count()
+    // dry run shows 3 segments to move to trash
+    assert(dryRun1 == 4)
+
+    sql(s"CLEAN FILES FOR TABLE cleantest OPTIONS('force'='true')").show()
+    count = 0
+    list = getFileCountInTrashFolder(trashFolderPath)
+    // no carbondata file is added to the trash
+    assert(list == 0)
+    sql("""DROP TABLE IF EXISTS CLEANTEST""")
+    sql("""DROP TABLE IF EXISTS CLEANTEST1""")
+  }
+
+

Review comment:
       removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to