hudi-agent commented on code in PR #18688:
URL: https://github.com/apache/hudi/pull/18688#discussion_r3188020047


##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -210,29 +208,29 @@ protected Pair<ClosableIterator<T>, HoodieSchema> getRecordsIterator(HoodieDataB
   /**
    * Get final Read Schema for support evolution.
    * step1: find the fileSchema for current dataBlock.
-   * step2: determine whether fileSchema is compatible with the final read internalSchema.
-   * step3: merge fileSchema and read internalSchema to produce final read schema.
+   * step2: determine whether fileSchema is compatible with the read evolution schema.
+   * step3: merge fileSchema and read evolution schema to produce final read schema.
    *
    * @param dataBlock current processed block
    * @return final read schema.
    */
   protected Option<Pair<Function<T, T>, HoodieSchema>> composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
-    if (internalSchema.isEmptySchema()) {
+    if (!evolutionSchemaOpt.isPresent()) {
       return Option.empty();
     }
 
     long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);
-    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
+    HoodieSchema fileSchema = HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient);

Review Comment:
   🤖 Could `searchSchemaAndCache` return `null` here and NPE the merger? 
`HoodieSchemaSerDe.searchSchema` documents a `null` on miss, whereas the old 
`InternalSchemaUtils.searchSchema` returned 
`InternalSchema.getEmptyInternalSchema()` — so passing it straight into `new 
HoodieSchemaMerger(fileSchema, ...)` will NPE inside `buildRecordFields` 
(`fileSchema.findFullName(...)`) on tables where the data block's instant has 
no matching history entry. The same pattern exists at 
`FileGroupReaderSchemaHandler#getRequiredSchemaForFileAndRenamedColumns` (line 
141). Easiest fix is probably to have 
`HoodieSchemaHistoryCache.searchSchemaAndCache` return an empty-schema sentinel 
like the legacy code did.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
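
A minimal sketch of the sentinel approach suggested above. `SketchSchema` and `SketchSchemaLookup` are hypothetical stand-ins, not the real `HoodieSchema` or `HoodieSchemaHistoryCache`, and the floor-style lookup (newest version at or before the requested instant) is an assumption about the search semantics.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for a versioned schema; not the real HoodieSchema API.
final class SketchSchema {
  // Sentinel returned on a lookup miss; a negative id marks it as empty.
  static final SketchSchema EMPTY = new SketchSchema(-1L);

  private final long schemaId;

  SketchSchema(long schemaId) {
    this.schemaId = schemaId;
  }

  boolean isEmptySchema() {
    return schemaId < 0;
  }

  long schemaId() {
    return schemaId;
  }
}

// Hypothetical history lookup keyed by version id.
final class SketchSchemaLookup {
  private final TreeMap<Long, SketchSchema> history = new TreeMap<>();

  void put(SketchSchema schema) {
    history.put(schema.schemaId(), schema);
  }

  // Returns the newest schema at or before instantTime (assumed semantics),
  // or the EMPTY sentinel on a miss so callers never receive null.
  SketchSchema searchSchema(long instantTime) {
    Map.Entry<Long, SketchSchema> entry = history.floorEntry(instantTime);
    return entry == null ? SketchSchema.EMPTY : entry.getValue();
  }
}
```

With this shape a caller can keep an `isEmptySchema()` guard and skip the merger on a miss, instead of dereferencing a `null` result.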



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -212,16 +201,13 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha
     logInfo("table properties change finished")
   }
 
-  def getInternalSchemaAndHistorySchemaStr(sparkSession: SparkSession): (InternalSchema, String) = {
+  def getEvolutionSchemaAndHistorySchemaStr(sparkSession: SparkSession): (HoodieSchema, String) = {
     val path = AlterTableCommand.getTableLocation(table, sparkSession)
     val metaClient = HoodieTableMetaClient.builder().setBasePath(path)
       .setConf(HadoopFSUtils.getStorageConf(sparkSession.sessionState.newHadoopConf())).build()
     val schemaUtil = new TableSchemaResolver(metaClient)
 
-    val schema = schemaUtil.getTableInternalSchemaFromCommitMetadata().orElse {
-      InternalSchemaConverter.convert(schemaUtil.getTableSchema)
-    }
-
+    val schema = schemaUtil.getTableEvolutionSchemaFromCommitMetadata().orElse(schemaUtil.getTableSchema)

Review Comment:
   🤖 Could you double-check the fallback when no evolution schema exists? 
Previously this branch wrapped `schemaUtil.getTableSchema` with 
`InternalSchemaConverter.convert(...)` which minted fresh field ids. The new 
direct call returns a `HoodieSchema` without ids, but the downstream 
`HoodieSchemaChangeApplier` operations (e.g. `nextColumnId`/`findIdByName` in 
add/delete/rename paths) appear to need ids to resolve columns. For a 
first-time ALTER on a table with no prior schema-on-read metadata, this looks 
like it would break or silently misbehave. @yihua could you verify whether ids 
need to be assigned here?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
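
To illustrate the concern, here is a toy sketch in Java (the file under review is Scala). `SketchIdSchema` is a hypothetical stand-in, not the real `HoodieSchema` or `HoodieSchemaChangeApplier`; it only shows how a name-to-id lookup behaves before and after fresh ids are assigned.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a schema whose columns may or may not carry ids.
final class SketchIdSchema {
  private final Map<String, Integer> nameToId = new LinkedHashMap<>();

  // Builds a schema from column names only; every id is left unassigned (-1).
  static SketchIdSchema withoutIds(List<String> columnNames) {
    SketchIdSchema schema = new SketchIdSchema();
    for (String name : columnNames) {
      schema.nameToId.put(name, -1);
    }
    return schema;
  }

  // Returns a copy with fresh ids 0..n-1 assigned in declaration order.
  SketchIdSchema assignFreshIds() {
    SketchIdSchema copy = new SketchIdSchema();
    int nextId = 0;
    for (String name : nameToId.keySet()) {
      copy.nameToId.put(name, nextId++);
    }
    return copy;
  }

  // Returns the id for the column, or -1 if it is unknown or unassigned.
  int findIdByName(String name) {
    return nameToId.getOrDefault(name, -1);
  }
}
```

In this toy model every id-based lookup on the un-assigned schema returns -1, which is the failure mode the fallback branch would need to rule out.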



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -139,7 +139,9 @@ public static HoodieFileGroupReader<RowData> createFileGroupReader(
         .withFileSlice(fileSlice)
         .withDataSchema(tableSchema)
         .withRequestedSchema(requiredSchema)
-        .withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
+        .withEvolutionSchema(schemaEvolutionManager.getQuerySchema() == null

Review Comment:
   🤖 nit: the previous code was a one-liner 
`Option.ofNullable(internalSchemaManager.getQuerySchema())` — could you keep 
that form instead of the explicit ternary? It also avoids calling 
`getQuerySchema()` twice.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
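
A small sketch of the difference, using `java.util.Optional` as a stand-in for Hudi's `Option`. `SketchEvolutionManager` and `SketchReaderArgs` are hypothetical, and the full ternary is an assumed shape since the hunk above only shows its first line.

```java
import java.util.Optional;

// Hypothetical manager whose getter may return null; it counts calls for illustration.
final class SketchEvolutionManager {
  private final String querySchema;
  private int calls;

  SketchEvolutionManager(String querySchema) {
    this.querySchema = querySchema;
  }

  String getQuerySchema() {
    calls++;
    return querySchema;
  }

  int calls() {
    return calls;
  }
}

final class SketchReaderArgs {
  // Ternary form (assumed shape): the getter runs twice when the value is non-null.
  static Optional<String> viaTernary(SketchEvolutionManager manager) {
    return manager.getQuerySchema() == null
        ? Optional.<String>empty()
        : Optional.of(manager.getQuerySchema());
  }

  // ofNullable form: the getter runs exactly once.
  static Optional<String> viaOfNullable(SketchEvolutionManager manager) {
    return Optional.ofNullable(manager.getQuerySchema());
  }
}
```

Both forms return the same value; the one-liner is shorter and avoids the second getter call.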



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/schema/evolution/TestHoodieSchemaHistoryStorageManager.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaIdAssigner;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link HoodieSchemaHistoryStorageManager}: persisting evolution
+ * schemas to {@code .hoodie/.schema/}, reading them back, and the residual-
+ * file cleanup that runs on each persist. Each persist+read pair also
+ * exercises the {@link HoodieSchemaSerDe} JSON layout end-to-end.
+ */
+public class TestHoodieSchemaHistoryStorageManager extends HoodieCommonTestHarness {
+
+  private HoodieActiveTimeline timeline;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanMetaClient();
+  }
+
+  @Test
+  public void testPersistAndReadHistorySchemaStr() throws IOException {
+    timeline = TIMELINE_FACTORY.createActiveTimeline(metaClient);
+    HoodieSchemaHistoryStorageManager fm = new HoodieSchemaHistoryStorageManager(metaClient);
+
+    HoodieSchema currentSchema = simpleSchema(0L);
+    fm.persistHistorySchemaStr("0000", HoodieSchemaSerDe.inheritHistory(currentSchema, ""));
+    simulateCommit("0000");
+    metaClient.reloadActiveTimeline();
+    assertSameLogicalSchema(currentSchema, fm.getSchemaByKey("0"));
+
+    HoodieSchema secondSchema = simpleSchema(1L);
+    fm.persistHistorySchemaStr("0001", HoodieSchemaSerDe.inheritHistory(secondSchema, fm.getHistorySchemaStr()));
+    simulateCommit("0001");
+    metaClient.reloadActiveTimeline();
+    assertSameLogicalSchema(secondSchema, fm.getSchemaByKey("1"));
+
+    // Persist a schema for "0002" but never simulate the commit. The next persist
+    // should treat 0002.schemacommit as a residual file from a failed write and
+    // delete it as part of cleanResidualFiles.
+    HoodieSchema thirdSchema = simpleSchema(2L);
+    fm.persistHistorySchemaStr("0002", HoodieSchemaSerDe.inheritHistory(thirdSchema, fm.getHistorySchemaStr()));
+
+    HoodieSchema lastSchema = simpleSchema(3L);
+    fm.persistHistorySchemaStr("0004", HoodieSchemaSerDe.inheritHistory(lastSchema, fm.getHistorySchemaStr()));
+    simulateCommit("0004");
+    metaClient.reloadActiveTimeline();
+
+    File residual = new File(metaClient.getSchemaFolderName() + File.separator + "0002.schemacommit");

Review Comment:
   🤖 nit: `assertTrue(!residual.exists(), ...)` reads more naturally as 
`assertFalse(residual.exists(), ...)`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -96,7 +96,7 @@ public static ExternalSpillableMap<String, byte[]> spillableMap(
    *
    * @param metaClient            Hoodie metadata client
    * @param writeConfig           Hoodie write configuration
-   * @param internalSchemaManager Internal schema manager
+   * @param schemaEvolutionManager Internal schema manager

Review Comment:
   🤖 nit: stale description — after the rename the param is no longer an 
"Internal schema manager". Could you update this to "Schema-on-read evolution 
manager" (matching the wording used in `DataTableCompactHandler`)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -244,13 +230,13 @@ object AlterTableCommand extends Logging {
   /**
     * Generate an commit with new schema to change the table's schema.
     *
-    * @param internalSchema new schema after change
+    * @param evolutionSchema new schema after change
     * @param historySchemaStr history schemas
     * @param table The hoodie table.
     * @param sparkSession The spark session.
     */
-  def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = {
-    val schema = InternalSchemaConverter.convert(internalSchema, HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table))
+  def commitWithSchema(evolutionSchema: HoodieSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = {

Review Comment:
   🤖 nit: `val schema = evolutionSchema` is now a pointless alias since the 
`InternalSchemaConverter.convert(...)` call was removed — could you drop it and 
use `evolutionSchema` directly below?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkSchemaEvolutionConverter.java:
##########
@@ -212,25 +208,32 @@ private static void collectColNamesFromStructType(DataType sparkType, Deque<Stri
 
   private static void addFullName(DataType sparkType, String name, Deque<String> fieldNames, List<String> resultSet) {
     if (!(sparkType instanceof StructType) && !(sparkType instanceof ArrayType) && !(sparkType instanceof MapType)) {
-      resultSet.add(InternalSchemaUtils.createFullName(name, fieldNames));
+      if (fieldNames.isEmpty()) {
+        resultSet.add(name);
+        return;
+      }
+      StringBuilder sb = new StringBuilder();
+      fieldNames.descendingIterator().forEachRemaining(p -> sb.append(p).append('.'));
+      sb.append(name);
+      resultSet.add(sb.toString());
     }
   }
 
-  public static StructType mergeSchema(InternalSchema fileSchema, InternalSchema querySchema) {
-    InternalSchema schema = new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
-    return constructSparkSchemaFromInternalSchema(schema);
-  }
-
-  public static Map<Integer, Pair<DataType, DataType>> collectTypeChangedCols(InternalSchema schema, InternalSchema other) {
-    return InternalSchemaUtils
+  /**
+   * Returns a Spark-typed cast plan (oldType -&gt; newType) keyed by the column's
+   * index in {@code schema}. Used by the schema-on-read read path to wire up
+   * implicit promotion casts at row time.
+   */
+  public static Map<Integer, Pair<DataType, DataType>> collectTypeChangedCols(HoodieSchema schema, HoodieSchema other) {
+    return HoodieSchemaEvolutionUtils
         .collectTypeChangedCols(schema, other)
         .entrySet()
         .stream()
-        .collect(Collectors.toMap(e -> e.getKey(), e -> Pair.of(constructSparkSchemaFromType(e.getValue().getLeft()), constructSparkSchemaFromType(e.getValue().getRight()))));
-  }
-
-  public static StructType constructSparkSchemaFromInternalSchema(InternalSchema schema) {
-    return (StructType) constructSparkSchemaFromType(schema.getRecord());
+        .collect(Collectors.toMap(
+            e -> e.getKey(),
+            e -> Pair.of(

Review Comment:
   🤖 nit: could you add an import for `HoodieSchemaConversionUtils` instead of 
using the fully-qualified name? The rest of the file uses imports for 
everything else so this stands out.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,23 +163,23 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
                                                                                          HoodieBaseFile baseFile,
                                                                                          HoodieWriteConfig writeConfig,
                                                                                          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may change cols order)
-      InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
         try {
-          fileSchema = InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true));
+          fileSchema = tableSchemaResolver.getTableSchema(true);
         } catch (Exception e) {
-          throw new HoodieException(String.format("Failed to get InternalSchema for given versionId: %s", commitInstantTime), e);
+          throw new HoodieException(String.format("Failed to get evolution schema for given versionId: %s", commitInstantTime), e);
         }
       }
-      final InternalSchema writeInternalSchema = fileSchema;
+      final HoodieSchema writeInternalSchema = fileSchema;
       List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();

Review Comment:
   🤖 nit: this variable is now a `HoodieSchema` but the name still says 
`InternalSchema`. Could you rename to `writeFileSchema` (or similar) so the 
name matches the new type?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaHistoryCache.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
+import org.apache.hudi.common.table.timeline.InstantFileNameParser;
+import org.apache.hudi.common.table.timeline.InstantGenerator;
+import org.apache.hudi.common.table.timeline.TimelineLayout;
+import org.apache.hudi.common.table.timeline.TimelinePathProvider;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Global, JVM-wide schema-history cache. Holds a per-table TreeMap of
+ * version-id → {@link HoodieSchema} so the read path can look up a schema by
+ * version without re-parsing the {@code .hoodie/.schema/} blob on every call.
+ *
+ * <p>This is the read-path equivalent of {@link HoodieSchemaHistoryStorageManager}:
+ * the storage manager owns disk I/O, the cache owns the in-memory map. A segment
+ * lock keyed by tablePath hash limits contention without serializing all
+ * lookups, and a Caffeine cache with {@code maximumSize=1000} and weakValues
+ * keeps memory bounded across many tables.
+ *
+ * <p>The Caffeine cache is JVM-singleton — every spark task in an executor
+ * shares one map per table. Schemas are evicted under memory pressure rather
+ * than time.
+ */
+@Slf4j
+public final class HoodieSchemaHistoryCache {
+
+  // Segment lock keyed by tablePath hash. 16 stripes is enough to keep most
+  // multi-table executors from contending on a single monitor while still
+  // bounding the lock overhead.
+  private static final Object[] LOCK_LIST = new Object[16];
+
+  static {
+    for (int i = 0; i < LOCK_LIST.length; i++) {
+      LOCK_LIST[i] = new Object();
+    }
+  }
+
+  // 1000-entry cap with weak values so the JVM can evict entries under memory pressure.
+  private static final Cache<String, TreeMap<Long, HoodieSchema>> HISTORICAL_SCHEMA_CACHE =
+      Caffeine.newBuilder().maximumSize(1000).weakValues().build();
+
+  private HoodieSchemaHistoryCache() {
+  }
+
+  /**

Review Comment:
   🤖 nit: the two branches here have identical bodies (re-read + put). Could 
you collapse them into a single `if` with `||` to remove the duplication?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
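
A sketch of the collapsed form. The branch bodies are not visible in the hunk above, so the two conditions used here (nothing cached yet, and the requested version missing from the cached map) are assumptions, and `SketchHistoryCache` is a hypothetical stand-in for the real cache.

```java
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical per-table history holder; not the real HoodieSchemaHistoryCache.
final class SketchHistoryCache {
  private TreeMap<Long, String> cached;
  private int reloads;

  // Re-reads the history when nothing is cached OR the requested version is
  // missing; both cases share one body instead of two identical branches.
  String lookup(long versionId, Supplier<TreeMap<Long, String>> loader) {
    if (cached == null || !cached.containsKey(versionId)) {
      cached = loader.get();
      reloads++;
    }
    return cached.get(versionId);
  }

  int reloads() {
    return reloads;
  }
}
```

Short-circuit evaluation of `||` keeps the `containsKey` call from running when nothing is cached, so the merged condition is as safe as the two separate checks.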



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1350,156 @@ public Object getProp(String key) {
   public void addProp(String key, Object value) {
     ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
     avroSchema.addProp(key, value);
+    this.idIndex = null;
+  }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Used by callers that need a placeholder when no evolution
+   * schema is available (e.g. schema-on-read disabled or no schema in commit
+   * metadata). Pair with {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel — i.e. has no schema id
+   * assigned (schemaId &lt; 0).
+   */
+  public boolean isEmptySchema() {
+    return schemaId < 0;
+  }
+
+  /**
+   * Returns the schema version id, or -1 if no version has been assigned.
+   */
+  public long schemaId() {
+    return schemaId;
+  }
+
+  /**
+   * Sets the schema version id. Typically derived from a commit instant timestamp.
+   * Mutable: callers that need to re-stamp the version (e.g. after an evolution
+   * operation) can call this multiple times.
+   */
+  public HoodieSchema setSchemaId(long schemaId) {
+    this.schemaId = schemaId;
+    return this;
+  }
+
+  /**
+   * Returns the highest column id assigned to any sub-schema. If an explicit
+   * max-column-id has been recorded (e.g. preserved across a column deletion),
+   * that value is returned; otherwise the highest id currently present in the
+   * schema is returned.
+   *
+   * <p>Returns -1 if no ids have been assigned anywhere in the schema.</p>
+   */
+  public int maxColumnId() {
+    if (explicitMaxColumnId >= 0) {
+      return explicitMaxColumnId;
+    }
+    return index().maxColumnIdSeen();
+  }
+
+  /**
+   * Records the highest column id explicitly. Useful after column deletions, where
+   * the highest currently-present id is less than the highest ever assigned and
+   * subsequent column additions must avoid colliding with previously-used ids.
+   */
+  public HoodieSchema setMaxColumnId(int maxColumnId) {
+    this.explicitMaxColumnId = maxColumnId;
+    return this;
+  }
+
+  /**
+   * Returns the dot-joined full name of the field that owns column id {@code id},
+   * or empty string if none.
+   */
+  public String findFullName(int id) {
+    String result = index().idToName().get(id);
+    return result == null ? "" : result;
+  }
+
+  /**
+   * Returns the column id assigned to the field at {@code fullName}, or -1 if not found.
+   */
+  public int findIdByName(String fullName) {
+    if (fullName == null || fullName.isEmpty()) {
+      return -1;
+    }
+    Integer id = index().nameToId().get(fullName);
+    return id == null ? -1 : id;
+  }
+
+  /**
+   * Returns all column ids in this schema.

Review Comment:
   🤖 nit: could you import 
`java.util.Set`/`java.util.List`/`java.util.ArrayList` (and use the unqualified 
names) for the new helpers below? The file uses unqualified `List`/`Map` etc. 
elsewhere — these stand out.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -96,7 +94,15 @@ public class SchemaEvolutionContext {
   private final InputSplit split;
   private final JobConf job;
   private final HoodieTableMetaClient metaClient;
-  public Option<InternalSchema> internalSchemaOption;
+  private Option<HoodieSchema> evolutionSchemaOption;

Review Comment:
   🤖 nit: could you initialize `evolutionSchemaOption` to `Option.empty()` at 
declaration? Then `getEvolutionSchemaOption()` doesn't need the null-vs-empty 
branch, and callers can't observe a transient null.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
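
A minimal sketch of the suggested initialization, with `java.util.Optional` standing in for Hudi's `Option`. `SketchEvolutionContext` and its `resolve` method are hypothetical and only illustrate the field lifecycle.

```java
import java.util.Optional;

// Hypothetical context class; Optional stands in for Hudi's Option.
final class SketchEvolutionContext {
  // Initialized at declaration, so the field is never observed as null.
  private Optional<String> evolutionSchemaOption = Optional.empty();

  // Populates the option later, e.g. once the schema has been resolved.
  void resolve(String schemaOrNull) {
    evolutionSchemaOption = Optional.ofNullable(schemaOrNull);
  }

  // No null-vs-empty branch is needed: the field always holds a valid Optional.
  Optional<String> getEvolutionSchemaOption() {
    return evolutionSchemaOption;
  }
}
```

Callers can then chain `isPresent()` or `map(...)` on the getter result without a preceding null check.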



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -791,20 +796,21 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   }
 
   /**
-   * Projects provided schema by picking only required (projected) top-level columns from it
+   * Projects provided schema by picking only required (projected) top-level columns from it.
    *
-   * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]])
+   * @param tableSchema     Right is the schema-on-read evolution schema (with field ids);
+   *                        Left is the structural HoodieSchema fallback.
    * @param requiredColumns required top-level columns to be projected
    */
-  def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
+  def projectSchema(tableSchema: Either[HoodieSchema, HoodieSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, Option[HoodieSchema]) = {

Review Comment:
   🤖 nit: `Either[HoodieSchema, HoodieSchema]` is hard to read — both sides are 
the same type, so the Left/Right distinction only lives in the Scaladoc. Could 
you take two separate args (e.g. `evolutionSchema: Option[HoodieSchema]`, 
`fallbackSchema: HoodieSchema`) instead?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
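
A rough sketch of the two-argument shape, written in Java (the file under review is Scala) with schemas reduced to lists of column names. `SketchProjection` and its projection logic are hypothetical; only the parameter split mirrors the suggestion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical projection helper; a schema is modelled as a list of column names.
final class SketchProjection {
  // evolutionSchema: the schema-on-read evolution schema, when one exists.
  // fallbackSchema: the structural schema used when no evolution schema exists.
  static List<String> projectSchema(Optional<List<String>> evolutionSchema,
                                    List<String> fallbackSchema,
                                    List<String> requiredColumns) {
    List<String> source = evolutionSchema.orElse(fallbackSchema);
    List<String> projected = new ArrayList<>();
    // Keep only the required columns that the chosen schema actually contains.
    for (String column : requiredColumns) {
      if (source.contains(column)) {
        projected.add(column);
      }
    }
    return projected;
  }
}
```

Naming the parameters makes the role of each schema visible at the call site, which the `Left`/`Right` convention leaves to the Scaladoc.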



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
