QiangCai commented on a change in pull request #3986: URL: https://github.com/apache/carbondata/pull/3986#discussion_r513147277
########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ########## @@ -415,44 +415,41 @@ public boolean accept(CarbonFile pathName) { } /** - * Return all delta file for a block. - * @param segmentId - * @param blockName - * @return + * Get all delete delta files of the block of specified segment. + * Actually, delete delta file name is generated from each SegmentUpdateDetails. + * + * @param seg the segment which is to find block and its delete delta files + * @param blockName the specified block of the segment + * @return delete delta file list of the block */ - public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) { + public List<String> getDeleteDeltaFilesList(final Segment seg, final String blockName) { + List<String> deleteDeltaFileList = new ArrayList<>(); String segmentPath = CarbonTablePath.getSegmentPath( - identifier.getTablePath(), segmentId.getSegmentNo()); - CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath); + identifier.getTablePath(), seg.getSegmentNo()); + for (SegmentUpdateDetails block : updateDetails) { if ((block.getBlockName().equalsIgnoreCase(blockName)) && - (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo())) - && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) { - final long deltaStartTimestamp = - getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); - final long deltaEndTimeStamp = - getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); - - return segDir.listFiles(new CarbonFileFilter() { - - @Override - public boolean accept(CarbonFile pathName) { - String fileName = pathName.getName(); - if (pathName.getSize() > 0 - && fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) { - String blkName = fileName.substring(0, fileName.lastIndexOf("-")); - long timestamp = - Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName)); - 
return blockName.equals(blkName) && timestamp <= deltaEndTimeStamp - && timestamp >= deltaStartTimestamp; - } - return false; - } - }); + (block.getSegmentName().equalsIgnoreCase(seg.getSegmentNo())) && + !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) { + Set<String> deltaFileTimestamps = block.getDeltaFileStamps(); + String deleteDeltaFilePrefix = segmentPath + CarbonCommonConstants.FILE_SEPARATOR + + blockName + CarbonCommonConstants.HYPHEN; + if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) { Review comment: please add a test case: 1. set CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE to false 2. update a different row in the same file three times (it should generate three delete delta files for the same block name) 3. set CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE to true 4. update a different row in the same file again 5. query the table to check the result ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ########## @@ -415,44 +415,41 @@ public boolean accept(CarbonFile pathName) { } /** - * Return all delta file for a block. - * @param segmentId - * @param blockName - * @return + * Get all delete delta files of the block of specified segment. + * Actually, delete delta file name is generated from each SegmentUpdateDetails. 
+ * + * @param seg the segment which is to find block and its delete delta files + * @param blockName the specified block of the segment + * @return delete delta file list of the block */ - public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) { + public List<String> getDeleteDeltaFilesList(final Segment seg, final String blockName) { + List<String> deleteDeltaFileList = new ArrayList<>(); String segmentPath = CarbonTablePath.getSegmentPath( - identifier.getTablePath(), segmentId.getSegmentNo()); - CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath); + identifier.getTablePath(), seg.getSegmentNo()); + for (SegmentUpdateDetails block : updateDetails) { if ((block.getBlockName().equalsIgnoreCase(blockName)) && - (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo())) - && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) { - final long deltaStartTimestamp = - getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); - final long deltaEndTimeStamp = - getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block); - - return segDir.listFiles(new CarbonFileFilter() { - - @Override - public boolean accept(CarbonFile pathName) { - String fileName = pathName.getName(); - if (pathName.getSize() > 0 - && fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) { - String blkName = fileName.substring(0, fileName.lastIndexOf("-")); - long timestamp = - Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName)); - return blockName.equals(blkName) && timestamp <= deltaEndTimeStamp - && timestamp >= deltaStartTimestamp; - } - return false; - } - }); + (block.getSegmentName().equalsIgnoreCase(seg.getSegmentNo())) && + !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) { + Set<String> deltaFileTimestamps = block.getDeltaFileStamps(); + String deleteDeltaFilePrefix = segmentPath + CarbonCommonConstants.FILE_SEPARATOR + + blockName + 
CarbonCommonConstants.HYPHEN; + if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) { + deltaFileTimestamps.forEach(timestamp -> deleteDeltaFileList.add( + deleteDeltaFilePrefix + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT)); Review comment: better to reuse CarbonUpdateUtil.getDeleteDeltaFilePath ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala ########## @@ -177,6 +178,7 @@ object HorizontalCompaction { absTableIdentifier, segmentUpdateStatusManager, compactionTypeIUD) + LOG.debug(s"The segment list for Horizontal Update Compaction is $deletedBlocksList") Review comment: same as above ########## File path: core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ########## @@ -415,44 +415,41 @@ public boolean accept(CarbonFile pathName) { } /** - * Return all delta file for a block. - * @param segmentId - * @param blockName - * @return + * Get all delete delta files of the block of specified segment. + * Actually, delete delta file name is generated from each SegmentUpdateDetails. + * + * @param seg the segment which is to find block and its delete delta files + * @param blockName the specified block of the segment + * @return delete delta file list of the block */ - public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) { + public List<String> getDeleteDeltaFilesList(final Segment seg, final String blockName) { Review comment: change seg to segment ########## File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala ########## @@ -130,6 +130,7 @@ object HorizontalCompaction { absTableIdentifier, segmentUpdateStatusManager, compactionTypeIUD) + LOG.debug(s"The segment list for Horizontal Update Compaction is $validSegList") Review comment: avoid converting the list to a string when debug logging is disabled; guard the call with: if (LOG.isDebugEnabled) { ... 
} ########## File path: processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ########## @@ -1138,73 +1126,43 @@ private static Boolean checkUpdateDeltaFilesInSeg(Segment seg, } /** - * Check is the segment passed qualifies for IUD delete delta compaction or not i.e. - * if the number of delete delta files present in the segment is more than - * numberDeltaFilesThreshold. + * Check whether the segment passed qualifies for IUD delete delta compaction or not, + * i.e., if the number of delete delta files present in the segment is more than + * numberDeltaFilesThreshold, this segment will be selected. * - * @param seg - * @param segmentUpdateStatusManager - * @param numberDeltaFilesThreshold - * @return + * @param seg segment to be qualified + * @param segmentUpdateStatusManager segments & blocks details management + * @param numberDeltaFilesThreshold threshold of delete delta files + * @return block list of the segment */ - private static boolean checkDeleteDeltaFilesInSeg(Segment seg, + private static List<String> checkDeleteDeltaFilesInSeg(Segment seg, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { + List<String> blockLists = new ArrayList<>(); Set<String> uniqueBlocks = new HashSet<String>(); List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo()); - - for (final String blockName : blockNameList) { - - CarbonFile[] deleteDeltaFiles = + for (String blockName : blockNameList) { + List<String> deleteDeltaFiles = segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName); - if (null != deleteDeltaFiles) { + if (null != deleteDeltaFiles && deleteDeltaFiles.size() > numberDeltaFilesThreshold) { Review comment: add a test case for the case when CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION >= 3 ---------------------------------------------------------------- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org