This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bb5abb6b0483 fix: Optimizing internal schema lookup in
TableSchemaResolver (#18387)
bb5abb6b0483 is described below
commit bb5abb6b0483c043f38f7f32b73485f025f95b1d
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Mar 27 07:55:22 2026 -0700
fix: Optimizing internal schema lookup in TableSchemaResolver (#18387)
---
.../hudi/common/table/TableSchemaResolver.java | 9 +-
.../hudi/common/table/TestTableSchemaResolver.java | 125 +++++++++++++++++++++
2 files changed, 131 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 29a20e735c13..095a4ffef717 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -310,11 +310,14 @@ public class TableSchemaResolver {
*/
public Option<InternalSchema> getTableInternalSchemaFromCommitMetadata() {
HoodieTimeline completedInstants =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
- HoodieTimeline timeline = completedInstants
+ // Walk backwards through timeline to find the first (most recent) instant that can update schema
+ // This avoids reading commit metadata for all instants
+ return
Option.fromJavaOptional(completedInstants.getReverseOrderedInstants()
.filter(instant -> { // consider only instants that can update/change
schema.
return
WriteOperationType.canUpdateSchema(getCachedCommitMetadata(instant).getOperationType());
- });
- return
timeline.lastInstant().flatMap(this::getTableInternalSchemaFromCommitMetadata);
+ })
+ .findFirst())
+ .flatMap(this::getTableInternalSchemaFromCommitMetadata);
}
/**
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index 4a26f8715e96..193bb0173dfd 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -206,4 +206,129 @@ class TestTableSchemaResolver {
writer.close();
return writer.getLogFile().getPath();
}
+
+ @Test
+ void
testGetTableInternalSchemaFromCommitMetadataFindsLatestSchemaUpdateInstant()
throws IOException {
+ // Given: A timeline with multiple instants where some can update schema
and some cannot
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+ HoodieInstant clusterInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, "001",
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ HoodieInstant insertInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"002", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ HoodieInstant compactInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"003", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+
+ HoodieCommitMetadata clusterMetadata = new HoodieCommitMetadata();
+
clusterMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.CLUSTER);
+
+ HoodieCommitMetadata insertMetadata = new HoodieCommitMetadata();
+
insertMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
+ // Create a valid InternalSchema
+ org.apache.hudi.internal.schema.InternalSchema internalSchema = new
org.apache.hudi.internal.schema.InternalSchema(
+ org.apache.hudi.internal.schema.Types.RecordType.get(
+ org.apache.hudi.internal.schema.Types.Field.get(0, false, "id",
org.apache.hudi.internal.schema.Types.IntType.get())));
+
insertMetadata.addMetadata(org.apache.hudi.internal.schema.utils.SerDeHelper.LATEST_SCHEMA,
+
org.apache.hudi.internal.schema.utils.SerDeHelper.toJson(internalSchema));
+
+ HoodieCommitMetadata compactMetadata = new HoodieCommitMetadata();
+
compactMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.COMPACT);
+
+ HoodieTimeline timeline = mock(HoodieTimeline.class);
+
when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+ // Timeline in reverse order: 003 (compact), 002 (insert), 001 (cluster)
+
when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(compactInstant,
insertInstant, clusterInstant));
+
when(timeline.readCommitMetadata(compactInstant)).thenReturn(compactMetadata);
+
when(timeline.readCommitMetadata(insertInstant)).thenReturn(insertMetadata);
+
when(timeline.readCommitMetadata(clusterInstant)).thenReturn(clusterMetadata);
+
+ // When: Get internal schema from commit metadata
+ Option<org.apache.hudi.internal.schema.InternalSchema> result =
schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+ // Then: Should find the insert instant (002) which is the most recent
schema-updating operation
+ assertTrue(result.isPresent());
+ }
+
+ @Test
+ void
testGetTableInternalSchemaFromCommitMetadataSkipsNonSchemaUpdatingOperations()
throws IOException {
+ // Given: A timeline with only non-schema-updating operations (e.g. CLUSTER, COMPACT, INDEX, LOG_COMPACT); this test exercises CLUSTER and COMPACT
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+ HoodieInstant clusterInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, "001",
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ HoodieInstant compactInstant = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"002", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+
+ HoodieCommitMetadata clusterMetadata = new HoodieCommitMetadata();
+
clusterMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.CLUSTER);
+
+ HoodieCommitMetadata compactMetadata = new HoodieCommitMetadata();
+
compactMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.COMPACT);
+
+ HoodieTimeline timeline = mock(HoodieTimeline.class);
+
when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+
when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(compactInstant,
clusterInstant));
+
when(timeline.readCommitMetadata(compactInstant)).thenReturn(compactMetadata);
+
when(timeline.readCommitMetadata(clusterInstant)).thenReturn(clusterMetadata);
+
+ // When: Get internal schema from commit metadata
+ Option<org.apache.hudi.internal.schema.InternalSchema> result =
schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+ // Then: Should return empty since no schema-updating operations exist
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testGetTableInternalSchemaFromCommitMetadataHandlesEmptyTimeline() {
+ // Given: An empty timeline
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+ HoodieTimeline timeline = mock(HoodieTimeline.class);
+
when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+ when(timeline.getReverseOrderedInstants()).thenReturn(Stream.empty());
+
+ // When: Get internal schema from commit metadata
+ Option<org.apache.hudi.internal.schema.InternalSchema> result =
schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+ // Then: Should return empty for empty timeline
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testGetTableInternalSchemaFromCommitMetadataStopsAtFirstMatch() throws
IOException {
+ // Given: A timeline with multiple schema-updating operations
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ TableSchemaResolver schemaResolver = new TableSchemaResolver(metaClient);
+
+ HoodieInstant insertInstant1 = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"003", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ HoodieInstant insertInstant2 = new
HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
"002", InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ // This instant should never be read due to short-circuit behavior
+ HoodieInstant insertInstant3 = mock(HoodieInstant.class);
+
+ HoodieCommitMetadata insertMetadata1 = new HoodieCommitMetadata();
+
insertMetadata1.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
+ // Create a valid InternalSchema
+ org.apache.hudi.internal.schema.InternalSchema internalSchema = new
org.apache.hudi.internal.schema.InternalSchema(
+ org.apache.hudi.internal.schema.Types.RecordType.get(
+ org.apache.hudi.internal.schema.Types.Field.get(0, false, "id",
org.apache.hudi.internal.schema.Types.IntType.get())));
+
insertMetadata1.addMetadata(org.apache.hudi.internal.schema.utils.SerDeHelper.LATEST_SCHEMA,
+
org.apache.hudi.internal.schema.utils.SerDeHelper.toJson(internalSchema));
+
+ HoodieCommitMetadata insertMetadata2 = new HoodieCommitMetadata();
+
insertMetadata2.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
+
+ HoodieTimeline timeline = mock(HoodieTimeline.class);
+
when(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()).thenReturn(timeline);
+ // Timeline in reverse order: 003, 002, 001
+
when(timeline.getReverseOrderedInstants()).thenReturn(Stream.of(insertInstant1,
insertInstant2, insertInstant3));
+
when(timeline.readCommitMetadata(insertInstant1)).thenReturn(insertMetadata1);
+
when(timeline.readCommitMetadata(insertInstant2)).thenReturn(insertMetadata2);
+ // Should not call readCommitMetadata for insertInstant3 due to
findFirst() short-circuit
+
+ // When: Get internal schema from commit metadata
+ Option<org.apache.hudi.internal.schema.InternalSchema> result =
schemaResolver.getTableInternalSchemaFromCommitMetadata();
+
+ // Then: Should find the first (most recent) schema-updating operation and
stop
+ assertTrue(result.isPresent());
+ // Verify that insertInstant3 was never interacted with (proving
short-circuit behavior)
+ verifyNoInteractions(insertInstant3);
+ }
}