yihua commented on code in PR #18421:
URL: https://github.com/apache/hudi/pull/18421#discussion_r3035651728
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config);
     resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
+    // Merge rolling metadata after conflict resolution, still within the lock
+    mergeRollingMetadata(table, metadata);
+  }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return; // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return; // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new java.util.HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getInstantsAsStream()
+          .collect(java.util.stream.Collectors.toList());
+
Review Comment:
🤖 nit: collecting all completed instants, reversing, then limiting is a bit
roundabout. Consider using
`getInstantsAsStream().sorted(HoodieInstant.COMPARATOR.reversed()).limit(lookbackLimit)`
or similar to avoid materializing the full list just to reverse it.
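Below is a minimal sketch of the stream-based walkback suggested above. It runs against a hypothetical `SimpleInstant` stand-in, not Hudi's `HoodieInstant`, and makes two assumptions. Each instant's extra metadata is an in-memory map; in the PR it comes from `readInstantContent(...)`. `BY_REQUESTED_TIME` is a local comparator, not `HoodieInstant.COMPARATOR`. The sketch sorts newest-first, applies `limit(lookbackLimit)`, and stops early once every key is resolved.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

final class RollingLookback {

  // Hypothetical stand-in for HoodieInstant: only the fields this sketch needs.
  static final class SimpleInstant {
    final String requestedTime;
    final Map<String, String> extraMetadata; // stands in for readInstantContent(...)

    SimpleInstant(String requestedTime, Map<String, String> extraMetadata) {
      this.requestedTime = requestedTime;
      this.extraMetadata = extraMetadata;
    }
  }

  // Local comparator on requested time (assumes fixed-width timestamp strings).
  static final Comparator<SimpleInstant> BY_REQUESTED_TIME = Comparator.comparing(i -> i.requestedTime);

  // Returns the newest value of each missing key, looking at most `lookbackLimit` instants back.
  static Map<String, String> findRolling(List<SimpleInstant> completed, Set<String> missingKeys, int lookbackLimit) {
    Set<String> remaining = new HashSet<>(missingKeys);
    Map<String, String> found = new HashMap<>();
    // Newest first, capped at the lookback window; no full-size list is built and then reversed.
    List<SimpleInstant> window = completed.stream()
        .sorted(BY_REQUESTED_TIME.reversed())
        .limit(lookbackLimit)
        .collect(Collectors.toList());
    for (SimpleInstant instant : window) {
      if (remaining.isEmpty()) {
        break; // every key resolved; skip reading older commits
      }
      // Iterate over a copy because keys are removed from `remaining` inside the loop.
      for (String key : new HashSet<>(remaining)) {
        String value = instant.extraMetadata.get(key);
        if (value != null) {
          found.put(key, value);
          remaining.remove(key);
        }
      }
    }
    return found;
  }
}
```

`Stream.sorted()` still buffers every element internally. This version therefore drops the intermediate full-size list and the explicit reverse, but it does not avoid materializing the instants. A timeline accessor that already yields instants in reverse order would skip the sort altogether.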
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config);
     resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
+    // Merge rolling metadata after conflict resolution, still within the lock
+    mergeRollingMetadata(table, metadata);
+  }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return; // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return; // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new java.util.HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getInstantsAsStream()
+          .collect(java.util.stream.Collectors.toList());
+
+      // Reverse to walk back from most recent to oldest
+      java.util.Collections.reverse(recentCommits);
+
+      // Limit to lookback commits
+      recentCommits = recentCommits.stream()
+          .limit(lookbackLimit)
+          .collect(java.util.stream.Collectors.toList());
+
+      log.debug("Walking back up to {} commits to find rolling metadata for keys: {}",
+          lookbackLimit, remainingKeys);
+
+      for (HoodieInstant instant : recentCommits) {
+        if (remainingKeys.isEmpty()) {
+          break; // Found all keys
+        }
+
+        commitsWalkedBack++;
+        HoodieCommitMetadata commitMetadata =
+            table.getMetaClient().getActiveTimeline().readInstantContent(instant, HoodieCommitMetadata.class);
+
+        // Check for remaining keys in this commit
+        for (String key : new java.util.HashSet<>(remainingKeys)) {
+          String value = commitMetadata.getMetadata(key);
+          if (value != null) {
+            foundRollingMetadata.put(key, value);
+            remainingKeys.remove(key);
+            log.debug("Found rolling metadata key '{}' in commit {} with value: {}",
+                key, instant.requestedTime(), value);
+          }
+        }
+      }
+
+      // Add found rolling metadata to current commit
+      int rolledForwardCount = 0;
+      for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
+        metadata.addMetadata(entry.getKey(), entry.getValue());
+        rolledForwardCount++;
+      }
+
+      int updatedCount = rollingKeys.size() - remainingKeys.size() - rolledForwardCount;
+
+      if (rolledForwardCount > 0 || updatedCount > 0 || !remainingKeys.isEmpty()) {
+        log.info("Rolling metadata merge completed. Walked back {} commits. "
+            + "Rolled forward: {}, Updated in current: {}, Not found: {}, Total rolling keys: {}",
+            commitsWalkedBack, rolledForwardCount, updatedCount, remainingKeys.size(), rollingKeys.size());
+      }
+
+      if (!remainingKeys.isEmpty()) {
+        log.warn("Rolling metadata keys not found in last {} commits: {}. "
+            + "These keys will not be included in the current commit.", lookbackLimit, remainingKeys);
+      }
+
+    } catch (IOException e) {
Review Comment:
🤖 Silently swallowing `IOException` here could be dangerous for the primary
use case — checkpoint tracking for exactly-once semantics. If metadata reads
fail (e.g., a transient storage issue), the commit proceeds without rolling
metadata, and downstream consumers may reprocess data on recovery without any
signal that checkpoint info was lost. Could this propagate the exception (or at
least make that behavior configurable) rather than silently dropping the metadata?
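One way to act on this comment is sketched below with hypothetical names throughout. `MetadataReader` stands in for the per-instant `readInstantContent(...)` call. `failOnReadError` is an assumed config flag, not an existing Hudi option. In strict mode the `IOException` is rethrown, so the commit fails instead of silently losing checkpoint metadata. The sketch wraps it as `UncheckedIOException`, although inside Hudi an exception such as `HoodieIOException` would likely be the more natural choice. In lenient mode the failure is at least logged.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Map;
import java.util.logging.Logger;

final class RollingMetadataErrorPolicy {
  private static final Logger LOG = Logger.getLogger(RollingMetadataErrorPolicy.class.getName());

  // Hypothetical abstraction over the per-instant metadata read (readInstantContent(...) in the PR).
  @FunctionalInterface
  interface MetadataReader {
    Map<String, String> read() throws IOException;
  }

  // `failOnReadError` is a hypothetical config flag, not an existing Hudi option.
  static Map<String, String> readRolling(MetadataReader reader, boolean failOnReadError) {
    try {
      return reader.read();
    } catch (IOException e) {
      if (failOnReadError) {
        // Strict: fail the commit rather than commit without checkpoint metadata.
        throw new UncheckedIOException("Failed to read rolling metadata", e);
      }
      // Lenient: continue without rolling metadata, but make the loss visible in the logs.
      LOG.warning("Rolling metadata read failed; committing without it: " + e.getMessage());
      return Collections.emptyMap();
    }
  }
}
```

Defaulting such a flag to strict would favor exactly-once consumers, at the cost of failing commits on transient read errors.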
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config);
     resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
Review Comment:
🤖 This is added only to `BaseHoodieWriteClient.preCommit()`, but
`BaseHoodieTableServiceClient` has its own `preCommit()` at line 182 that
doesn't call `mergeRollingMetadata`. For async table services
(compaction/clustering) running in a separate service client, rolling metadata
won't be carried forward. Is that intentional? The PR description mentions
table services should preserve checkpoint metadata automatically.
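If the behavior should also cover table services, one option is to move the merge into a shared helper that both `preCommit()` implementations call, so the two code paths cannot drift apart. The sketch below is hypothetical throughout. `RollingMetadataMerger`, `PreCommitHook`, and the two subclasses stand in for the real clients. `previous` is assumed to already hold the values resolved by the timeline walkback.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical shared helper; not an existing Hudi class.
final class RollingMetadataMerger {
  private RollingMetadataMerger() {}

  // Copies each rolling key from `previous` into `current` unless `current` already has a non-null value.
  static void merge(Map<String, String> current, Map<String, String> previous, Set<String> rollingKeys) {
    for (String key : rollingKeys) {
      String value = previous.get(key);
      if (value != null) {
        current.putIfAbsent(key, value); // the current commit's own value takes precedence
      }
    }
  }
}

// Hypothetical base for both clients' pre-commit step, so neither path can skip the merge.
abstract class PreCommitHook {
  private final Set<String> rollingKeys;

  PreCommitHook(Set<String> rollingKeys) {
    this.rollingKeys = rollingKeys;
  }

  final void preCommit(Map<String, String> current, Map<String, String> previous) {
    // Conflict resolution under the transaction lock would run here first.
    RollingMetadataMerger.merge(current, previous, rollingKeys);
  }
}

// Stand-in for the regular write client path.
final class WriteClientHook extends PreCommitHook {
  WriteClientHook(Set<String> rollingKeys) {
    super(rollingKeys);
  }
}

// Stand-in for the table service (compaction/clustering) client path.
final class TableServiceClientHook extends PreCommitHook {
  TableServiceClientHook(Set<String> rollingKeys) {
    super(rollingKeys);
  }
}
```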
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -429,6 +429,128 @@ protected void preCommit(HoodieCommitMetadata metadata) {
     // Important to create this after the lock to ensure the latest commits show up in the timeline without need for reload
     HoodieTable table = createTable(config);
     resolveWriteConflict(table, metadata, this.pendingInflightAndRequestedInstants);
+
+    // Merge rolling metadata after conflict resolution, still within the lock
+    mergeRollingMetadata(table, metadata);
+  }
+
+  /**
+   * Merges rolling metadata from recent completed commits into the current commit metadata.
+   * This method MUST be called within the transaction lock after conflict resolution.
+   *
+   * <p>Rolling metadata keys configured via {@link HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
+   * automatically carried forward from recent commits. The system walks back up to
+   * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} commits to find the most
+   * recent value for each key. This ensures that important metadata like checkpoint information
+   * remains accessible without worrying about archival or missing keys in individual commits.
+   *
+   * @param table HoodieTable instance (may have refreshed timeline after conflict resolution)
+   * @param metadata Current commit metadata to be augmented with rolling metadata
+   */
+  protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata metadata) {
+    // Skip for metadata table - rolling metadata is only for data tables
+    if (table.isMetadataTable()) {
+      return;
+    }
+
+    Set<String> rollingKeys = config.getRollingMetadataKeys();
+    if (rollingKeys.isEmpty()) {
+      return; // No rolling metadata configured
+    }
+
+    // IMPORTANT: We're inside the lock here. The timeline in 'table' is either:
+    // 1. Fresh from createTable() if no conflict resolution happened
+    // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+    // In both cases, we have the latest view of the timeline.
+
+    HoodieTimeline commitsTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+
+    if (commitsTimeline.empty()) {
+      log.info("No previous commits found. Rolling metadata will start with current commit.");
+      return; // First commit - nothing to roll forward
+    }
+
+    try {
+      Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
+      Map<String, String> foundRollingMetadata = new HashMap<>();
+      Set<String> remainingKeys = new java.util.HashSet<>(rollingKeys);
+
+      // Remove keys that are already present in current commit (current values take precedence)
+      for (String key : rollingKeys) {
+        if (existingExtraMetadata.containsKey(key)) {
+          remainingKeys.remove(key);
+        }
+      }
+
+      if (remainingKeys.isEmpty()) {
+        log.debug("All rolling metadata keys are present in current commit. No walkback needed.");
+        return;
+      }
+
+      int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+      int commitsWalkedBack = 0;
+
+      // Walk back through the timeline in reverse order (most recent first) to find values for all remaining keys
+      List<HoodieInstant> recentCommits = commitsTimeline.getInstantsAsStream()
+          .collect(java.util.stream.Collectors.toList());
+
+      // Reverse to walk back from most recent to oldest
+      java.util.Collections.reverse(recentCommits);
Review Comment:
🤖 Good question. Looking at the code, `getInstantsAsStream()` returns
instants in **requested-time** order, not completion-time order. After
`filterCompletedInstants()` and the `Collections.reverse()`, this walks back by
most-recent requested time. In multi-writer or concurrent scenarios, completion
order can diverge from request order, so the walkback might not visit the truly
latest completed commit first. `getReverseOrderedInstantsByCompletionTime()`
exists on the timeline and would be a more precise fit here.
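The divergence is easy to reproduce with two overlapping writers. The sketch below uses a hypothetical `Completed` stand-in and fixed-width timestamp strings compared lexicographically. Writer A requests at `100` and completes at `300`, while writer B requests at `200` and completes at `250`. Walking back by requested time visits B first, even though A's commit landed last. Ordering by completion time visits A first, which is the ordering the suggested `getReverseOrderedInstantsByCompletionTime()` is meant to provide.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

final class WalkbackOrder {
  // Hypothetical stand-in for a completed HoodieInstant.
  static final class Completed {
    final String requestedTime;
    final String completionTime;

    Completed(String requestedTime, String completionTime) {
      this.requestedTime = requestedTime;
      this.completionTime = completionTime;
    }
  }

  // Both assume fixed-width timestamp strings, so lexicographic order matches time order.
  static final Comparator<Completed> BY_REQUESTED_TIME = Comparator.comparing(c -> c.requestedTime);
  static final Comparator<Completed> BY_COMPLETION_TIME = Comparator.comparing(c -> c.completionTime);

  // Requested times of the first `n` instants a newest-first walkback would visit.
  static List<String> walkback(List<Completed> instants, Comparator<Completed> order, int n) {
    return instants.stream()
        .sorted(order.reversed())
        .limit(n)
        .map(c -> c.requestedTime)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Completed> timeline = List.of(
        new Completed("100", "300"),  // writer A: requested first, finished last
        new Completed("200", "250")); // writer B: requested later, finished first
    System.out.println(walkback(timeline, BY_REQUESTED_TIME, 1));  // prints [200]
    System.out.println(walkback(timeline, BY_COMPLETION_TIME, 1)); // prints [100]
  }
}
```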
--
This is an automated message from the Apache Git Service.