hudi-agent commented on code in PR #18688:
URL: https://github.com/apache/hudi/pull/18688#discussion_r3182069113


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -196,14 +193,13 @@ private Option<Function<HoodieRecord, HoodieRecord>> 
composeSchemaEvolutionTrans
                 && 
Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), 
querySchema.findType(querySchemaFieldId));
           })
           .collect(Collectors.toList());
-      InternalSchema mergedSchema = new 
InternalSchemaMerger(writeInternalSchema, querySchema,
+      HoodieSchema newWriterSchema = new 
HoodieSchemaMerger(writeInternalSchema, querySchema,

Review Comment:
   🤖 The record name on `newWriterSchema` changes here. Previously it was 
`writerSchema.getFullName()` (via 
`InternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName())`); 
now `HoodieSchemaMerger.mergeSchema()` stamps `querySchema.getFullName()` 
instead. In addition, `writeSchemaFromFile` is no longer renormalized to share 
that name. Could the resulting record-name mismatch alter the result of 
`HoodieSchemaCompatibility.areSchemasCompatible(newWriterSchema, 
writeSchemaFromFile)` on line 199, or affect what 
`record.rewriteRecordWithNewSchema(..., newWriterSchema, ...)` produces on line 
207?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,23 +163,23 @@ private Option<Function<HoodieRecord, HoodieRecord>> 
composeSchemaEvolutionTrans
                                                                                
          HoodieBaseFile baseFile,
                                                                                
          HoodieWriteConfig writeConfig,
                                                                                
          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = 
HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = 
HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), 
writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
         try {
-          fileSchema = 
InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true));
+          fileSchema = tableSchemaResolver.getTableSchema(true);

Review Comment:
   🤖 Previously this was wrapped in `InternalSchemaConverter.convert(...)`, 
which minted fresh field ids onto the parsed table schema. With the wrap 
removed, `fileSchema` here is whatever `getTableSchema(true)` returns directly. 
If the SCHEMA_KEY blob in commit metadata doesn't carry `field-id` properties 
(older tables, or tables that haven't been through the new id-stamping save 
path), then `writeInternalSchema.findIdByName(...)` on line 187 will return -1 
for every column and `sameCols` becomes empty, flipping `needToReWriteRecord` 
to true even when nothing actually changed. Is that a path you've already 
verified is safe?
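
The failure mode described above can be reproduced with a toy model. This is a minimal sketch, not the `HoodieMergeHelper` code: the class `SameColsSketch`, its helper names, and the simplified rule "a column counts as unchanged only when both schemas resolve it to the same id other than -1" are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in for the id comparison; names are illustrative only.
final class SameColsSketch {

  // Returns -1 when the schema carries no id for the column (assumed contract).
  static int findIdByName(Map<String, Integer> idsByName, String col) {
    Integer id = idsByName.get(col);
    return id == null ? -1 : id;
  }

  // Keeps a column only if the file schema and the query schema agree on its id.
  static List<String> sameCols(List<String> fileCols,
                               Map<String, Integer> fileIds,
                               Map<String, Integer> queryIds) {
    return fileCols.stream()
        .filter(col -> {
          int fileId = findIdByName(fileIds, col);
          return fileId != -1 && fileId == findIdByName(queryIds, col);
        })
        .collect(Collectors.toList());
  }

  // A rewrite is requested as soon as any file column is not in sameCols.
  static boolean needToRewrite(List<String> fileCols,
                               Map<String, Integer> fileIds,
                               Map<String, Integer> queryIds) {
    return sameCols(fileCols, fileIds, queryIds).size() != fileCols.size();
  }

  public static void main(String[] args) {
    List<String> cols = Arrays.asList("id", "name");
    Map<String, Integer> queryIds = new HashMap<>();
    queryIds.put("id", 0);
    queryIds.put("name", 1);
    Map<String, Integer> stamped = new HashMap<>(queryIds); // ids present
    Map<String, Integer> unstamped = new HashMap<>();       // no field-id props

    System.out.println(needToRewrite(cols, stamped, queryIds));   // false
    System.out.println(needToRewrite(cols, unstamped, queryIds)); // true
  }
}
```

In this model the two schemas have identical columns, yet the unstamped one still triggers a rewrite, which is the behaviour the question is asking about.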
   



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java:
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.schema.types.Type;
+import org.apache.hudi.common.schema.types.Types;
+import org.apache.hudi.common.schema.evolution.legacy.InternalSchema;
+import 
org.apache.hudi.common.schema.evolution.legacy.convert.InternalSchemaConverter;
+import 
org.apache.hudi.common.schema.evolution.legacy.utils.InternalSchemaUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that 
preserves
+ * column ids by stamping them as Avro custom properties on the HoodieSchema's
+ * underlying schema tree.
+ *
+ * <p>This exists during the InternalSchema → HoodieSchema migration. The 
existing
+ * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a
+ * structurally-correct HoodieSchema but discards field ids. Downstream code in
+ * the new evolution layer relies on {@code field-id} / {@code element-id} /
+ * {@code key-id} / {@code value-id} properties being present, so we walk the
+ * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p>
+ *
+ * <p>The walk order matches {@code 
InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema}
+ * (record fields in declared order; array element after array; map key + 
value after map),
+ * so positional pairing is exact.</p>
+ *
+ * <p>Public for the migration period only — Phase 4 callsite migrations across
+ * different packages need access to the conversion. Once Phase 5 rewrites the
+ * action algebra on pure HoodieSchema, this bridge and its dependency on
+ * {@code InternalSchema} go away.</p>
+ */
+public final class HoodieSchemaInternalSchemaBridge {
+
+  private HoodieSchemaInternalSchemaBridge() {
+  }
+
+  /**
+   * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving 
column
+   * ids carried as {@code field-id} / {@code element-id} / {@code key-id} /
+   * {@code value-id} Avro custom properties. This is the inverse of
+   * {@link #toHoodieSchema(InternalSchema, String)} and exists so the façade 
can
+   * round-trip a HoodieSchema through the legacy applier without renumbering 
ids on
+   * every call.
+   *
+   * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly 
parsed
+   * input), this falls back to the existing
+   * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh 
ids.</p>
+   */
+  public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
+    // Short-circuit only on the genuine empty sentinel — a record with no 
fields.
+    // {@link HoodieSchema#isEmptySchema()} is schemaId-based and would 
mis-classify
+    // any freshly-built HoodieSchema (default schemaId=-1) as empty, even 
when it
+    // carries real fields, causing the round-trip to silently lose them.
+    if (hoodieSchema == null
+        || hoodieSchema.getType() != HoodieSchemaType.RECORD

Review Comment:
   🤖 This short-circuits to the empty sentinel for any non-RECORD top-level 
schema, including a top-level nullable record (UNION wrapping a RECORD), 
because `getType()` would return UNION, not RECORD. Other code in this PR 
(`HoodieSchemaIdAssigner.visit`, `HoodieSchemaIndex.walk`, the `reidentify` 
helper just below) consistently does `effective = isNullable() ? 
getNonNullType() : schema` before the type check. Worth unwrapping nullability 
here too so a nullable top-level record isn't silently mapped to empty?
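
To make the difference concrete, here is a minimal sketch of the two checks on a miniature schema model. `NullableUnwrapSketch`, `Node`, and `Kind` are hypothetical names invented for this example and are not the `HoodieSchema` API; only the "unwrap nullability, then check the type" pattern is taken from the comment above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature schema model; not the HoodieSchema API.
final class NullableUnwrapSketch {
  enum Kind { RECORD, UNION, NULL, INT }

  static final class Node {
    final Kind kind;
    final List<Node> branches; // only populated for UNION nodes

    Node(Kind kind, Node... branches) {
      this.kind = kind;
      this.branches = Arrays.asList(branches);
    }
  }

  // A two-branch union with a NULL branch is treated as "nullable X".
  static boolean isNullable(Node n) {
    return n.kind == Kind.UNION
        && n.branches.size() == 2
        && n.branches.stream().anyMatch(b -> b.kind == Kind.NULL);
  }

  static Node nonNullType(Node n) {
    return n.branches.stream()
        .filter(b -> b.kind != Kind.NULL)
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("union has no non-null branch"));
  }

  // Type check without unwrapping: a nullable record is reported as "not a record".
  static boolean isRecordNaive(Node n) {
    return n != null && n.kind == Kind.RECORD;
  }

  // Type check after unwrapping nullability first.
  static boolean isRecordUnwrapped(Node n) {
    if (n == null) {
      return false;
    }
    Node effective = isNullable(n) ? nonNullType(n) : n;
    return effective.kind == Kind.RECORD;
  }

  public static void main(String[] args) {
    Node record = new Node(Kind.RECORD);
    Node nullableRecord = new Node(Kind.UNION, new Node(Kind.NULL), record);
    System.out.println(isRecordNaive(nullableRecord));     // false
    System.out.println(isRecordUnwrapped(nullableRecord)); // true
  }
}
```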
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1733,18 +1749,26 @@ public void commitTableChange(InternalSchema newSchema, 
HoodieTableMetaClient me
     } catch (HoodieIOException io) {
       throw new HoodieCommitException("Failed to commit " + instantTime + " 
unable to save inflight metadata ", io);
     }
+    newEvolutionSchema.setSchemaId(Long.parseLong(instantTime));
     Map<String, String> extraMeta = new HashMap<>();
-    extraMeta.put(SerDeHelper.LATEST_SCHEMA, 
SerDeHelper.toJson(newSchema.setSchemaId(Long.parseLong(instantTime))));
+    extraMeta.put(HoodieSchemaSerDe.LATEST_SCHEMA, 
HoodieSchemaSerDe.toJson(newEvolutionSchema));
     // try to save history schemas
-    FileBasedInternalSchemaStorageManager schemasManager = new 
FileBasedInternalSchemaStorageManager(metaClient);
-    schemasManager.persistHistorySchemaStr(instantTime, 
SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
+    HoodieSchemaHistoryStorageManager schemasManager = new 
HoodieSchemaHistoryStorageManager(metaClient);
+    schemasManager.persistHistorySchemaStr(instantTime, 
HoodieSchemaSerDe.inheritHistory(newEvolutionSchema, historySchemaStr));
     commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), 
commitActionType);

Review Comment:
   🤖 Previously the schema written into config went through 
`InternalSchemaConverter.convert(newSchema, 
HoodieSchemaUtils.getRecordQualifiedName(config.getTableName()))`, which forced 
the record name to the canonical `hoodie.<table>.<table>_record` form. The new 
path passes `newEvolutionSchema.toString()` directly, whose record name is 
whatever the change applier carried over from the prior evolution schema. Could 
that diverge from the qualified name and affect downstream consumers of 
`config.getSchema()` (e.g. Avro reader/writer record-name matching)?
   



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP;
+
+/**
+ * Walks a {@link HoodieSchema} and assigns sequential integer ids to every 
addressable
+ * sub-schema (record fields, array elements, map keys and values), writing 
the result
+ * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON 
properties.
+ *
+ * <p>The traversal order mirrors {@link 
org.apache.hudi.common.schema.evolution.legacy.InternalSchemaBuilder}
+ * (record fields in declared order; array element id assigned before 
recursing into the
+ * element type; map key id then value id assigned before recursing into the 
value type)
+ * so id assignments are stable across the InternalSchema → HoodieSchema 
migration and
+ * round-tripping a previously-IDed InternalSchema through HoodieSchema 
produces the same
+ * id mapping.</p>
+ *
+ * <p>Existing ids are preserved: if a node already has an id property, this 
assigner uses
+ * that id and bumps {@code nextId} past it, so reapplying the assigner to a 
partially
+ * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the 
new
+ * {@code maxColumnId} so callers can persist it on the schema root.</p>
+ */
+public final class HoodieSchemaIdAssigner {
+
+  private HoodieSchemaIdAssigner() {
+  }
+
+  /**
+   * Assigns ids onto {@code schema} starting from {@code startId} and writes 
the resulting
+   * {@code max-column-id} onto the schema root.
+   *
+   * @return the maximum column id present after assignment
+   */
+  public static int assign(HoodieSchema schema, int startId) {
+    int[] nextId = {startId};
+    visit(schema, nextId);
+    int maxId = nextId[0] - 1;
+    schema.setMaxColumnId(maxId);
+    return maxId;
+  }
+
+  /**
+   * Assigns ids onto {@code schema}, starting after the highest id already 
present
+   * (or 0 if none). This is the entry point for first-time id assignment on a 
schema
+   * that has no prior IDs.
+   */
+  public static int assignFresh(HoodieSchema schema) {
+    HoodieSchemaIndex index = HoodieSchemaIndex.of(schema);
+    int start = Math.max(0, index.maxColumnIdSeen() + 1);
+    return assign(schema, start);
+  }
+
+  private static void visit(HoodieSchema schema, int[] nextId) {
+    HoodieSchema effective = schema.isNullable() ? schema.getNonNullType() : 
schema;
+    switch (effective.getType()) {
+      case RECORD:
+        // Mirror InternalSchemaConverter ordering: assign ids to all fields 
at this level
+        // before recursing into any of them.
+        for (HoodieSchemaField field : effective.getFields()) {
+          assignFieldId(field, nextId);
+        }
+        for (HoodieSchemaField field : effective.getFields()) {
+          visit(field.schema(), nextId);
+        }
+        return;
+      case ARRAY: {
+        Schema arr = effective.getAvroSchema();
+        assignContainerId(arr, ELEMENT_ID_PROP, nextId);
+        visit(effective.getElementType(), nextId);
+        return;
+      }
+      case MAP: {
+        Schema map = effective.getAvroSchema();
+        assignContainerId(map, KEY_ID_PROP, nextId);
+        assignContainerId(map, VALUE_ID_PROP, nextId);
+        visit(effective.getValueType(), nextId);
+        return;
+      }
+      default:
+        // primitive: nothing to assign
+    }
+  }
+
+  private static void assignFieldId(HoodieSchemaField field, int[] nextId) {
+    Schema.Field avroField = field.getAvroField();
+    Object existing = avroField.getObjectProp(FIELD_ID_PROP);
+    if (existing instanceof Number) {
+      int existingId = ((Number) existing).intValue();
+      if (existingId >= nextId[0]) {
+        nextId[0] = existingId + 1;
+      }
+      return;
+    }
+    avroField.addProp(FIELD_ID_PROP, nextId[0]++);

Review Comment:
   🤖 I think `assign(schema, startId)` can produce id collisions if `startId` 
is not strictly greater than every existing id in the schema. Example: fields 
[A(id=0), B(no id), C(id=1)] with startId=0: A bumps nextId to 1; B is 
assigned id 1 and nextId becomes 2; C sees existingId=1 < nextId=2 and is 
skipped, so B and C both end up with id 1. `assignFresh()` is safe because it 
precomputes `max+1`, but the public `assign()` doc claims existing-id 
preservation/idempotency without flagging this precondition. Worth either 
pre-scanning all existing ids (e.g., via `HoodieSchemaIndex`) before 
assignment or hardening the doc?
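
The collision in the example above can be traced with a flat simulation. This is a sketch under simplifying assumptions: a schema is reduced to a list of field ids (with `null` meaning "no id yet"), nesting is ignored, and `IdAssignSketch` with its method names is hypothetical. The single-pass loop mirrors the logic quoted from `assignFieldId`; the pre-scan variant is one possible hardening, not necessarily the fix the PR should adopt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical flat model of the id assigner; a null entry means "no id yet".
final class IdAssignSketch {

  // Single pass: keep an existing id and bump nextId past it,
  // otherwise hand out nextId and increment it.
  static List<Integer> assignSinglePass(List<Integer> ids, int startId) {
    int nextId = startId;
    List<Integer> out = new ArrayList<>();
    for (Integer existing : ids) {
      if (existing != null) {
        if (existing >= nextId) {
          nextId = existing + 1;
        }
        out.add(existing);
      } else {
        out.add(nextId++);
      }
    }
    return out;
  }

  // Hardened variant: pre-scan every existing id so fresh ids always start
  // above the current maximum, whatever startId the caller passes.
  static List<Integer> assignWithPreScan(List<Integer> ids, int startId) {
    int start = startId;
    for (Integer existing : ids) {
      if (existing != null && existing >= start) {
        start = existing + 1;
      }
    }
    return assignSinglePass(ids, start);
  }

  static boolean hasDuplicates(List<Integer> ids) {
    Set<Integer> seen = new HashSet<>();
    for (Integer id : ids) {
      if (!seen.add(id)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // A(id=0), B(no id), C(id=1), as in the example above.
    List<Integer> fields = Arrays.asList(0, null, 1);
    System.out.println(assignSinglePass(fields, 0));  // [0, 1, 1]
    System.out.println(assignWithPreScan(fields, 0)); // [0, 2, 1]
  }
}
```

With the pre-scan, B receives id 2 and the existing ids of A and C are left untouched.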
   



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java:
##########
@@ -667,23 +665,22 @@ private Pair<ClosableIterator<HoodieRecord>, 
HoodieSchema> getRecordsIterator(
   /**
    * Get final Read Schema for support evolution.
    * step1: find the fileSchema for current dataBlock.
-   * step2: determine whether fileSchema is compatible with the final read 
internalSchema.
-   * step3: merge fileSchema and read internalSchema to produce final read 
schema.
+   * step2: determine whether fileSchema is compatible with the final read 
evolution schema.
+   * step3: merge fileSchema and read evolution schema to produce final read 
schema.
    *
    * @param dataBlock current processed block
    * @return final read schema.
    */
   private Option<Pair<Function<HoodieRecord, HoodieRecord>, HoodieSchema>> 
composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
-    if (internalSchema.isEmptySchema()) {
+    if (evolutionSchema.isEmptySchema()) {
       return Option.empty();
     }
 
     long currentInstantTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);
-    InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, 
internalSchema,
+    HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);

Review Comment:
   🤖 Could `searchSchemaAndCache` returning `null` here NPE the merger? 
`HoodieSchemaSerDe.searchSchema` explicitly documents it returns null on miss 
(vs the legacy `InternalSchemaCache.searchSchemaAndCache` which returned an 
empty-schema sentinel). The merger immediately calls 
`fileSchema.findFullName(fieldId)` / `fileSchema.findType(...)` without a null 
check. Same pattern in `FileGroupReaderSchemaHandler.java:141` and 
`FileGroupRecordBuffer.java:224`. @yihua could you weigh in on the intended 
contract here?
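
For reference, the two contracts contrasted above (null on miss versus an empty-schema sentinel) look like this in a toy model. Everything here is hypothetical: `SchemaLookupSketch`, its nested `Schema` class, and `searchSchemaOrEmpty` are invented for the sketch and do not reflect the actual `HoodieSchemaHistoryCache` API. Mapping a miss back to an empty sentinel at the call site is only one option; an explicit null check in the merger would be another.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical miniature schema cache; not the Hudi API.
final class SchemaLookupSketch {

  static final class Schema {
    final Map<Integer, String> typesById = new HashMap<>();

    String findType(int fieldId) {
      return typesById.get(fieldId);
    }

    boolean isEmptySchema() {
      return typesById.isEmpty();
    }
  }

  static final Schema EMPTY = new Schema();
  static final Map<Long, Schema> HISTORY = new HashMap<>();

  // Null-on-miss contract, as the comment describes for the new lookup.
  static Schema searchSchema(long versionId) {
    return HISTORY.get(versionId);
  }

  // One possible guard: map a miss back to an empty-schema sentinel.
  static Schema searchSchemaOrEmpty(long versionId) {
    return Optional.ofNullable(searchSchema(versionId)).orElse(EMPTY);
  }

  public static void main(String[] args) {
    Schema s = new Schema();
    s.typesById.put(1, "int");
    HISTORY.put(100L, s);

    System.out.println(searchSchemaOrEmpty(100L).findType(1));     // int
    System.out.println(searchSchemaOrEmpty(999L).isEmptySchema()); // true
    try {
      searchSchema(999L).findType(1); // dereferences null on a miss
    } catch (NullPointerException e) {
      System.out.println("NPE on miss without guard");
    }
  }
}
```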
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -72,116 +65,112 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     // convert to delete first then add again
     val deleteChanges = changes.filter(p => 
p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn])
     val addChanges = changes.filter(p => 
p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn])
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, 
applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges)
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column replace finished")
   }
 
-  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = {
-    val addChange = TableChanges.ColumnAddChange.get(oldSchema)
+  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = {
+    var cur = oldSchema
     addChanges.foreach { addColumn =>
       val names = addColumn.fieldNames()
       val parentName = AlterTableCommand.getParentName(names)
-      // add col change
-      val colType = 
SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), 
true, new AtomicInteger(0))
-      addChange.addColumns(parentName, names.last, colType, 
addColumn.comment())
-      // add position change
-      addColumn.position() match {
+      val fullName = names.mkString(".")
+      val colType = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(),
 names.last)
+      val (positionType, positionRef) = addColumn.position() match {
         case after: TableChange.After =>
-          addChange.addPositionChange(names.mkString("."),
-            if (parentName.isEmpty) after.column() else parentName + "." + 
after.column(), "after")
+          (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() 
else parentName + "." + after.column())
         case _: TableChange.First =>
-          addChange.addPositionChange(names.mkString("."), "", "first")
+          (ColumnPositionType.FIRST, "")
         case _ =>
+          (ColumnPositionType.NO_OPERATION, "")
       }
+      cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, 
colType, addColumn.comment(), positionRef, positionType)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
+    cur
   }
 
-  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
-    val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
-    deleteChanges.foreach { c =>
-      val originalColName = c.fieldNames().mkString(".")
-      checkSchemaChange(Seq(originalColName), table)
-      deleteChange.deleteColumn(originalColName)
+  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = {
+    val colNames = deleteChanges.map { c =>
+      val name = c.fieldNames().mkString(".")
+      checkSchemaChange(Seq(name), table)
+      name
+    }.toArray
+    if (colNames.isEmpty) {
+      oldSchema
+    } else {
+      val newSchema = new 
HoodieSchemaChangeApplier(oldSchema).applyDeleteChange(colNames: _*)
+      // delete action should not change the getMaxColumnId field
+      newSchema.setMaxColumnId(oldSchema.maxColumnId())
+      newSchema
     }
-    val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, 
deleteChange)
-    // delete action should not change the getMaxColumnId field
-    newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
-    newSchema
   }
 
 
   def applyAddAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, oldSchema, 
changes.map(_.asInstanceOf[AddColumn]))
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column add finished")
   }
 
   def applyDeleteAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyDeleteAction2Schema(sparkSession, oldSchema, 
changes.map(_.asInstanceOf[DeleteColumn]))
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column delete finished")
   }
 
   def applyUpdateAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
-    val updateChange = TableChanges.ColumnUpdateChange.get(oldSchema)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
+    var cur = oldSchema
     changes.foreach { change =>
-      change match {
+      val applier = new HoodieSchemaChangeApplier(cur)
+      cur = change match {
         case updateType: TableChange.UpdateColumnType =>
-          val newType = 
SparkInternalSchemaConverter.buildTypeFromStructType(updateType.newDataType(), 
true, new AtomicInteger(0))
-          updateChange.updateColumnType(updateType.fieldNames().mkString("."), 
newType)
+          val newType = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(updateType.newDataType(),
 updateType.fieldNames().last)
+          applier.applyColumnTypeChange(updateType.fieldNames().mkString("."), 
newType)
         case updateComment: TableChange.UpdateColumnComment =>
-          
updateChange.updateColumnComment(updateComment.fieldNames().mkString("."), 
updateComment.newComment())
+          
applier.applyColumnCommentChange(updateComment.fieldNames().mkString("."), 
updateComment.newComment())
         case updateName: TableChange.RenameColumn =>
           val originalColName = updateName.fieldNames().mkString(".")
           checkSchemaChange(Seq(originalColName), table)
-          updateChange.renameColumn(originalColName, updateName.newName())
+          applier.applyRenameChange(originalColName, updateName.newName())
         case updateNullAbility: TableChange.UpdateColumnNullability =>
-          
updateChange.updateColumnNullability(updateNullAbility.fieldNames().mkString("."),
 updateNullAbility.nullable())
+          
applier.applyColumnNullabilityChange(updateNullAbility.fieldNames().mkString("."),
 updateNullAbility.nullable())
         case updatePosition: TableChange.UpdateColumnPosition =>
           val names = updatePosition.fieldNames()
           val parentName = AlterTableCommand.getParentName(names)
+          val fullName = names.mkString(".")
           updatePosition.position() match {
             case after: TableChange.After =>
-              updateChange.addPositionChange(names.mkString("."),
-                if (parentName.isEmpty) after.column() else parentName + "." + 
after.column(), "after")
+              val refCol = if (parentName.isEmpty) after.column() else 
parentName + "." + after.column()
+              applier.applyReOrderColPositionChange(fullName, refCol, 
ColumnPositionType.AFTER)
             case _: TableChange.First =>
-              updateChange.addPositionChange(names.mkString("."), "", "first")
-            case _ =>
+              applier.applyReOrderColPositionChange(fullName, "", 
ColumnPositionType.FIRST)
+            case _ => cur
           }
+        case _ => cur

Review Comment:
   🤖 Adding `case _ => cur` here silently swallows any TableChange type not 
enumerated above (whereas the old `change match { ... }` was a non-exhaustive 
statement that would throw `MatchError` for unknown types). Is this 
intentional? It might mask future Spark-added change types (e.g. 
`UpdateColumnDefaultValue`): the user would think the ALTER succeeded while no 
schema change actually got persisted. Could we throw an explicit 
`UnsupportedOperationException` instead?
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -244,13 +230,13 @@ object AlterTableCommand extends Logging {
   /**
     * Generate an commit with new schema to change the table's schema.
     *
-    * @param internalSchema new schema after change
+    * @param evolutionSchema new schema after change
     * @param historySchemaStr history schemas
     * @param table The hoodie table.
     * @param sparkSession The spark session.
     */
-  def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: 
String, table: CatalogTable, sparkSession: SparkSession): Unit = {
-    val schema = InternalSchemaConverter.convert(internalSchema, 
HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table))
+  def commitWithSchema(evolutionSchema: HoodieSchema, historySchemaStr: 
String, table: CatalogTable, sparkSession: SparkSession): Unit = {
+    val schema = evolutionSchema
     val path = getTableLocation(table, sparkSession)
     val jsc = new JavaSparkContext(sparkSession.sparkContext)
     val client = DataSourceUtils.createHoodieClient(

Review Comment:
   🤖 [Line 244] The old code passed 
`InternalSchemaConverter.convert(internalSchema, 
HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table))` so the Avro 
schema string used here always carried the catalog-driven record name 
`hoodie.<table>.<table>_record`. Now `schema.toString` falls back to whatever 
record name `evolutionSchema`'s underlying Avro schema already had. Could you 
confirm this still matches for tables where the catalog name differs from the 
persisted schema's record name (e.g. tables created/renamed externally)? A 
mismatch here would propagate into the write client config and could affect 
schema validation / Hive sync.
   



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -72,116 +65,112 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     // convert to delete first then add again
     val deleteChanges = changes.filter(p => 
p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn])
     val addChanges = changes.filter(p => 
p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn])
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, 
applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges)
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column replace finished")
   }
 
-  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = {
-    val addChange = TableChanges.ColumnAddChange.get(oldSchema)
+  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = {
+    var cur = oldSchema
     addChanges.foreach { addColumn =>
       val names = addColumn.fieldNames()
       val parentName = AlterTableCommand.getParentName(names)
-      // add col change
-      val colType = SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), true, new AtomicInteger(0))
-      addChange.addColumns(parentName, names.last, colType, addColumn.comment())
-      // add position change
-      addColumn.position() match {
+      val fullName = names.mkString(".")
+      val colType = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(), names.last)
+      val (positionType, positionRef) = addColumn.position() match {
         case after: TableChange.After =>
-          addChange.addPositionChange(names.mkString("."),
-            if (parentName.isEmpty) after.column() else parentName + "." + after.column(), "after")
+          (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() else parentName + "." + after.column())
         case _: TableChange.First =>
-          addChange.addPositionChange(names.mkString("."), "", "first")
+          (ColumnPositionType.FIRST, "")
         case _ =>
+          (ColumnPositionType.NO_OPERATION, "")
       }
+      cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, colType, addColumn.comment(), positionRef, positionType)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
+    cur
   }
 
-  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
-    val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
-    deleteChanges.foreach { c =>
-      val originalColName = c.fieldNames().mkString(".")
-      checkSchemaChange(Seq(originalColName), table)
-      deleteChange.deleteColumn(originalColName)
+  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = {
+    val colNames = deleteChanges.map { c =>
+      val name = c.fieldNames().mkString(".")
+      checkSchemaChange(Seq(name), table)
+      name
+    }.toArray
+    if (colNames.isEmpty) {
+      oldSchema
+    } else {
+      val newSchema = new HoodieSchemaChangeApplier(oldSchema).applyDeleteChange(colNames: _*)
+      // delete action should not change the getMaxColumnId field
+      newSchema.setMaxColumnId(oldSchema.maxColumnId())
+      newSchema
     }
-    val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, deleteChange)
-    // delete action should not change the getMaxColumnId field
-    newSchema.setMaxColumnId(oldSchema.getMaxColumnId)
-    newSchema
   }
 
 
   def applyAddAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, oldSchema, changes.map(_.asInstanceOf[AddColumn]))
-    val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession)
     logInfo("column add finished")
   }
 
   def applyDeleteAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyDeleteAction2Schema(sparkSession, oldSchema, changes.map(_.asInstanceOf[DeleteColumn]))
-    val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession)
     logInfo("column delete finished")
   }
 
   def applyUpdateAction(sparkSession: SparkSession): Unit = {
-    val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession)
-    val updateChange = TableChanges.ColumnUpdateChange.get(oldSchema)
+    val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession)
+    var cur = oldSchema
     changes.foreach { change =>
-      change match {
+      val applier = new HoodieSchemaChangeApplier(cur)

Review Comment:
   🤖 The semantics here have shifted from accumulate-then-apply (one 
`TableChanges.ColumnUpdateChange` carrying every change, applied at the end) to 
sequential-per-change (each iteration creates a fresh 
`HoodieSchemaChangeApplier` over `cur`). Is this tested for cases where changes 
within one DDL refer to each other, e.g. `RENAME COLUMN a TO b` followed by 
`ALTER COLUMN a TYPE INT` in the same statement? Under the new ordering the 
type change would see column `a` already renamed and could fail to resolve.
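
A minimal sketch of the concern, using a toy list-based schema rather than the real Hudi classes. Every name here (`ChangeOrderingSketch`, `Column`, `Change`, `applyBatch`, `applySequential`) is hypothetical, and the batch path assumes the old accumulate-then-apply code resolved all column names against the original schema, which should be verified against `TableChanges.ColumnUpdateChange`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ChangeOrderingSketch {

  // Toy column: a stable id plus a mutable name and type.
  static final class Column {
    final int id;
    String name;
    String type;

    Column(int id, String name, String type) {
      this.id = id;
      this.name = name;
      this.type = type;
    }
  }

  // Toy change: a rename sets newName, a type update sets newType.
  static final class Change {
    final String target;
    final String newName;
    final String newType;

    private Change(String target, String newName, String newType) {
      this.target = target;
      this.newName = newName;
      this.newType = newType;
    }

    static Change rename(String from, String to) {
      return new Change(from, to, null);
    }

    static Change retype(String column, String type) {
      return new Change(column, null, type);
    }
  }

  static List<Column> copy(List<Column> schema) {
    List<Column> out = new ArrayList<>();
    for (Column c : schema) {
      out.add(new Column(c.id, c.name, c.type));
    }
    return out;
  }

  static Column findByName(List<Column> schema, String name) {
    for (Column c : schema) {
      if (c.name.equals(name)) {
        return c;
      }
    }
    throw new IllegalArgumentException("cannot resolve column: " + name);
  }

  static Column findById(List<Column> schema, int id) {
    for (Column c : schema) {
      if (c.id == id) {
        return c;
      }
    }
    throw new IllegalArgumentException("unknown column id: " + id);
  }

  static void apply(Column column, Change change) {
    if (change.newName != null) {
      column.name = change.newName;
    }
    if (change.newType != null) {
      column.type = change.newType;
    }
  }

  // Accumulate-then-apply: resolve every target name against the ORIGINAL schema, then apply by id.
  static List<Column> applyBatch(List<Column> original, List<Change> changes) {
    List<Column> result = copy(original);
    for (Change change : changes) {
      int id = findByName(original, change.target).id;
      apply(findById(result, id), change);
    }
    return result;
  }

  // Sequential: resolve each target name against the schema left by the previous change.
  static List<Column> applySequential(List<Column> original, List<Change> changes) {
    List<Column> cur = copy(original);
    for (Change change : changes) {
      apply(findByName(cur, change.target), change);
    }
    return cur;
  }

  public static void main(String[] args) {
    List<Column> schema = Arrays.asList(new Column(0, "a", "long"));
    List<Change> ddl = Arrays.asList(Change.rename("a", "b"), Change.retype("a", "int"));

    Column batched = applyBatch(schema, ddl).get(0);
    System.out.println(batched.name + ":" + batched.type); // prints "b:int"

    try {
      applySequential(schema, ddl);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "cannot resolve column: a"
    }
  }
}
```

In this toy model the same two changes succeed when resolved up front and fail when resolved one at a time, which is the kind of case a regression test for the new path could cover.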
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark33LegacyHoodieParquetFileFormat.scala:
##########
@@ -220,8 +219,8 @@ class Spark33LegacyHoodieParquetFileFormat(private val shouldAppendPartitionValu
       // Clone new conf
       val hadoopAttemptConf = new Configuration(broadcastedHadoopConf.value.value)
       val typeChangeInfos: java.util.Map[Integer, Pair[DataType, DataType]] = if (shouldUseInternalSchema) {
-        val mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()
-        val mergedSchema = SparkInternalSchemaConverter.constructSparkSchemaFromInternalSchema(mergedInternalSchema)
+        val mergedInternalSchema = new HoodieSchemaMerger(fileSchema, querySchemaOption.get(), true, true).mergeSchema()

Review Comment:
   🤖 nit: `mergedInternalSchema` still carries the old `InternalSchema` naming 
even though it now holds a `HoodieSchema`. Since the whole PR is retiring the 
`InternalSchema` concept, could you rename it to `mergedHoodieSchema` (or 
similar) here and in the 34/35/40/41 equivalents for consistency?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -139,7 +139,9 @@ public static HoodieFileGroupReader<RowData> createFileGroupReader(
         .withFileSlice(fileSlice)
         .withDataSchema(tableSchema)
         .withRequestedSchema(requiredSchema)
-        .withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
+        .withEvolutionSchema(internalSchemaManager.getQuerySchema() == null

Review Comment:
   🤖 nit: the original code used 
`Option.ofNullable(internalSchemaManager.getQuerySchema())`, a clean 
one-liner. The new ternary expands that into 3 lines and calls 
`getQuerySchema()` twice. Could you restore the simpler form: 
`.withEvolutionSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))`?
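
To illustrate the difference, here is a small sketch that uses `java.util.Optional` as a stand-in for Hudi's `Option` and a hypothetical `SchemaManager` with a call counter. The diff shows only the first line of the ternary, so the two branches below are an assumption about its shape:

```java
import java.util.Optional;

public class OfNullableSketch {

  // Hypothetical stand-in for internalSchemaManager; counts how often the getter runs.
  static final class SchemaManager {
    private final String querySchema;
    int calls = 0;

    SchemaManager(String querySchema) {
      this.querySchema = querySchema;
    }

    String getQuerySchema() {
      calls++;
      return querySchema;
    }
  }

  // Assumed shape of the new ternary: the getter runs twice when the schema is non-null.
  static Optional<String> viaTernary(SchemaManager manager) {
    return manager.getQuerySchema() == null
        ? Optional.empty()
        : Optional.of(manager.getQuerySchema());
  }

  // Original one-liner shape: the getter runs exactly once.
  static Optional<String> viaOfNullable(SchemaManager manager) {
    return Optional.ofNullable(manager.getQuerySchema());
  }

  public static void main(String[] args) {
    SchemaManager first = new SchemaManager("schema");
    viaTernary(first);
    System.out.println(first.calls); // prints 2

    SchemaManager second = new SchemaManager("schema");
    viaOfNullable(second);
    System.out.println(second.calls); // prints 1
  }
}
```

Both forms return the same value; the one-liner simply avoids the second getter call and the extra lines.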
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -791,20 +796,21 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   }
 
   /**
-   * Projects provided schema by picking only required (projected) top-level columns from it
+   * Projects provided schema by picking only required (projected) top-level columns from it.
    *
-   * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]])
+   * @param tableSchema     Right is the schema-on-read evolution schema (with field ids);
+   *                        Left is the structural HoodieSchema fallback.
    * @param requiredColumns required top-level columns to be projected
    */
-  def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
+  def projectSchema(tableSchema: Either[HoodieSchema, HoodieSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, Option[HoodieSchema]) = {

Review Comment:
   🤖 nit: `Either[HoodieSchema, HoodieSchema]` with the same type on both arms 
loses the self-documenting property the old `Either[HoodieSchema, 
InternalSchema]` had: a reader seeing `Left` vs `Right` at a call site now has 
no type-level hint which is which. Have you considered using 
`Option[HoodieSchema]` (present = evolution schema, absent = fall back to 
structural) so the semantics are clear from the type alone?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/schema/evolution/TestHoodieSchemaHistoryStorageManager.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaIdAssigner;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.TIMELINE_FACTORY;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link HoodieSchemaHistoryStorageManager}: persisting evolution
+ * schemas to {@code .hoodie/.schema/}, reading them back, and the residual-
+ * file cleanup that runs on each persist. Each persist+read pair also
+ * exercises the {@link HoodieSchemaSerDe} JSON layout end-to-end.
+ */
+public class TestHoodieSchemaHistoryStorageManager extends HoodieCommonTestHarness {
+
+  private HoodieActiveTimeline timeline;
+
+  @BeforeEach
+  public void setUp() throws Exception {
+    initMetaClient();
+  }
+
+  @AfterEach
+  public void tearDown() throws Exception {
+    cleanMetaClient();
+  }
+
+  @Test
+  public void testPersistAndReadHistorySchemaStr() throws IOException {
+    timeline = TIMELINE_FACTORY.createActiveTimeline(metaClient);
+    HoodieSchemaHistoryStorageManager fm = new HoodieSchemaHistoryStorageManager(metaClient);
+
+    HoodieSchema currentSchema = simpleSchema(0L);
+    fm.persistHistorySchemaStr("0000", HoodieSchemaSerDe.inheritHistory(currentSchema, ""));
+    simulateCommit("0000");
+    metaClient.reloadActiveTimeline();
+    assertSameLogicalSchema(currentSchema, fm.getSchemaByKey("0"));
+
+    HoodieSchema secondSchema = simpleSchema(1L);
+    fm.persistHistorySchemaStr("0001", HoodieSchemaSerDe.inheritHistory(secondSchema, fm.getHistorySchemaStr()));
+    simulateCommit("0001");
+    metaClient.reloadActiveTimeline();
+    assertSameLogicalSchema(secondSchema, fm.getSchemaByKey("1"));
+
+    // Persist a schema for "0002" but never simulate the commit. The next persist
+    // should treat 0002.schemacommit as a residual file from a failed write and
+    // delete it as part of cleanResidualFiles.
+    HoodieSchema thirdSchema = simpleSchema(2L);
+    fm.persistHistorySchemaStr("0002", HoodieSchemaSerDe.inheritHistory(thirdSchema, fm.getHistorySchemaStr()));
+
+    HoodieSchema lastSchema = simpleSchema(3L);
+    fm.persistHistorySchemaStr("0004", HoodieSchemaSerDe.inheritHistory(lastSchema, fm.getHistorySchemaStr()));
+    simulateCommit("0004");
+    metaClient.reloadActiveTimeline();
+
+    File residual = new File(metaClient.getSchemaFolderName() + File.separator + "0002.schemacommit");
+    assertTrue(!residual.exists(), "residual schema file from failed persist should have been cleaned up");
+
+    assertSameLogicalSchema(currentSchema, fm.getSchemaByKey("0"));

Review Comment:
   🤖 nit: `assertTrue(!residual.exists(), ...)` reads a little awkwardly with 
the negation. `assertFalse(residual.exists(), "residual schema file from failed 
persist should have been cleaned up")` would be clearer.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -200,26 +207,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
       LOG.warn("Expected realtime split for mor table. Found split: {}", split);
       return;
     }
-    if (internalSchemaOption.isPresent()) {
+    if (evolutionSchemaOption.isPresent()) {
       HoodieSchema tableSchema = getSchemaFromCache();
       List<String> requiredColumns = getRequireColumn(job);
-      InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(),
-          requiredColumns);
+      HoodieSchema writerSchema = HoodieSchemaInternalSchemaBridge.withRecordName(
+          evolutionSchemaOption.get(), tableSchema.getName());
+      HoodieSchema prunedEvolutionSchema = HoodieSchemaInternalSchemaBridge.pruneByLeafNames(

Review Comment:
   🤖 Before this change, both `writerSchema` and `readerSchema` were explicitly 
named with `tableSchema.getName()` (via `InternalSchemaConverter.convert(..., 
tableSchema.getName())`). After the migration, the writer goes through 
`withRecordName(..., tableSchema.getName())` but `prunedEvolutionSchema` keeps 
the evolution schema's own `getFullName()` per `pruneByLeafNames`'s contract. 
Could this asymmetry cause problems for Avro reader-writer name matching when 
`evolutionSchemaOption.get().getFullName()` differs from 
`tableSchema.getName()` (e.g. simple-name vs namespace-qualified)? It may be 
worth either also renaming the pruned reader schema to `tableSchema.getName()`, 
or confirming that the downstream `record.toIndexedRecord(getReaderSchema(), ...)` / 
`HoodieAvroUtils.rewriteRecord(..., getReaderSchema().toAvroSchema())` paths 
don't depend on name equality.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

