This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 4e42ed5eae [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676) 4e42ed5eae is described below commit 4e42ed5eae36f706ec35e5d09d8dc206b7fea130 Author: Danny Chan <yuzhao....@gmail.com> AuthorDate: Thu May 26 11:21:39 2022 +0800 [HUDI-4145] Archives the metadata file in HoodieInstant.State sequence (part2) (#5676) --- .../apache/hudi/client/HoodieTimelineArchiver.java | 46 ++++++++++++++-------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index f111bb70ef..c53554d8e0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -506,13 +506,7 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { List<HoodieInstant> instantsToStream = groupByTsAction.get(Pair.of(hoodieInstant.getTimestamp(), HoodieInstant.getComparableAction(hoodieInstant.getAction()))); if (instantsToStream != null) { - // sorts the instants in natural order to make sure the metadata files be removed - // in HoodieInstant.State sequence: requested -> inflight -> completed, - // this is important because when a COMPLETED metadata file is removed first, - // other monitors on the timeline(such as the compaction or clustering services) would - // mistakenly recognize the pending file as a pending operation, - // then all kinds of weird bugs occur. - return instantsToStream.stream().sorted(); + return instantsToStream.stream(); } else { // if a concurrent writer archived the instant return Stream.empty(); @@ -522,19 +516,29 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException { LOG.info("Deleting instants " + archivedInstants); - boolean success = true; - List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> - new Path(metaClient.getMetaPath(), archivedInstant.getFileName()) - ).map(Path::toString).collect(Collectors.toList()); - context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName()); - Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false); + List<String> pendingInstantFiles = new ArrayList<>(); + List<String> completedInstantFiles = new ArrayList<>(); - for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) { - LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); - success &= result.getValue(); + for (HoodieInstant instant : archivedInstants) { + String filePath = new Path(metaClient.getMetaPath(), instant.getFileName()).toString(); + if (instant.isCompleted()) { + completedInstantFiles.add(filePath); + } else { + pendingInstantFiles.add(filePath); + } } + context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName()); + // Delete the metadata files + // in HoodieInstant.State sequence: requested -> inflight -> completed, + // this is important because when a COMPLETED metadata file is removed first, + // other monitors on the timeline(such as the compaction or clustering services) would + // mistakenly recognize the pending file as a pending operation, + // then all kinds of weird bugs occur. + boolean success = deleteArchivedInstantFiles(context, true, pendingInstantFiles); + success &= deleteArchivedInstantFiles(context, success, completedInstantFiles); + // Remove older meta-data from auxiliary path too Option<HoodieInstant> latestCommitted = Option.fromJavaOptional(archivedInstants.stream().filter(i -> i.isCompleted() && (i.getAction().equals(HoodieTimeline.COMMIT_ACTION) || (i.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)))).max(Comparator.comparing(HoodieInstant::getTimestamp))); @@ -545,6 +549,16 @@ public class HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> { return success; } + private boolean deleteArchivedInstantFiles(HoodieEngineContext context, boolean success, List<String> files) { + Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, files, context, false); + + for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) { + LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue()); + success &= result.getValue(); + } + return success; + } + /** * Remove older instants from auxiliary meta folder. *