QiangCai commented on a change in pull request #3986:
URL: https://github.com/apache/carbondata/pull/3986#discussion_r513147277



##########
File path: 
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
##########
@@ -415,44 +415,41 @@ public boolean accept(CarbonFile pathName) {
   }
 
   /**
-   * Return all delta file for a block.
-   * @param segmentId
-   * @param blockName
-   * @return
+   * Get all delete delta files of the block of specified segment.
+   * Actually, delete delta file name is generated from each 
SegmentUpdateDetails.
+   *
+   * @param seg the segment which is to find block and its delete delta files
+   * @param blockName the specified block of the segment
+   * @return delete delta file list of the block
    */
-  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final 
String blockName) {
+  public List<String> getDeleteDeltaFilesList(final Segment seg, final String 
blockName) {
+    List<String> deleteDeltaFileList = new ArrayList<>();
     String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), segmentId.getSegmentNo());
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath);
+        identifier.getTablePath(), seg.getSegmentNo());
+
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
-          (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
-          && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
-        final long deltaStartTimestamp =
-            
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
-        final long deltaEndTimeStamp =
-            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, 
block);
-
-        return segDir.listFiles(new CarbonFileFilter() {
-
-          @Override
-          public boolean accept(CarbonFile pathName) {
-            String fileName = pathName.getName();
-            if (pathName.getSize() > 0
-                && 
fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
-              String blkName = fileName.substring(0, 
fileName.lastIndexOf("-"));
-              long timestamp =
-                  
Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
-              return blockName.equals(blkName) && timestamp <= 
deltaEndTimeStamp
-                  && timestamp >= deltaStartTimestamp;
-            }
-            return false;
-          }
-        });
+          (block.getSegmentName().equalsIgnoreCase(seg.getSegmentNo())) &&
+          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
+        Set<String> deltaFileTimestamps = block.getDeltaFileStamps();
+        String deleteDeltaFilePrefix = segmentPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+            blockName + CarbonCommonConstants.HYPHEN;
+        if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) {

Review comment:
       Please add a test case:
   1. Set CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE to false.
   2. Update a different row in the same file three times (this should generate three delete delta files for the same block name).
   3. Set CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE to true.
   4. Update yet another row in the same file.
   5. Query the table and check the result.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
##########
@@ -415,44 +415,41 @@ public boolean accept(CarbonFile pathName) {
   }
 
   /**
-   * Return all delta file for a block.
-   * @param segmentId
-   * @param blockName
-   * @return
+   * Get all delete delta files of the block of specified segment.
+   * Actually, delete delta file name is generated from each 
SegmentUpdateDetails.
+   *
+   * @param seg the segment which is to find block and its delete delta files
+   * @param blockName the specified block of the segment
+   * @return delete delta file list of the block
    */
-  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final 
String blockName) {
+  public List<String> getDeleteDeltaFilesList(final Segment seg, final String 
blockName) {
+    List<String> deleteDeltaFileList = new ArrayList<>();
     String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), segmentId.getSegmentNo());
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath);
+        identifier.getTablePath(), seg.getSegmentNo());
+
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
-          (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))
-          && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
-        final long deltaStartTimestamp =
-            
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
-        final long deltaEndTimeStamp =
-            getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, 
block);
-
-        return segDir.listFiles(new CarbonFileFilter() {
-
-          @Override
-          public boolean accept(CarbonFile pathName) {
-            String fileName = pathName.getName();
-            if (pathName.getSize() > 0
-                && 
fileName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)) {
-              String blkName = fileName.substring(0, 
fileName.lastIndexOf("-"));
-              long timestamp =
-                  
Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
-              return blockName.equals(blkName) && timestamp <= 
deltaEndTimeStamp
-                  && timestamp >= deltaStartTimestamp;
-            }
-            return false;
-          }
-        });
+          (block.getSegmentName().equalsIgnoreCase(seg.getSegmentNo())) &&
+          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
+        Set<String> deltaFileTimestamps = block.getDeltaFileStamps();
+        String deleteDeltaFilePrefix = segmentPath + 
CarbonCommonConstants.FILE_SEPARATOR +
+            blockName + CarbonCommonConstants.HYPHEN;
+        if (deltaFileTimestamps != null && deltaFileTimestamps.size() > 0) {
+          deltaFileTimestamps.forEach(timestamp -> deleteDeltaFileList.add(
+              deleteDeltaFilePrefix + timestamp + 
CarbonCommonConstants.DELETE_DELTA_FILE_EXT));

Review comment:
       Better to reuse CarbonUpdateUtil.getDeleteDeltaFilePath here instead of concatenating the delete delta file path manually.
   

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
##########
@@ -177,6 +178,7 @@ object HorizontalCompaction {
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
+    LOG.debug(s"The segment list for Horizontal Update Compaction is 
$deletedBlocksList")

Review comment:
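       Same as the comment on the other LOG.debug call: guard it with LOG.isDebugEnabled to avoid converting the list to a string.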

##########
File path: 
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
##########
@@ -415,44 +415,41 @@ public boolean accept(CarbonFile pathName) {
   }
 
   /**
-   * Return all delta file for a block.
-   * @param segmentId
-   * @param blockName
-   * @return
+   * Get all delete delta files of the block of specified segment.
+   * Actually, delete delta file name is generated from each 
SegmentUpdateDetails.
+   *
+   * @param seg the segment which is to find block and its delete delta files
+   * @param blockName the specified block of the segment
+   * @return delete delta file list of the block
    */
-  public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final 
String blockName) {
+  public List<String> getDeleteDeltaFilesList(final Segment seg, final String 
blockName) {

Review comment:
       Please rename the parameter seg to segment.

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
##########
@@ -130,6 +130,7 @@ object HorizontalCompaction {
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
+    LOG.debug(s"The segment list for Horizontal Update Compaction is 
$validSegList")

Review comment:
       Avoid converting the list to a string when debug logging is disabled; wrap the call in a guard:
   if (LOG.isDebugEnabled) {
     ...
   }

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
##########
@@ -1138,73 +1126,43 @@ private static Boolean 
checkUpdateDeltaFilesInSeg(Segment seg,
   }
 
   /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or 
not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
+   * Check whether the segment passed qualifies for IUD delete delta 
compaction or not,
+   * i.e., if the number of delete delta files present in the segment is more 
than
+   * numberDeltaFilesThreshold, this segment will be selected.
    *
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
+   * @param seg segment to be qualified
+   * @param segmentUpdateStatusManager segments & blocks details management
+   * @param numberDeltaFilesThreshold threshold of delete delta files
+   * @return block list of the segment
    */
-  private static boolean checkDeleteDeltaFilesInSeg(Segment seg,
+  private static List<String> checkDeleteDeltaFilesInSeg(Segment seg,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int 
numberDeltaFilesThreshold) {
 
+    List<String> blockLists = new ArrayList<>();
     Set<String> uniqueBlocks = new HashSet<String>();
     List<String> blockNameList =
         segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
+    for (String blockName : blockNameList) {
+      List<String> deleteDeltaFiles =
           segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-      if (null != deleteDeltaFiles) {
+      if (null != deleteDeltaFiles && deleteDeltaFiles.size() > 
numberDeltaFilesThreshold) {

Review comment:
       Please add a test case for when CarbonCommonConstants.DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION is set to a value >= 3.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to