This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ace4e60  [CARBONDATA-3754]Clean up the data file and index files after 
SI rebuild
ace4e60 is described below

commit ace4e60e3f3af1ead7744607ffbb78385d59a598
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Tue Mar 24 15:17:18 2020 +0530

    [CARBONDATA-3754]Clean up the data file and index files after SI rebuild
    
    Why is this PR needed?
    Cleanup does not happen for the data files and index files after an SI rebuild.
    
    What changes were proposed in this PR?
    Every task should clear the old data and index files once the task finishes.
    
    This closes #3676
---
 .../CarbonDataFileMergeTestCaseOnSI.scala          |  7 ------
 .../secondaryindex/rdd/CarbonSIRebuildRDD.scala    | 27 ++++++++++++++++++++--
 2 files changed, 25 insertions(+), 9 deletions(-)

diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index b6e3360..9eced78 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -81,7 +81,6 @@ class CarbonDataFileMergeTestCaseOnSI
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge 
OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     val rows = sql("""Select count(*) from indexmerge where 
name='n164419'""").collect()
-    sql("clean files for table indexmerge_index1")
     checkAnswer(sql("""Select count(*) from indexmerge where 
name='n164419'"""), rows)
     assert(getDataFileCount("indexmerge_index1", "0") < 7)
   }
@@ -108,7 +107,6 @@ class CarbonDataFileMergeTestCaseOnSI
       .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
     sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
-    sql("clean files for table nonindexmerge_index1")
     assert(getDataFileCount("nonindexmerge_index1", "0") < 7)
     assert(getDataFileCount("nonindexmerge_index1", "1") < 7)
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
@@ -136,14 +134,11 @@ class CarbonDataFileMergeTestCaseOnSI
       .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
     sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE 
SEGMENT.ID IN(0)").collect()
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
-    sql("clean files for table nonindexmerge_index2")
     assert(getDataFileCount("nonindexmerge_index2", "0") < 7)
     assert(getDataFileCount("nonindexmerge_index2", "1") == 100)
     sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE 
SEGMENT.ID IN(1)").collect()
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
-    sql("clean files for table nonindexmerge_index2")
     assert(getDataFileCount("nonindexmerge_index2", "1") < 7)
-    sql("clean files for table nonindexmerge_index2")
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
   }
 
@@ -192,7 +187,6 @@ class CarbonDataFileMergeTestCaseOnSI
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    sql("clean files for table nonindexmerge_index3")
     assert(getDataFileCount("nonindexmerge_index3", "0.1") < 11)
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
     CarbonProperties.getInstance()
@@ -224,7 +218,6 @@ class CarbonDataFileMergeTestCaseOnSI
     sql(
     "CREATE INDEX nonindexmerge_index4 on table nonindexmerge (name) AS 
'carbondata' " +
     "properties('table_blocksize'='1')")
-    sql("clean files for table nonindexmerge_index4")
     assert(getDataFileCount("nonindexmerge_index4", "0.2") < 15)
     checkAnswer(sql("""Select count(*) from nonindexmerge where 
name='n164419'"""), rows)
     CarbonProperties.getInstance()
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index 05c6e96..2399a45 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -37,12 +37,15 @@ import 
org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, 
SortScopeOptions}
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, 
TaskBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -252,7 +255,7 @@ class CarbonSIRebuildRDD[K, V](
 
         // add task completion listener to clean up the resources
         context.addTaskCompletionListener { _ =>
-          close()
+          close(splitList)
         }
         try {
           // fire a query and get the results.
@@ -308,7 +311,7 @@ class CarbonSIRebuildRDD[K, V](
           throw e
       }
 
-      private def close(): Unit = {
+      private def close(splits: util.List[CarbonInputSplit]): Unit = {
         deleteLocalDataFolders()
         // close all the query executor service and clean up memory acquired 
during query processing
         if (null != exec) {
@@ -321,6 +324,26 @@ class CarbonSIRebuildRDD[K, V](
           LOGGER.info("Closing compaction processor instance to clean up 
loading resources")
           processor.close()
         }
+
+        // delete all the old data files which are used for merging
+        splits.asScala.foreach { split =>
+          val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
+          carbonFile.delete()
+        }
+
+        // delete the indexfile/merge index carbonFile of old data files
+        val segmentPath = 
FileFactory.getCarbonFile(indexTable.getSegmentPath(segmentId))
+        val indexFiles = segmentPath.listFiles(new CarbonFileFilter {
+          override def accept(carbonFile: CarbonFile): Boolean = {
+            (carbonFile.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+             
carbonFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) &&
+            
DataFileUtil.getTimeStampFromFileName(carbonFile.getAbsolutePath).toLong <
+            carbonLoadModelCopy.getFactTimeStamp
+          }
+        })
+        indexFiles.foreach { indexFile =>
+          indexFile.delete()
+        }
       }
 
       private def deleteLocalDataFolders(): Unit = {

Reply via email to