[CARBONDATA-2806] Remove delete delta files on clean files operation for flat folder tables
Problem: Delete delta files are not removed after clean files operation. Solution: Get the delta files using Segment Status Manager and remove them during clean operation. This closes #2587 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/af984101 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/af984101 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/af984101 Branch: refs/heads/external-format Commit: af984101ebcd55f71e370e52e837157b45c529dd Parents: a302cd1 Author: ravipesala <ravi.pes...@gmail.com> Authored: Mon Jul 30 20:30:58 2018 +0530 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Aug 1 22:15:48 2018 +0800 ---------------------------------------------------------------------- .../core/metadata/SegmentFileStore.java | 18 ++++++---- .../carbondata/core/util/DeleteLoadFolders.java | 35 +++++++++++++------- .../core/util/path/CarbonTablePath.java | 8 +++++ .../FlatFolderTableLoadingTestCase.scala | 25 ++++++++++++++ 4 files changed, 68 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java index 67e58d1..111e444 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java @@ -56,6 +56,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import 
org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataFileFooterConverter; import org.apache.carbondata.core.util.path.CarbonTablePath; @@ -793,25 +794,30 @@ public class SegmentFileStore { /** * Deletes the segment file and its physical files like partition folders from disk * @param tablePath - * @param segmentFile + * @param segment * @param partitionSpecs * @throws IOException */ - public static void deleteSegment(String tablePath, String segmentFile, - List<PartitionSpec> partitionSpecs) throws IOException { - SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile); + public static void deleteSegment(String tablePath, Segment segment, + List<PartitionSpec> partitionSpecs, + SegmentUpdateStatusManager updateStatusManager) throws Exception { + SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName()); List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true); Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap(); for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) { FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey())); for (String file : entry.getValue()) { + String[] deltaFilePaths = + updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo()); + for (String deltaFilePath : deltaFilePaths) { + FileFactory.deleteFile(deltaFilePath, FileFactory.getFileType(deltaFilePath)); + } FileFactory.deleteFile(file, FileFactory.getFileType(file)); } } deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, tablePath); String segmentFilePath = - CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR - + segmentFile; + CarbonTablePath.getSegmentFilePath(tablePath, 
segment.getSegmentFileName()); // Deletes the physical segment file FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath)); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java index 0433ba4..a65294e 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java +++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java @@ -40,6 +40,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; +import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.path.CarbonTablePath; public final class DeleteLoadFolders { @@ -75,21 +76,29 @@ public final class DeleteLoadFolders { absoluteTableIdentifier, currentDetails, isForceDelete, - specs); + specs, + currentDetails); if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) { physicalFactAndMeasureMetadataDeletion( absoluteTableIdentifier, newAddedLoadHistoryList, isForceDelete, - specs); + specs, + currentDetails); } } - public static void physicalFactAndMeasureMetadataDeletion( - AbsoluteTableIdentifier absoluteTableIdentifier, - LoadMetadataDetails[] loadDetails, - boolean isForceDelete, - List<PartitionSpec> specs) { + /** + * Delete the invalid data physically from table. 
+ * @param absoluteTableIdentifier table identifier + * @param loadDetails Load details which need clean up + * @param isForceDelete is Force delete requested by user + * @param specs Partition specs + * @param currLoadDetails Current table status load details which are required for update manager. + */ + private static void physicalFactAndMeasureMetadataDeletion( + AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] loadDetails, + boolean isForceDelete, List<PartitionSpec> specs, LoadMetadataDetails[] currLoadDetails) { CarbonTable carbonTable = DataMapStoreManager.getInstance().getCarbonTable( absoluteTableIdentifier); List<TableDataMap> indexDataMaps = new ArrayList<>(); @@ -104,14 +113,16 @@ public final class DeleteLoadFolders { "Failed to get datamaps for %s.%s, therefore the datamap files could not be cleaned.", absoluteTableIdentifier.getDatabaseName(), absoluteTableIdentifier.getTableName())); } - + SegmentUpdateStatusManager updateStatusManager = + new SegmentUpdateStatusManager(carbonTable, currLoadDetails); for (final LoadMetadataDetails oneLoad : loadDetails) { if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { try { if (oneLoad.getSegmentFile() != null) { - SegmentFileStore - .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(), - specs); + SegmentFileStore.deleteSegment( + absoluteTableIdentifier.getTablePath(), + new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()), + specs, updateStatusManager); } else { String path = getSegmentPath(absoluteTableIdentifier, oneLoad); boolean status = false; @@ -161,7 +172,7 @@ public final class DeleteLoadFolders { segments.add(new Segment(oneLoad.getLoadName())); dataMap.deleteDatamapData(segments); } - } catch (IOException e) { + } catch (Exception e) { LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName()); } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 275d3d6..6493e34 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -694,6 +694,14 @@ public class CarbonTablePath { } /** + * Get the segment file path of table + */ + public static String getSegmentFilePath(String tablePath, String segmentFileName) { + return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments" + + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName; + } + + /** * Get the lock files directory */ public static String getLockFilesDirPath(String tablePath) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala index 9a60978..68f8ca7 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala @@ -127,6 +127,31 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists t1") 
} + test("merge index flat folder and delete delta issue") { + sql("drop table if exists flatfolder_delete") + sql( + """ + | CREATE TABLE flatfolder_delete (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int,empno int) + | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true') + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "flatfolder_delete") + sql(s"""delete from flatfolder_delete where empname='anandh'""") + assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4) + sql("Alter table flatfolder_delete compact 'minor'") + assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4) + sql("clean files for table flatfolder_delete") + assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1) + assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0) + + } + override def afterAll = { 
CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,