satishkotha commented on a change in pull request #2422:
URL: https://github.com/apache/hudi/pull/2422#discussion_r554244115
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(

Review comment:
I think this is in JSON format in the active timeline (only archival uses Avro format, similar to other commits). Can you double-check while adding tests? You may need a similar change in CleanPlanner.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }

+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {

Review comment:
fileSystemView#getReplacedFileGroupsBeforeOrOn looks similar (we may have to add another method, 'getReplacedFileGroupsBefore', to enforce strict inequality). Maybe we can reuse that code? I can do this later if we file a code cleanup task.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }

+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
+      // replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+      // In other words, the file versions only apply to the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+    }
+
+    // merge everything and make a map full of file ids to be cleaned.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // duplicates should nt be possible; since replace of a file group should happen once only
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)

Review comment:
I think getAllFileGroups doesn't return replaced file groups. It looks like we may have to rename it to getAllActiveFileGroups to avoid confusion. Can you make getAllFileGroupsIncludingReplaced public and use it? We could also use getReplacedFileGroupsBeforeOrOn (or the new method mentioned above), which returns HoodieFileGroups.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -370,6 +376,59 @@ private String getLatestVersionBeforeCommit(List<FileSlice> fileSliceList, Hoodi
     return earliestCommitToRetain;
   }

+  public Map<String, List<String>> getReplacedFileIdsToClean(Option<HoodieInstant> earliestInstantToRetain) {
+    HoodieCleaningPolicy policy = config.getCleanerPolicy();
+    HoodieTimeline replaceTimeline = hoodieTable.getActiveTimeline().getCompletedReplaceTimeline();
+
+    // Determine which replace commits can be cleaned.
+    Stream<HoodieInstant> cleanableReplaceCommits;
+    if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+      if (!earliestInstantToRetain.isPresent()) {
+        LOG.info("Not enough instants to start cleaning replace commits");
+        return Collections.emptyMap();
+      }
+      // all replace commits, before the earliest instant we want to retain, should be eligible for deleting the
+      // replaced file groups.
+      cleanableReplaceCommits = replaceTimeline
+          .filter(instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.LESSER_THAN,
+              earliestInstantToRetain.get().getTimestamp()))
+          .getInstants();
+    } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
+      // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
+      // In other words, the file versions only apply to the active file groups.
+      cleanableReplaceCommits = replaceTimeline.getInstants();
+    } else {
+      throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
+    }
+
+    // merge everything and make a map full of file ids to be cleaned.
+    return cleanableReplaceCommits.map(instant -> {
+      try {
+        return TimelineMetadataUtils.deserializeHoodieReplaceMetadata(hoodieTable.getActiveTimeline().getInstantDetails(instant).get()).getPartitionToReplaceFileIds();
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to deserialize " + instant, e);
+      }
+    }).reduce((leftMap, rightMap) -> {
+      rightMap.forEach((partition, fileIds) -> {
+        if (!leftMap.containsKey(partition)) {
+          leftMap.put(partition, fileIds);
+        } else {
+          // duplicates should nt be possible; since replace of a file group should happen once only
+          leftMap.get(partition).addAll(fileIds);
+        }
+      });
+      return leftMap;
+    }).orElse(new HashMap<>());
+  }
+
+  public List<CleanFileInfo> getDeletePathsForReplacedFileGroups(String partitionPath, List<String> eligibleFileIds) {
+    return hoodieTable.getFileSystemView().getAllFileGroups(partitionPath)
+        .filter(fg -> eligibleFileIds.contains(fg.getFileGroupId().getFileId()))
+        .flatMap(HoodieFileGroup::getAllFileSlices)
+        .flatMap(fileSlice -> getCleanFileInfoForSlice(fileSlice).stream())
+        .collect(Collectors.toList());

Review comment:
In line 220 we honor savepointed files when cleaning regular commits, but we might be missing that for replace commits. Is that fine for now?
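The last two comments on CleanPlanner can be combined into a minimal sketch. The candidate file groups must include replaced ones, which `getAllFileGroups` reportedly omits. Any file still held by a savepoint is skipped, as regular cleaning already does.

The sketch uses hypothetical, simplified types (`FileGroup`, `deletePathsForReplaced`) in place of Hudi's `HoodieFileGroup` and `CleanFileInfo`. Modeling savepointed files as a plain set of paths is also an assumption about how the planner would look them up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReplacedFileGroupCleanSketch {

  // Hypothetical stand-in for a file group: its id, whether a replace commit replaced it,
  // and every file (base and log) across all of its slices.
  static final class FileGroup {
    final String fileId;
    final boolean replaced;
    final List<String> paths;

    FileGroup(String fileId, boolean replaced, String... paths) {
      this.fileId = fileId;
      this.replaced = replaced;
      this.paths = Arrays.asList(paths);
    }
  }

  // Returns the files to delete for replaced file groups whose ids are eligible for cleaning.
  // The input is expected to include replaced groups; files still held by a savepoint are kept.
  static List<String> deletePathsForReplaced(List<FileGroup> allGroupsIncludingReplaced,
                                             Set<String> eligibleFileIds,
                                             Set<String> savepointedPaths) {
    List<String> toDelete = new ArrayList<>();
    for (FileGroup fg : allGroupsIncludingReplaced) {
      // Only replaced groups named by a cleanable replace commit are candidates.
      if (!fg.replaced || !eligibleFileIds.contains(fg.fileId)) {
        continue;
      }
      for (String path : fg.paths) {
        // Mirror regular cleaning: never delete a file that a savepoint still needs.
        if (!savepointedPaths.contains(path)) {
          toDelete.add(path);
        }
      }
    }
    return toDelete;
  }

  public static void main(String[] args) {
    List<FileGroup> groups = Arrays.asList(
        new FileGroup("fg1", true, "p1/fg1_001.parquet", "p1/fg1_002.parquet"),
        new FileGroup("fg2", false, "p1/fg2_003.parquet"));
    Set<String> eligible = new HashSet<>(Arrays.asList("fg1", "fg2"));
    Set<String> savepointed = new HashSet<>(Arrays.asList("p1/fg1_002.parquet"));
    // Prints [p1/fg1_001.parquet]: fg2 is still active, and fg1_002 is savepointed.
    System.out.println(deletePathsForReplaced(groups, eligible, savepointed));
  }
}
```

Filtering by savepointed path, rather than dropping a whole replaced group when any of its files is savepointed, keeps the cleaner making progress on the unprotected files. Whether that granularity matches the real planner is left open here.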
##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -92,13 +95,36 @@
       case HoodieTimeline.SAVEPOINT_ACTION:
         // Nothing to be done here
         break;
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:
+        HoodieReplaceCommitMetadata replaceMetadata = TimelineMetadataUtils.deserializeHoodieReplaceMetadata(
+            timeline.getInstantDetails(instant).get());
+        records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
+        break;
       default:
         throw new HoodieException("Unknown type of action " + instant.getAction());
     }
     return records;
   }

+  public static List<HoodieRecord> convertMetadataToRecords(HoodieReplaceCommitMetadata replaceCommitMetadata, String instantTime) {

Review comment:
If HoodieReplaceCommitMetadata is in JSON format, we could reuse convertMetadataToRecords(HoodieCommitMetadata, String).

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/BaseCleanActionExecutor.java
##########
@@ -81,9 +83,21 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
     context.setJobStatus(this.getClass().getSimpleName(), "Generates list of file slices to be cleaned");
-    Map<String, List<HoodieCleanFileInfo>> cleanOps = context
-        .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
-        .stream()
+    // Compute the file paths, to be cleaned in each valid file group
+    Stream<Pair<String, List<CleanFileInfo>>> cleanInfos = context.map(partitionsToClean,
+        partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)),

Review comment:
Is it possible to have a single call to planner.getDeletePaths return all files to be cleaned? That seems like a better abstraction at a high level to me. I'm not sure whether there are disadvantages to separating them.
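Both comments on HoodieTableMetadataUtil hinge on one question: is replace-commit metadata in the active timeline stored as JSON or as Avro? A cheap way to answer it in a test is to inspect the raw instant bytes.

This is a hypothetical helper (`InstantFormatSketch` and `detect` are not Hudi APIs). It rests on two assumptions:
- Avro-serialized metadata is written as an Avro object container file, which starts with the 4-byte magic `O`, `b`, `j`, `1`.
- JSON metadata is a single object starting with `{`.

```java
import java.nio.charset.StandardCharsets;

public class InstantFormatSketch {

  enum Format { AVRO, JSON, UNKNOWN }

  // Magic bytes of an Avro object container file: 'O', 'b', 'j', 1.
  private static final byte[] AVRO_MAGIC = {'O', 'b', 'j', 1};

  // Guesses the serialization format of raw instant details from their leading bytes.
  static Format detect(byte[] bytes) {
    if (startsWith(bytes, AVRO_MAGIC)) {
      return Format.AVRO;
    }
    for (byte b : bytes) {
      // Skip JSON insignificant whitespace (space, tab, LF, CR).
      if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
        continue;
      }
      return b == '{' ? Format.JSON : Format.UNKNOWN;
    }
    return Format.UNKNOWN;
  }

  private static boolean startsWith(byte[] bytes, byte[] prefix) {
    if (bytes.length < prefix.length) {
      return false;
    }
    for (int i = 0; i < prefix.length; i++) {
      if (bytes[i] != prefix[i]) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Illustrative payloads only; a real test would read the instant details from the timeline.
    byte[] json = "{\"example\": 1}".getBytes(StandardCharsets.UTF_8);
    byte[] avro = {'O', 'b', 'j', 1, 0};
    System.out.println(detect(json) + " " + detect(avro)); // JSON AVRO
  }
}
```

Suppose such a check reports JSON for completed replace commits in the active timeline. Then the metadata could be read the same way as regular commit metadata, which is what would make reusing convertMetadataToRecords(HoodieCommitMetadata, String) possible.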
##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
##########
@@ -122,6 +122,7 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
     List<ListingBasedRollbackRequest> partitionRollbackRequests = new ArrayList<>();
     switch (instantToRollback.getAction()) {
       case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.REPLACE_COMMIT_ACTION:

Review comment:
Yes, this was missed because MOR is not well tested with clustering/insert_overwrite. The new test covers this. Btw, there may be other places where we are missing similar handling of REPLACE_COMMIT. I tried doing an audit, but it's very tedious, so I may have missed some places. Let me know if you find any.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org