This is an automated email from the ASF dual-hosted git repository.
nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 127c6ee03186 feat(common): roll over commit metadata to clean (#18590)
127c6ee03186 is described below
commit 127c6ee031867a7a6b49c6f3ccadbba8161ee8ef
Author: Krishen <[email protected]>
AuthorDate: Mon May 4 13:33:22 2026 -0700
feat(common): roll over commit metadata to clean (#18590)
When rolling metadata is configured (hoodie.write.rolling.metadata.keys),
important metadata like schema and checkpoint keys are carried forward across
commits. However, clean instants do not participate in this rolling mechanism:
they neither receive rolled-over metadata nor serve as a source for subsequent
lookups. After archival removes old ingestion commits, if only clean instants
remain on the active timeline between surviving commits, the chain of
rolled-over metadata can break.
This PR ensures that clean commits also carry rolled-over metadata in their
extraMetadata field, preserving the rolling metadata chain across archival.
---------
Co-authored-by: Krishen Bhan <[email protected]>
---
.../org/apache/hudi/client/BaseHoodieClient.java | 170 +++++++++++++--------
.../org/apache/hudi/config/HoodieWriteConfig.java | 19 +--
.../table/action/clean/CleanActionExecutor.java | 6 +-
.../hudi/common/table/timeline/HoodieTimeline.java | 5 +
.../TestHoodieClientOnCopyOnWriteStorage.java | 86 +++++++++++
5 files changed, 210 insertions(+), 76 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 842dc38177a5..002ac1e7454f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.callback.HoodieClientInitCallback;
import org.apache.hudi.client.embedded.EmbeddedTimelineServerHelper;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -58,6 +59,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -313,19 +315,24 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
}
/**
- * Merges rolling metadata from recent completed commits into the current
commit metadata.
+ * Merges rolling metadata from recent completed instants into the current
commit metadata.
* This method MUST be called within the transaction lock after conflict
resolution.
*
* <p>Rolling metadata keys configured via {@link
HoodieWriteConfig#ROLLING_METADATA_KEYS} will be
- * automatically carried forward from recent commits. The system walks back
up to
- * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS}
commits to find the most
- * recent value for each key. This ensures that important metadata like
checkpoint information
- * remains accessible without worrying about archival or missing keys in
individual commits.
+ * automatically carried forward from recent instants. The system walks back
through completed
+ * commits and clean instants (in reverse completion-time order) up to
+ * {@link HoodieWriteConfig#ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS} to
find the most
+ * recent value for each key.
*
* @param table HoodieTable instance (may have refreshed timeline after
conflict resolution)
* @param metadata Current commit metadata to be augmented with rolling
metadata
*/
protected void mergeRollingMetadata(HoodieTable table, HoodieCommitMetadata
metadata) {
+ // IMPORTANT: We're inside the lock here. The timeline in 'table' is
either:
+ // 1. Fresh from createTable() if no conflict resolution happened
+ // 2. Reloaded during resolveWriteConflict() if conflicts were checked
+ // In both cases, we have the latest view of the timeline.
+
// Skip for metadata table - rolling metadata is only for data tables
if (table.isMetadataTable()) {
return;
@@ -336,88 +343,119 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
return; // No rolling metadata configured
}
- // IMPORTANT: We're inside the lock here. The timeline in 'table' is
either:
- // 1. Fresh from createTable() if no conflict resolution happened
- // 2. Reloaded during resolveWriteConflict() if conflicts were checked
- // In both cases, we have the latest view of the timeline.
+ Map<String, String> foundRollingMetadata =
collectRollingMetadataFromTimeline(table, config, rollingKeys,
metadata.getExtraMetadata());
+ for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
+ metadata.addMetadata(entry.getKey(), entry.getValue());
+ }
+ }
- HoodieTimeline commitsTimeline =
table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ /**
+ * Overload of {@link #mergeRollingMetadata(HoodieTable,
HoodieCommitMetadata)} for clean
+ * commits. Populates {@link HoodieCleanMetadata#getExtraMetadata()} with
rolling metadata
+ * values found on the active timeline.
+ *
+ * <p>This is {@code public static} so that {@code CleanActionExecutor}
(which does not extend
+ * {@code BaseHoodieClient}) can invoke it.
+ */
+ public static void mergeRollingMetadata(HoodieTable table, HoodieWriteConfig
config, HoodieCleanMetadata metadata) {
+ if (table.isMetadataTable()) {
+ return;
+ }
+ Set<String> rollingKeys = config.getRollingMetadataKeys();
+ if (rollingKeys.isEmpty()) {
+ return;
+ }
- if (commitsTimeline.empty()) {
- log.info("No previous commits found. Rolling metadata will start with
current commit.");
- return; // First commit - nothing to roll forward
+ Map<String, String> existing = metadata.getExtraMetadata() != null
+ ? metadata.getExtraMetadata() : Collections.emptyMap();
+ Map<String, String> foundRollingMetadata =
collectRollingMetadataFromTimeline(table, config, rollingKeys, existing);
+ if (!foundRollingMetadata.isEmpty()) {
+ Map<String, String> merged = new HashMap<>(existing);
+ merged.putAll(foundRollingMetadata);
+ metadata.setExtraMetadata(merged);
}
+ }
- try {
- Map<String, String> existingExtraMetadata = metadata.getExtraMetadata();
- Map<String, String> foundRollingMetadata = new HashMap<>();
- Set<String> remainingKeys = new HashSet<>(rollingKeys);
-
- // Remove keys that are already present with non-empty values in current
commit (current values take precedence)
- for (String key : rollingKeys) {
- if (existingExtraMetadata.containsKey(key) &&
!StringUtils.isNullOrEmpty(existingExtraMetadata.get(key))) {
- remainingKeys.remove(key);
- }
- }
+ /**
+ * Walks backwards through completed instants (commits, replace-commits,
delta-commits, and
+ * clean) on the active timeline, extracting extra-metadata values for the
requested rolling
+ * keys. For commit-type instants the values come from {@link
HoodieCommitMetadata#getMetadata};
+ * for clean instants they come from {@link
HoodieCleanMetadata#getExtraMetadata()}.
+ *
+ * <p>Keys already present with a non-empty value in {@code existingExtra}
are skipped (empty
+ * strings are treated as "missing").
+ */
+ private static Map<String, String> collectRollingMetadataFromTimeline(
+ HoodieTable table, HoodieWriteConfig config,
+ Set<String> rollingKeys, Map<String, String> existingExtra) {
- if (remainingKeys.isEmpty()) {
- log.debug("All rolling metadata keys are present in current commit. No
walkback needed.");
- return;
- }
+ Map<String, String> foundRollingMetadata = new HashMap<>();
+ Set<String> remaining = new HashSet<>(rollingKeys);
- int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
- int commitsWalkedBack = 0;
+ for (String key : rollingKeys) {
+ if (existingExtra.containsKey(key) &&
!StringUtils.isNullOrEmpty(existingExtra.get(key))) {
+ remaining.remove(key);
+ }
+ }
+ if (remaining.isEmpty()) {
+ log.debug("All rolling metadata keys already present. No walkback
needed.");
+ return foundRollingMetadata;
+ }
- // Walk back through the timeline in reverse order (most recent first)
to find values for all remaining keys
- List<HoodieInstant> recentCommits =
commitsTimeline.getReverseOrderedInstantsByCompletionTime()
- .limit(lookbackLimit)
- .collect(Collectors.toList());
+ int lookbackLimit = config.getRollingMetadataTimelineLookbackCommits();
+ HoodieTimeline completed =
table.getActiveTimeline().filterCompletedInstants();
+ List<HoodieInstant> instants =
completed.getReverseOrderedInstantsByCompletionTime()
+ .filter(i ->
HoodieTimeline.VALID_ACTIONS_FOR_ROLLING_METADATA.contains(i.getAction()))
+ .limit(lookbackLimit)
+ .collect(Collectors.toList());
- log.debug("Walking back up to {} commits to find rolling metadata for
keys: {}",
- lookbackLimit, remainingKeys);
+ log.debug("Walking back up to {} instants to find rolling metadata for
keys: {}", lookbackLimit, remaining);
+ int instantsWalkedBack = 0;
- for (HoodieInstant instant : recentCommits) {
- if (remainingKeys.isEmpty()) {
- break; // Found all keys
+ try {
+ for (HoodieInstant instant : instants) {
+ if (remaining.isEmpty()) {
+ break;
}
+ String action = instant.getAction();
+ Map<String, String> extraMeta = null;
- commitsWalkedBack++;
- HoodieCommitMetadata commitMetadata =
table.getMetaClient().getActiveTimeline().readInstantContent(instant,
HoodieCommitMetadata.class);
+ if (HoodieTimeline.CLEAN_ACTION.equals(action)) {
+ HoodieCleanMetadata cleanMeta =
table.getActiveTimeline().readCleanMetadata(instant);
+ extraMeta = cleanMeta.getExtraMetadata();
+ } else {
+ HoodieCommitMetadata commitMeta =
table.getMetaClient().getActiveTimeline()
+ .readInstantContent(instant, HoodieCommitMetadata.class);
+ extraMeta = commitMeta.getExtraMetadata();
+ }
+ instantsWalkedBack++;
- // Check for remaining keys in this commit
- for (String key : new HashSet<>(remainingKeys)) {
- String value = commitMetadata.getMetadata(key);
+ if (extraMeta == null) {
+ continue;
+ }
+ for (String key : new HashSet<>(remaining)) {
+ String value = extraMeta.get(key);
if (!StringUtils.isNullOrEmpty(value)) {
foundRollingMetadata.put(key, value);
- remainingKeys.remove(key);
- log.debug("Found rolling metadata key '{}' in commit {} with
value: {}",
- key, instant.requestedTime(), value);
+ remaining.remove(key);
+ log.debug("Found rolling metadata key '{}' in {} instant {} with
value: {}",
+ key, action, instant.requestedTime(), value);
}
}
}
- // Add found rolling metadata to current commit
- for (Map.Entry<String, String> entry : foundRollingMetadata.entrySet()) {
- metadata.addMetadata(entry.getKey(), entry.getValue());
- }
-
- int rolledForwardCount = foundRollingMetadata.size();
- int updatedCount = rollingKeys.size() - remainingKeys.size() -
rolledForwardCount;
-
- if (rolledForwardCount > 0 || updatedCount > 0 ||
!remainingKeys.isEmpty()) {
- log.info("Rolling metadata merge completed. Walked back {} commits. "
- + "Rolled forward: {}, Updated in current: {}, Not found: {},
Total rolling keys: {}",
- commitsWalkedBack, rolledForwardCount, updatedCount,
remainingKeys.size(), rollingKeys.size());
+ if (!foundRollingMetadata.isEmpty() || !remaining.isEmpty()) {
+ log.info("Rolling metadata: walked {} instants. Rolled forward: {},
Not found: {}, Total keys: {}",
+ instantsWalkedBack, foundRollingMetadata.size(), remaining.size(),
rollingKeys.size());
}
-
- if (!remainingKeys.isEmpty()) {
- log.warn("Rolling metadata keys not found in last {} commits: {}. "
- + "These keys will not be included in the current commit.",
lookbackLimit, remainingKeys);
+ if (!remaining.isEmpty()) {
+ log.warn("Rolling metadata keys not found in last {} instants: {}.",
instantsWalkedBack, remaining);
}
-
} catch (IOException e) {
- log.error("Failed to read previous commit metadata for rolling metadata
keys: {}.", rollingKeys, e);
- throw new HoodieIOException("Failed to read previous commit metadata for
rolling metadata keys: " + rollingKeys, e);
+ log.error("Failed to read previous metadata for rolling metadata keys:
{}.", rollingKeys, e);
+ throw new HoodieIOException("Failed to read previous metadata for
rolling keys: " + rollingKeys, e);
}
+
+ return foundRollingMetadata;
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 5df834121bf9..a663c63dba78 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -768,21 +768,22 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.sinceVersion("1.2.0")
.withDocumentation("Comma-separated list of extra metadata keys that
should be automatically carried forward "
- + "to every new commit. These keys will be read from recent commit
metadata and included in new commits, "
- + "ensuring they remain accessible without walking the timeline or
worrying about archival. "
- + "This is useful for tracking checkpoint information (e.g., Kafka
offsets, Flink checkpoints) or any metadata "
- + "that needs to persist across commits. New values override old
ones. Only applies to data table commits.");
+ + "to every new commit and clean instant. These keys will be read
from recent commit and clean metadata "
+ + "and included in new commits/cleans, ensuring they remain
accessible without walking the timeline or "
+ + "worrying about archival. This is useful for tracking checkpoint
information (e.g., Kafka offsets, "
+ + "Flink checkpoints) or any metadata that needs to persist across
commits. New values override old ones. "
+ + "Only applies to data table commits and clean instants.");
public static final ConfigProperty<Integer>
ROLLING_METADATA_TIMELINE_LOOKBACK_COMMITS = ConfigProperty
.key("hoodie.write.rolling.metadata.timeline.lookback.commits")
.defaultValue(10)
.markAdvanced()
.sinceVersion("1.2.0")
- .withDocumentation("Maximum number of completed commits to walk back in
the timeline when searching for "
- + "rolling metadata keys. If a rolling metadata key is not found in
the latest commit, the system will "
- + "walk back up to this many commits to find the most recent value.
This ensures rolling metadata is "
- + "preserved even if some commits don't update all keys. Higher
values provide more resilience but may "
- + "impact performance. Only applies when
hoodie.write.rolling.metadata.keys is configured.");
+ .withDocumentation("Maximum number of completed instants (commits and
clean) to walk back in the timeline "
+ + "when searching for rolling metadata keys. If a rolling metadata
key is not found in the latest instant, "
+ + "the system will walk back up to this many instants to find the
most recent value. This ensures rolling "
+ + "metadata is preserved even if some instants don't carry all keys.
Higher values provide more resilience "
+ + "but may impact performance. Only applies when
hoodie.write.rolling.metadata.keys is configured.");
public static final ConfigProperty<Boolean> ALLOW_OPERATION_METADATA_FIELD =
ConfigProperty
.key("hoodie.allow.operation.metadata.field")
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 3fad30b22bdf..63a6e7d8f994 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.clean;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.client.BaseHoodieClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.HoodieCleanStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -215,7 +216,6 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
}
List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
- table.getMetaClient().reloadActiveTimeline();
HoodieCleanMetadata metadata;
if (cleanStats.isEmpty()) {
metadata = createEmptyCleanMetadata(cleanerPlan, inflightInstant,
timer.endTimer());
@@ -228,6 +228,10 @@ public class CleanActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I, K,
);
}
this.txnManager.beginStateChange(Option.of(inflightInstant),
Option.empty());
+ // Reload inside the lock so mergeRollingMetadata reads the latest
timeline,
+ // matching the same contract as mergeRollingMetadata for commit
metadata.
+ table.getMetaClient().reloadActiveTimeline();
+ BaseHoodieClient.mergeRollingMetadata(table, config, metadata);
writeTableMetadata(metadata, inflightInstant.requestedTime());
table.getActiveTimeline().transitionCleanInflightToComplete(
false,
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
index f9a59c6ab623..4ad8ec769ac3 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
@@ -30,6 +30,7 @@ import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.storage.HoodieInstantWriter;
@@ -77,6 +78,10 @@ public interface HoodieTimeline extends HoodieInstantReader,
Serializable {
CLEAN_ACTION, SAVEPOINT_ACTION, RESTORE_ACTION, ROLLBACK_ACTION,
COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION,
CLUSTERING_ACTION, INDEXING_ACTION};
+ Set<String> VALID_ACTIONS_FOR_ROLLING_METADATA = CollectionUtils.createSet(
+ COMMIT_ACTION, DELTA_COMMIT_ACTION, CLEAN_ACTION,
+ COMPACTION_ACTION, LOG_COMPACTION_ACTION, REPLACE_COMMIT_ACTION,
CLUSTERING_ACTION);
+
String COMMIT_EXTENSION = "." + COMMIT_ACTION;
String DELTA_COMMIT_EXTENSION = "." + DELTA_COMMIT_ACTION;
String CLEAN_EXTENSION = "." + CLEAN_ACTION;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 2a4fa731e04b..ffb829165c7b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client.functional;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
@@ -68,6 +69,7 @@ import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.FileFormatUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieArchivalConfig;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -2066,6 +2068,90 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
"TableSchemaResolver should find schema even with clustering-only
timeline");
}
+ @Test
+ public void testRollingMetadataPreservedInCleanCommits() throws Exception {
+ String schemaKey = HoodieCommitMetadata.SCHEMA_KEY;
+ dataGen = new HoodieTestDataGenerator(new String[]
{DEFAULT_FIRST_PARTITION_PATH});
+
+ HoodieWriteConfig config = getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .compactionSmallFileSize(0).build())
+ .withRollingMetadataKeys(schemaKey)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .retainCommits(1)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(10, 12).build())
+ .build();
+
+ SparkRDDWriteClient client = getHoodieWriteClient(config);
+
+ String firstCommit = client.startCommit();
+ List<HoodieRecord> records = dataGen.generateInserts(firstCommit, 100);
+ JavaRDD<WriteStatus> firstResult = client.insert(jsc.parallelize(records,
1), firstCommit);
+ client.commit(firstCommit, firstResult);
+
+ // Upsert several times to create superseded file versions
+ for (int i = 0; i < 4; i++) {
+ String commitTime = client.startCommit();
+ List<HoodieRecord> updates = dataGen.generateUpdates(commitTime,
records);
+ JavaRDD<WriteStatus> result = client.upsert(jsc.parallelize(updates, 1),
commitTime);
+ client.commit(commitTime, result);
+ }
+
+ // Run clean — retainCommits(1) means old file versions from earlier
commits are eligible
+ HoodieCleanMetadata cleanResult = client.clean();
+ assertTrue(cleanResult != null, "Clean should produce metadata (files to
clean exist)");
+
+ HoodieTableMetaClient freshMeta = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline cleanTimeline = freshMeta.getActiveTimeline()
+ .getCleanerTimeline().filterCompletedInstants();
+ assertFalse(cleanTimeline.empty(), "Should have at least one clean
instant");
+
+ HoodieInstant lastClean = cleanTimeline.lastInstant().get();
+ HoodieCleanMetadata cleanMetadata =
cleanTimeline.readCleanMetadata(lastClean);
+
+ Map<String, String> cleanExtraMetadata = cleanMetadata.getExtraMetadata();
+ assertTrue(cleanExtraMetadata != null, "Clean metadata should have
extraMetadata map");
+ assertTrue(cleanExtraMetadata.containsKey(schemaKey),
+ "Clean's extraMetadata should contain rolled-over schema key");
+ assertFalse(cleanExtraMetadata.get(schemaKey).isEmpty(),
+ "Rolled-over schema in clean should be non-empty");
+
+ // Now make one more commit with lookback=1. The rolling metadata walk
should find
+ // the schema key in the clean instant (the most recent instant with the
key).
+ HoodieWriteConfig lookback1Config = getConfigBuilder(TRIP_EXAMPLE_SCHEMA)
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .compactionSmallFileSize(0).build())
+ .withRollingMetadataKeys(schemaKey)
+ .withRollingMetadataTimelineLookbackCommits(1)
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withAutoClean(false)
+
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .retainCommits(1)
+ .build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .archiveCommitsWith(10, 12).build())
+ .build();
+
+ SparkRDDWriteClient lookbackClient = getHoodieWriteClient(lookback1Config);
+ String nextCommit = lookbackClient.startCommit();
+ List<HoodieRecord> nextUpdates = dataGen.generateUpdates(nextCommit,
records);
+ JavaRDD<WriteStatus> nextResult =
lookbackClient.upsert(jsc.parallelize(nextUpdates, 1), nextCommit);
+ lookbackClient.commit(nextCommit, nextResult);
+
+ freshMeta = HoodieTableMetaClient.reload(metaClient);
+ HoodieTimeline commitsTimeline = freshMeta.getActiveTimeline()
+ .getCommitsTimeline().filterCompletedInstants();
+ HoodieInstant latestCommit = commitsTimeline.lastInstant().get();
+ HoodieCommitMetadata latestMeta =
commitsTimeline.readCommitMetadata(latestCommit);
+ String rolledSchema = latestMeta.getMetadata(schemaKey);
+ assertFalse(StringUtils.isNullOrEmpty(rolledSchema),
+ "Schema should be rolled over into new commit even with lookback=1");
+ }
+
/**
* Disabling row writer here as clustering tests will throw the error below
if it is used.
* java.util.concurrent.CompletionException: java.lang.ClassNotFoundException