This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new eb39b2a6968 [MINOR] Avoid scanning raw timeline on archival if no instants to archive (#12482)
eb39b2a6968 is described below
commit eb39b2a696864e1e152c1d6ef1878b17ea9133fc
Author: Tim Brown <[email protected]>
AuthorDate: Fri Dec 13 09:12:42 2024 -0600
[MINOR] Avoid scanning raw timeline on archival if no instants to archive (#12482)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 4 --
.../apache/hudi/client/BaseHoodieWriteClient.java | 3 ++
.../timeline/versioning/v1/TimelineArchiverV1.java | 54 +++++++++++++---------
.../timeline/versioning/v2/TimelineArchiverV2.java | 4 ++
4 files changed, 38 insertions(+), 27 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index e96a6707247..81256ddcbb9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -826,10 +826,6 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
try {
final Timer.Context timerContext = metrics.getArchiveCtx();
// We cannot have unbounded commit files. Archive commits if we have to
archive.
-
- // Reload table timeline to reflect the latest commits,
- // there are some table services (for e.g, the cleaning) that executed right before the archiving.
- table.getMetaClient().reloadActiveTimeline();
HoodieTimelineArchiver archiver =
TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(),
config, table);
int instantsToArchive = archiver.archiveIfRequired(context, true);
if (timerContext != null) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index aeefb09a114..e451ff50893 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -613,6 +613,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
LOG.info("Async archiver has finished");
} else {
LOG.info("Start to archive synchronously.");
+ // Reload table timeline to reflect the latest commits,
+ // there are some table services (for e.g, the cleaning) that executed right before the archiving.
+ table.getMetaClient().reloadActiveTimeline();
archive(table);
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index c539811d149..8108d3b3ed9 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -139,14 +140,13 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
// there is no owner or instant time per se for archival.
txnManager.beginTransaction(Option.empty(), Option.empty());
}
- List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
- boolean success = true;
+ List<HoodieInstant> instantsToArchive = getInstantsToArchive();
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter(archiveFilePath.getParent());
- LOG.info("Archiving instants " + instantsToArchive);
+ LOG.info("Archiving instants {}", instantsToArchive);
archive(context, instantsToArchive);
- LOG.info("Deleting archived instants " + instantsToArchive);
- success = deleteArchivedInstants(instantsToArchive, context);
+ LOG.info("Deleting archived instants {}", instantsToArchive);
+ deleteArchivedInstants(instantsToArchive, context);
} else {
LOG.info("No Instants to archive");
}
@@ -280,17 +280,18 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
}
}
- private Stream<HoodieInstant> getInstantsToArchive() throws IOException {
- Stream<HoodieInstant> instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive());
+ private List<HoodieInstant> getInstantsToArchive() throws IOException {
+ private List<HoodieInstant> getInstantsToArchive() throws IOException {
if (config.isMetaserverEnabled()) {
- return Stream.empty();
+ return Collections.emptyList();
}
- // For archiving and cleaning instants, we need to include intermediate state files if they exist
- HoodieActiveTimeline rawActiveTimeline = new ActiveTimelineV1(metaClient,
false);
- Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction =
rawActiveTimeline.getInstantsAsStream()
- .collect(Collectors.groupingBy(i -> Pair.of(i.requestedTime(),
- InstantComparatorV1.getComparableAction(i.getAction()))));
+ List<HoodieInstant> candidates = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive()).collect(Collectors.toList());
+ if (candidates.isEmpty()) {
+ // exit early to avoid loading meta client for metadata table
+ return Collections.emptyList();
+ }
+
+ Stream<HoodieInstant> instants = candidates.stream();
// If metadata table is enabled, do not archive instants which are more
recent than the last compaction on the
// metadata table.
@@ -339,16 +340,23 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
}
}
- return instants.flatMap(hoodieInstant -> {
- List<HoodieInstant> instantsToStream =
groupByTsAction.get(Pair.of(hoodieInstant.requestedTime(),
- InstantComparatorV1.getComparableAction(hoodieInstant.getAction())));
- if (instantsToStream != null) {
- return instantsToStream.stream();
- } else {
- // if a concurrent writer archived the instant
- return Stream.empty();
- }
- });
+ List<HoodieInstant> instantsToArchive = instants.collect(Collectors.toList());
+ if (instantsToArchive.isEmpty()) {
+ // Exit early to avoid loading raw timeline
+ return Collections.emptyList();
+ }
+
+ // For archiving and cleaning instants, we need to include intermediate
state files if they exist
+ HoodieActiveTimeline rawActiveTimeline = new ActiveTimelineV1(metaClient,
false);
+ Map<Pair<String, String>, List<HoodieInstant>> groupByTsAction =
rawActiveTimeline.getInstantsAsStream()
+ .collect(Collectors.groupingBy(i -> Pair.of(i.requestedTime(),
+ InstantComparatorV1.getComparableAction(i.getAction()))));
+
+ return instantsToArchive.stream()
+ .flatMap(hoodieInstant ->
+ groupByTsAction.getOrDefault(Pair.of(hoodieInstant.requestedTime(),
+
InstantComparatorV1.getComparableAction(hoodieInstant.getAction())),
Collections.emptyList()).stream())
+ .collect(Collectors.toList());
}
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants,
HoodieEngineContext context) throws IOException {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index fd376868111..40f3abbc2bc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -298,6 +298,10 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
instantsToArchive.sort(InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR);
}
+ if (instantsToArchive.isEmpty()) {
+ return Stream.empty();
+ }
+
// For archive, we need to include instant's all states.
// The re-instantiation of the timeline may result in inconsistencies with
the existing meta client active timeline,
// When there is no lock guard of the archiving process, the 'raw'
timeline could contain less distinct instants