This is an automated email from the ASF dual-hosted git repository. kunalkapoor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new ace4e60 [CARBONDATA-3754]Clean up the data file and index files after SI rebuild ace4e60 is described below commit ace4e60e3f3af1ead7744607ffbb78385d59a598 Author: akashrn5 <akashnilu...@gmail.com> AuthorDate: Tue Mar 24 15:17:18 2020 +0530 [CARBONDATA-3754]Clean up the data file and index files after SI rebuild Why is this PR needed? The old data files and index files are not cleaned up after an SI rebuild. What changes were proposed in this PR? Every task should delete the old data and index files once the task finishes. This closes #3676 --- .../CarbonDataFileMergeTestCaseOnSI.scala | 7 ------ .../secondaryindex/rdd/CarbonSIRebuildRDD.scala | 27 ++++++++++++++++++++-- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala index b6e3360..9eced78 100644 --- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala +++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala @@ -81,7 +81,6 @@ class CarbonDataFileMergeTestCaseOnSI sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " + s"'GLOBAL_SORT_PARTITIONS'='100')") val rows = sql("""Select count(*) from indexmerge where name='n164419'""").collect() - sql("clean files for table indexmerge_index1") checkAnswer(sql("""Select count(*) from indexmerge where name='n164419'"""), rows) assert(getDataFileCount("indexmerge_index1", "0") < 7) } @@ -108,7 +107,6 @@ class CarbonDataFileMergeTestCaseOnSI .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true") sql("REFRESH INDEX nonindexmerge_index1 ON TABLE 
nonindexmerge").collect() checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) - sql("clean files for table nonindexmerge_index1") assert(getDataFileCount("nonindexmerge_index1", "0") < 7) assert(getDataFileCount("nonindexmerge_index1", "1") < 7) checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) @@ -136,14 +134,11 @@ class CarbonDataFileMergeTestCaseOnSI .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true") sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE SEGMENT.ID IN(0)").collect() checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) - sql("clean files for table nonindexmerge_index2") assert(getDataFileCount("nonindexmerge_index2", "0") < 7) assert(getDataFileCount("nonindexmerge_index2", "1") == 100) sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE SEGMENT.ID IN(1)").collect() checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) - sql("clean files for table nonindexmerge_index2") assert(getDataFileCount("nonindexmerge_index2", "1") < 7) - sql("clean files for table nonindexmerge_index2") checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) } @@ -192,7 +187,6 @@ class CarbonDataFileMergeTestCaseOnSI CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true") sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect() - sql("clean files for table nonindexmerge_index3") assert(getDataFileCount("nonindexmerge_index3", "0.1") < 11) checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) CarbonProperties.getInstance() @@ -224,7 +218,6 @@ class CarbonDataFileMergeTestCaseOnSI sql( "CREATE INDEX nonindexmerge_index4 on table nonindexmerge (name) AS 'carbondata' " + "properties('table_blocksize'='1')") - sql("clean files for table nonindexmerge_index4") 
assert(getDataFileCount("nonindexmerge_index4", "0.2") < 15) checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows) CarbonProperties.getInstance() diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala index 05c6e96..2399a45 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala @@ -37,12 +37,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.converter.SparkDataTypeConverterImpl import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions} import org.apache.carbondata.core.datastore.block.{SegmentProperties, TaskBlockInfo} +import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter} import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.metadata.blocklet.DataFileFooter import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil} +import org.apache.carbondata.core.util.path.CarbonTablePath +import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit} import org.apache.carbondata.hadoop.api.CarbonInputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil @@ -252,7 +255,7 @@ class CarbonSIRebuildRDD[K, V]( // add task completion listener to clean up the resources context.addTaskCompletionListener { _ => - close() + close(splitList) } try { // fire a query 
and get the results. @@ -308,7 +311,7 @@ class CarbonSIRebuildRDD[K, V]( throw e } - private def close(): Unit = { + private def close(splits: util.List[CarbonInputSplit]): Unit = { deleteLocalDataFolders() // close all the query executor service and clean up memory acquired during query processing if (null != exec) { @@ -321,6 +324,26 @@ class CarbonSIRebuildRDD[K, V]( LOGGER.info("Closing compaction processor instance to clean up loading resources") processor.close() } + + // delete all the old data files which are used for merging + splits.asScala.foreach { split => + val carbonFile = FileFactory.getCarbonFile(split.getFilePath) + carbonFile.delete() + } + + // delete the indexfile/merge index carbonFile of old data files + val segmentPath = FileFactory.getCarbonFile(indexTable.getSegmentPath(segmentId)) + val indexFiles = segmentPath.listFiles(new CarbonFileFilter { + override def accept(carbonFile: CarbonFile): Boolean = { + (carbonFile.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) || + carbonFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && + DataFileUtil.getTimeStampFromFileName(carbonFile.getAbsolutePath).toLong < + carbonLoadModelCopy.getFactTimeStamp + } + }) + indexFiles.foreach { indexFile => + indexFile.delete() + } } private def deleteLocalDataFolders(): Unit = {