satishkotha commented on a change in pull request #2422:
URL: https://github.com/apache/hudi/pull/2422#discussion_r554244115



##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(

Review comment:
      I think this is in JSON format in the active timeline. (Only archival uses Avro format, similar to other commits.) Can you double-check while adding tests? You may need a similar change in CleanPlanner.
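
      For illustration, a rough sketch of the JSON-based path, assuming HoodieCommitMetadata.fromBytes accepts HoodieReplaceCommitMetadata the same way it does the regular commit metadata class (worth verifying in the new tests):

          case HoodieTimeline.REPLACE_COMMIT_ACTION:
            // Assumption: replace commits on the active timeline are JSON, so use the
            // JSON-based helper instead of the Avro deserializer.
            HoodieReplaceCommitMetadata replaceMetadata = HoodieCommitMetadata.fromBytes(
                timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
            records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
            break;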

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }
 
+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {

Review comment:
      fileSystemView#getReplacedFileGroupsBeforeOrOn looks similar (we may have to add another method 'getReplacedFileGroupsBefore' to enforce strict inequality). Maybe we can reuse code? I can do this later if we file a code cleanup task.
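
      For concreteness, a hypothetical sketch of the strict-inequality variant, assuming it mirrors getReplacedFileGroupsBeforeOrOn; getReplaceInstant is an assumed helper exposing the replacing instant per file group, not the actual view API:

          // Returns file groups replaced strictly before maxCommitTime.
          public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) {
            return getAllFileGroupsIncludingReplaced(partitionPath)
                .filter(fg -> getReplaceInstant(fg.getFileGroupId()) // hypothetical helper
                    .map(instant -> HoodieTimeline.compareTimestamps(
                        instant.getTimestamp(), HoodieTimeline.LESSER_THAN, maxCommitTime))
                    .orElse(false));
          }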

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }
 
+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // All replace commits before the earliest instant we want to retain should be
+      // eligible for deleting the replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we assume that once replaced, a file group automatically becomes
+      // eligible for cleaning completely. In other words, the file versions only apply to
+      // the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy: " + policy.name());
+    }
+
+    // Merge everything into a map of file ids to be cleaned, keyed by partition.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+            hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // Duplicates should not be possible, since replace of a file group should happen only once.
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)

Review comment:
       i think getAllFileGroups doesn't return replaced file groups. Looks like 
we may have to change name to getAllActiveFileGroups to avoid confusion. You 
can make getAllFileGroupsIncludingReplaced public and use it?
   We could also use getReplacedFileGroupsBeforeOrOn (or add new method 
mentioned above) that returns HoodieFileGroups 
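
      A rough sketch of that first option, assuming getAllFileGroupsIncludingReplaced can be promoted to public on the file system view:

          public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
            // Include replaced file groups, which getAllFileGroups filters out.
            return hoodieTable.getFileSystemView().getAllFileGroupsIncludingReplaced(partitionPath)
                .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
                .flatMap(HoodieFileGroup::getAllFileSlices)
                .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
                .collect(Collectors.toList());
          }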

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }
 
+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // All replace commits before the earliest instant we want to retain should be
+      // eligible for deleting the replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we assume that once replaced, a file group automatically becomes
+      // eligible for cleaning completely. In other words, the file versions only apply to
+      // the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy: " + policy.name());
+    }
+
+    // Merge everything into a map of file ids to be cleaned, keyed by partition.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+            hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // Duplicates should not be possible, since replace of a file group should happen only once.
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
+        .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
+        .flatMap(HoodieFileGroup::getAllFileSlices)
+        .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
+        .collect(Collectors.toList());

Review comment:
       in line 220 we are honoring savepoint files for cleaning regular 
commits. But we might be missing that for replacecommits. is that fine for now?
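
      A hedged sketch of honoring savepoints here too, mirroring how the regular-commit path skips savepointed files; the savepointedFiles list of file names is an assumption borrowed from that path:

          return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
              .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
              .flatMap(HoodieFileGroup::getAllFileSlices)
              // Skip slices whose base file is protected by a savepoint (assumed helper data).
              .filter(fileSlice -> !fileSlice.getBaseFile().isPresent()
                  || !savepointedFiles.contains(fileSlice.getBaseFile().get().getFileName()))
              .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
              .collect(Collectors.toList());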

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+        break;
       default:
        throw new HoodieException("Unknown type of action " + instant.getAction());
     }
 
     return records;
   }
 
+  public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {

Review comment:
       if HoodieReplaceCommitMetadata is json format, we could reuse 
convertMetadataToRecords(HoodieCommitMetadata, String)
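
      A minimal sketch of the suggested reuse, assuming HoodieReplaceCommitMetadata extends HoodieCommitMetadata in the JSON model classes (worth verifying before relying on it):

          public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {
            // The write stats in a replace commit have the same shape as a regular
            // commit's, so the existing conversion applies unchanged.
            return convertMetadataToRecords((HoodieCommitMetadata) replaceCommitMetadata, instantTime);
          }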

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -81,9 +83,21 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
 
      context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
 
-      Map<String, List<HoodieCleanFileInfo>> cleanOps = context
-          .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
-          .stream()
+      // Compute the file paths to be cleaned in each valid file group
+      Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
+          partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),

Review comment:
       is it possible to have one call to planner.getDeletePaths return all 
files to be cleaned? that seems like better abstraction at a high level to me. 
Not sure if there are disadvantages of separating them.
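
      A hypothetical sketch of the single-call shape; the extra replacedFileIds parameter is an assumption, and the planner could equally compute it internally:

          public List<CleanFileInfo> getDeletePaths(String partitionPath, Map<String, List<String>> replacedFileIds) {
            // One call returns the regular delete paths plus the replaced-file-group paths.
            List<CleanFileInfo> deletePaths = new ArrayList<>(getDeletePaths(partitionPath));
            deletePaths.addAll(getDeletePathsForReplacedFileGroups(
                partitionPath, replacedFileIds.getOrDefault(partitionPath, Collections.emptyList())));
            return deletePaths;
          }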

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
##########
@@ -122,6 +122,7 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
      List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
       switch (instantToRollback.getAction()) {
         case HoodieTimeline.COMMIT_ACTION:
+        case HoodieTimeline.REPLACE_COMMIT_ACTION:

Review comment:
       Yes, this was missed because MOR is not well tested with 
clustering/insert_overwrite. New test covers this. Btw, There may be other 
places where we might be missing similar handling of REPLACE_COMMIT. I tried 
doing an audit, but its very tedious. So, I may have missed some places. Let me 
know if you find any.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

