This is an automated email from the ASF dual-hosted git repository.

satish pushed a commit to branch release-0.12.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8d0fe3c50ec2ed072ebbbb397588bf29b777cb46 Author: Yann Byron <biyan900...@gmail.com> AuthorDate: Wed Nov 30 18:11:23 2022 +0800 [HUDI-5279] move logic for deleting active instant to HoodieActiveTimeline (#7196) --- .../apache/hudi/client/HoodieTimelineArchiver.java | 40 ++++++++++++---------- .../table/timeline/HoodieActiveTimeline.java | 14 +++++--- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index bb814f817d0..a61a5c90082 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -537,15 +537,14 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); - List<String> pendingInstantFiles = new ArrayList<>(); - List<String> completedInstantFiles = new ArrayList<>(); + List<HoodieInstant> pendingInstants = new ArrayList<>(); + List<HoodieInstant> completedInstants = new ArrayList<>(); for (HoodieInstant instant : archivedInstants) { - String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString(); if (instant.isCompleted()) { - completedInstantFiles.add(filePath); + completedInstants.add(instant); } else { - pendingInstantFiles.add(filePath); + pendingInstants.add(instant); } } @@ -556,27 +555,30 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { // other monitors on the timeline(such as the compaction or clustering services) would // mistakenly recognize the pending file as a pending operation, // then all kinds of weird bugs occur. 
- boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles); - success &= deleteArchivedInstantFiles(context, success, completedInstantFiles); + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + if (!pendingInstants.isEmpty()) { + context.foreach( + pendingInstants, + instant -> activeTimeline.deleteInstantFileIfExists(instant), + Math.min(pendingInstants.size(), config.getArchiveDeleteParallelism()) + ); + } + if (!completedInstants.isEmpty()) { + context.foreach( + completedInstants, + instant -> activeTimeline.deleteInstantFileIfExists(instant), + Math.min(completedInstants.size(), config.getArchiveDeleteParallelism()) + ); + } // Remove older meta-data from auxiliary path too Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); LOG.info("Latest Committed Instant=" + latestCommitted); if (latestCommitted.isPresent()) { - success &= deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); + return deleteAllInstantsOlderOrEqualsInAuxMetaFolder(latestCommitted.get()); } - return success; - } - - private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) { - Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false); - - for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) { - LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); - success &= result.getValue(); - } - return success; + return true; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 
96e22e9dac0..7dc3acc94e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -265,22 +265,26 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { deleteInstantFile(instant); } + /** + * Note: This method should only be used in the case that delete requested/inflight instant or empty clean instant, + * and completed commit instant in an archive operation. + */ public void deleteInstantFileIfExists(HoodieInstant instant) { LOG.info("Deleting instant " + instant); - Path inFlightCommitFilePath = getInstantFileNamePath(instant.getFileName()); + Path commitFilePath = getInstantFileNamePath(instant.getFileName()); try { - if (metaClient.getFs().exists(inFlightCommitFilePath)) { - boolean result = metaClient.getFs().delete(inFlightCommitFilePath, false); + if (metaClient.getFs().exists(commitFilePath)) { + boolean result = metaClient.getFs().delete(commitFilePath, false); if (result) { LOG.info("Removed instant " + instant); } else { throw new HoodieIOException("Could not delete instant " + instant); } } else { - LOG.warn("The commit " + inFlightCommitFilePath + " to remove does not exist"); + LOG.warn("The commit " + commitFilePath + " to remove does not exist"); } } catch (IOException e) { - throw new HoodieIOException("Could not remove inflight commit " + inFlightCommitFilePath, e); + throw new HoodieIOException("Could not remove commit " + commitFilePath, e); } }