hudi-agent commented on code in PR #18688:
URL: https://github.com/apache/hudi/pull/18688#discussion_r3185365076


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,23 +163,23 @@ private Option<Function<HoodieRecord, HoodieRecord>> 
composeSchemaEvolutionTrans
                                                                                
          HoodieBaseFile baseFile,
                                                                                
          HoodieWriteConfig writeConfig,
                                                                                
          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = 
HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = 
HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), 
writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);

Review Comment:
   🤖 Could `fileSchema` still be null after this branch when 
`RECONCILE_SCHEMA=false`? You added `fileSchema == null ||` to the check (which 
the old `InternalSchemaCache` API never required), but the recovery path is 
gated by `RECONCILE_SCHEMA`. If `getSchemaByVersionId` returns null and 
`RECONCILE_SCHEMA` is false, `writeInternalSchema = fileSchema` stays null and 
`writeInternalSchema.getAllColsFullName()` a few lines below will NPE. Worth 
either tightening the cache contract or hoisting the null fallback out of the 
`RECONCILE_SCHEMA` gate.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java:
##########
@@ -64,14 +62,9 @@ public HoodieAvroParquetReader(InputSplit inputSplit, 
Configuration conf, Option
       MessageType messageType = fileFooter.getFileMetaData().getSchema();
       baseSchema = getAvroSchemaConverter(conf).convert(messageType);
 
-      if (internalSchemaOption.isPresent()) {
+      if (evolutionSchemaOption.isPresent()) {
         // do schema reconciliation in case there exists read column which is 
not in the file schema.
-        InternalSchema mergedInternalSchema = new InternalSchemaMerger(
-            InternalSchemaConverter.convert(baseSchema),
-            internalSchemaOption.get(),
-            true,
-            true).mergeSchema();
-        baseSchema = InternalSchemaConverter.convert(mergedInternalSchema, 
baseSchema.getFullName());
+        baseSchema = new HoodieSchemaMerger(baseSchema, 
evolutionSchemaOption.get(), true, true).mergeSchema();

Review Comment:
   🤖 The previous code finished the merge with 
`InternalSchemaConverter.convert(mergedInternalSchema, 
baseSchema.getFullName())`, which kept the file's record name on the resulting 
schema. With the new `HoodieSchemaMerger.mergeSchema()` 
(HoodieSchemaMerger.java:95) the merged schema is named after 
`querySchema.getFullName()` (i.e. the evolution schema, typically 
`"hoodieSchema"` after a SerDe round-trip) instead of the file schema. Since 
`baseSchema` is then handed to `AvroReadSupport.setAvroReadSchema` and used at 
line 99 for `avroToArrayWritable`, this name change can flow into Avro's 
record-name matching during deserialization. Could you confirm the downstream 
code is name-agnostic, or restore the file schema's name explicitly here (e.g. 
`HoodieSchemaProjections.withRecordName(merged, baseSchema.getFullName())`)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -370,16 +368,14 @@ class HoodieSparkSqlWriterInternal {
 
       val sourceSchema = 
HoodieSchemaConversionUtils.convertUserStructTypeToHoodieSchema(
         df.schema, avroRecordName, avroRecordNamespace)
-      val internalSchemaOpt = 
HoodieSchemaUtils.getLatestTableInternalSchema(hoodieConfig, 
tableMetaClient).orElse {
+      val evolutionSchemaOpt = 
HoodieSchemaUtils.getLatestTableEvolutionSchema(hoodieConfig, 
tableMetaClient).orElse {
         // In case we need to reconcile the schema and schema evolution is 
enabled,
         // we will force-apply schema evolution to the writer's schema
         if (shouldReconcileSchema && 
hoodieConfig.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED))
 {
           val allowOperationMetaDataField = 
parameters.getOrElse(HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD.key(), 
"false").toBoolean
-          Some(InternalSchemaConverter.convert(
-            HoodieCommonSchemaUtils.addMetadataFields(
-              latestTableSchemaOpt.getOrElse(sourceSchema),
-              allowOperationMetaDataField
-            )
+          Some(HoodieCommonSchemaUtils.addMetadataFields(

Review Comment:
   🤖 Previously this branch wrapped the result with 
`InternalSchemaConverter.convert(addMetadataFields(...))`, which minted fresh 
field ids for every field. The new code returns the raw output of 
`addMetadataFields(...)`, and per `HoodieSchemaUtils.addMetadataFields` (lines 
186-207) the new metadata fields are constructed via 
`HoodieSchemaField.of(...)` without an id. When `latestTableSchemaOpt` / 
`sourceSchema` also has no ids (the common case for a brand-new table where 
this orElse branch fires), the resulting evolution schema serialised into 
`INTERNAL_SCHEMA_STRING` has `field_id == -1` for everything, which would break 
id-keyed schema-on-read (rename/reorder tracking) on subsequent commits. Is 
this loss of fresh id assignment intentional, or should this still be 
id-stamped? Same shape recurs on line 671 in `addSchemaEvolutionParameters`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -72,116 +65,112 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     // convert to delete first then add again
     val deleteChanges = changes.filter(p => 
p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn])
     val addChanges = changes.filter(p => 
p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn])
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, 
applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges)
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column replace finished")
   }
 
-  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = {
-    val addChange = TableChanges.ColumnAddChange.get(oldSchema)
+  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = {
+    var cur = oldSchema
     addChanges.foreach { addColumn =>
       val names = addColumn.fieldNames()
       val parentName = AlterTableCommand.getParentName(names)
-      // add col change
-      val colType = 
SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), 
true, new AtomicInteger(0))
-      addChange.addColumns(parentName, names.last, colType, 
addColumn.comment())
-      // add position change
-      addColumn.position() match {
+      val fullName = names.mkString(".")
+      val colType = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(),
 names.last)
+      val (positionType, positionRef) = addColumn.position() match {
         case after: TableChange.After =>
-          addChange.addPositionChange(names.mkString("."),
-            if (parentName.isEmpty) after.column() else parentName + "." + 
after.column(), "after")
+          (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() 
else parentName + "." + after.column())
         case _: TableChange.First =>
-          addChange.addPositionChange(names.mkString("."), "", "first")
+          (ColumnPositionType.FIRST, "")
         case _ =>
+          (ColumnPositionType.NO_OPERATION, "")
       }
+      cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, 
colType, addColumn.comment(), positionRef, positionType)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
+    cur

Review Comment:
   🤖 I think nested field ids may go missing for `ALTER TABLE ADD COLUMNS` of a 
struct/array/map type. `convertStructTypeToHoodieSchema` doesn't stamp 
`field-id` props, and `HoodieSchemaChangeApplier.applyAddChange` only assigns 
an id to the top-level field — the documented contract (see 
`HoodieSchemaEvolutionUtils` line 528) is that callers must follow the add loop 
with `HoodieSchemaIdAssigner.assignFresh(cur)`. The legacy path got nested ids 
via `InternalSchemaBuilder.refreshNewId` inside 
`ColumnAddChange.addColumnsInternal`. Could you add an `assignFresh(cur)` after 
the loop, or confirm I'm missing where nested ids get stamped? Otherwise a 
follow-up `ALTER` on `nested.firstname` will fail `findIdByName`. @yihua
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaMerger.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Read-path schema merger: combines a file's stored schema with a query schema
+ * (and tracks renamed fields) to produce the schema actually used to decode 
rows
+ * from a base file or log block.
+ *
+ * <p>This is the read-path linchpin: {@code HoodieFileGroupReader},
+ * {@code AbstractHoodieLogRecordScanner}, and the parquet readers all converge
+ * on this interface. The algorithm walks the query schema and, for each field 
id,
+ * either keeps the field as-is, projects file-side names/types onto it 
(renames
+ * and type changes), suffix-disambiguates a query-only field that collides 
with
+ * a name still present on the file side, or marks it nullable as an
+ * "added" column.
+ *
+ * <p>Implementation walks {@link HoodieSchema} directly via
+ * {@link HoodieSchema#findField(int)} / {@link 
HoodieSchema#findField(String)} /
+ * {@link HoodieSchema#findFullName(int)} / {@link 
HoodieSchema#findIdByName(String)}.
+ */
+public class HoodieSchemaMerger {
+
+  private final HoodieSchema fileSchema;
+  private final HoodieSchema querySchema;
+  // When the Spark update/merge API flips a column's nullability from optional
+  // to required, the merger should still treat it as optional. Disabled for
+  // strictly-typed callers that want the original required-ness preserved.
+  private final boolean ignoreRequiredAttribute;
+  // For columns whose type changed, prefer the file's type during decode (so
+  // the parquet/avro reader sees the on-disk type) and let downstream record
+  // rewriters apply the promotion. Disabled for log readers that promote at
+  // record-rewrite time directly via reWriteRecordWithNewSchema.
+  private final boolean useColumnTypeFromFileSchema;
+  // For renamed columns, prefer the file's name during decode so the parquet
+  // reader can locate the column. Disabled for log readers (which can read by
+  // the new name and then rewrite).
+  private final boolean useColNameFromFileSchema;
+
+  private final Map<String, String> renamedFields = new HashMap<>();
+
+  public HoodieSchemaMerger(HoodieSchema fileSchema,
+                            HoodieSchema querySchema,
+                            boolean ignoreRequiredAttribute,
+                            boolean useColumnTypeFromFileSchema,
+                            boolean useColNameFromFileSchema) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.ignoreRequiredAttribute = ignoreRequiredAttribute;
+    this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
+    this.useColNameFromFileSchema = useColNameFromFileSchema;
+  }
+
+  public HoodieSchemaMerger(HoodieSchema fileSchema,
+                            HoodieSchema querySchema,
+                            boolean ignoreRequiredAttribute,
+                            boolean useColumnTypeFromFileSchema) {
+    this(fileSchema, querySchema, ignoreRequiredAttribute, 
useColumnTypeFromFileSchema, true);
+  }
+
+  /**
+   * Produces the merged read schema. Field ids carry through from the query 
schema;
+   * column names and types follow the {@code useCol*FromFileSchema} flags set 
at
+   * construction time.
+   */
+  public HoodieSchema mergeSchema() {
+    HoodieSchema unwrapped = querySchema.isNullable() ? 
querySchema.getNonNullType() : querySchema;
+    HoodieSchema merged = mergeRecord(unwrapped, querySchema.getFullName());
+    if (querySchema.schemaId() >= 0) {
+      merged.setSchemaId(querySchema.schemaId());
+    }
+    if (querySchema.maxColumnId() >= 0) {
+      merged.setMaxColumnId(querySchema.maxColumnId());
+    }
+    merged.invalidateIdIndex();
+    return merged;
+  }
+
+  /**
+   * Same as {@link #mergeSchema()} but additionally returns the rename map
+   * (query-side full name → file-side leaf name) so downstream record 
rewriters
+   * can project correctly across renames.
+   */
+  public Pair<HoodieSchema, Map<String, String>> mergeSchemaGetRenamed() {
+    HoodieSchema merged = mergeSchema();
+    return Pair.of(merged, renamedFields);
+  }
+
+  public HoodieSchema getFileSchema() {
+    return fileSchema;
+  }
+
+  public HoodieSchema getQuerySchema() {
+    return querySchema;
+  }
+
+  // -------------------------------------------------------------------------
+  // Tree walk over the query schema. Each leaf id is checked against the file
+  // schema; the result is rebuilt structurally.
+  // -------------------------------------------------------------------------
+
+  /**
+   * Recursively merges the type at a given query-side id. The id is the field
+   * id at the parent record (or array element / map value id when descending
+   * into nested types). For primitives, the id picks a candidate type from
+   * the file schema; for compound types, recursion drives the structure.
+   */
+  private HoodieSchema mergeType(HoodieSchema type, int currentTypeId) {
+    HoodieSchema effective = type.isNullable() ? type.getNonNullType() : type;
+    switch (effective.getType()) {
+      case RECORD:
+        return mergeRecord(effective, effective.getFullName());
+      case ARRAY:
+        return mergeArray(effective);
+      case MAP:
+        return mergeMap(effective);
+      default:
+        return mergePrimitive(effective, currentTypeId);
+    }
+  }
+
+  private HoodieSchema mergeRecord(HoodieSchema record, String recordName) {
+    List<HoodieSchemaField> oldFields = record.getFields();
+    List<HoodieSchema> newTypes = new ArrayList<>(oldFields.size());
+    for (HoodieSchemaField queryField : oldFields) {
+      newTypes.add(mergeType(queryField.schema(), queryField.fieldId()));
+    }
+    List<HoodieSchemaField> newFields = buildRecordFields(oldFields, newTypes);
+    return HoodieSchema.createRecord(
+        recordName == null || recordName.isEmpty() ? "hoodieSchema" : 
recordName,
+        null, null, false, newFields);
+  }
+
+  private HoodieSchema mergeArray(HoodieSchema array) {
+    HoodieSchema elementType = array.getElementType();
+    int elementId = 
readIntProp(array.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP));
+    boolean elementOptional = elementType.isNullable();
+    HoodieSchema mergedElement = mergeType(elementType, elementId);
+    HoodieSchema effectiveMergedElement = elementOptional ? 
HoodieSchema.createNullable(mergedElement) : mergedElement;
+    HoodieSchema result = HoodieSchema.createArray(effectiveMergedElement);
+    if (elementId >= 0) {
+      result.getAvroSchema().addProp(HoodieSchema.ELEMENT_ID_PROP, elementId);
+    }
+    return result;
+  }
+
+  private HoodieSchema mergeMap(HoodieSchema map) {
+    HoodieSchema valueType = map.getValueType();
+    int keyId = 
readIntProp(map.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP));
+    int valueId = 
readIntProp(map.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP));
+    boolean valueOptional = valueType.isNullable();
+    HoodieSchema mergedValue = mergeType(valueType, valueId);
+    HoodieSchema effectiveMergedValue = valueOptional ? 
HoodieSchema.createNullable(mergedValue) : mergedValue;
+    HoodieSchema result = HoodieSchema.createMap(effectiveMergedValue);
+    if (keyId >= 0) {
+      result.getAvroSchema().addProp(HoodieSchema.KEY_ID_PROP, keyId);
+    }
+    if (valueId >= 0) {
+      result.getAvroSchema().addProp(HoodieSchema.VALUE_ID_PROP, valueId);
+    }
+    return result;
+  }
+
+  /**
+   * Per-id record-field rebuild. Mirrors the legacy {@code buildRecordType}
+   * algorithm: same id → check name match (rename detection), id missing on
+   * file side → check name collision (suffix disambiguator), id+name both
+   * absent → emit as a new (added) column.
+   */
+  private List<HoodieSchemaField> buildRecordFields(List<HoodieSchemaField> 
oldFields, List<HoodieSchema> newTypes) {
+    List<HoodieSchemaField> newFields = new ArrayList<>(newTypes.size());
+    for (int i = 0; i < newTypes.size(); i++) {
+      HoodieSchemaField oldField = oldFields.get(i);
+      HoodieSchema newType = newTypes.get(i);
+      int fieldId = oldField.fieldId();
+      String fullName = querySchema.findFullName(fieldId);
+      String fileFullName = fileSchema.findFullName(fieldId);
+      boolean fileHasId = !fileFullName.isEmpty();
+      if (fileHasId) {
+        if (fileFullName.equals(fullName)) {
+          // Same name on both sides — only the type may have changed; keep id 
and name.
+          newFields.add(rebuildField(oldField, oldField.name(), newType, 
oldField.schema().isNullable()));
+        } else {
+          newFields.add(dealWithRename(fieldId, newType, oldField, 
fileFullName));
+        }
+      } else {
+        // Field id not present in file. It's either truly new (added column) 
or
+        // a name collision with an old, dropped-then-readded column whose name
+        // already exists in the file with a different id. Disambiguate the
+        // latter by appending "suffix" so the parquet reader doesn't pick up
+        // the old column's bytes for the new column.
+        String normalized = normalizeFullName(fullName);
+        if (fileSchema.findIdByName(normalized) >= 0) {
+          // Use the original sub-schema (oldField.schema()) so the suffixed
+          // synthetic column carries the old shape, not the merged one.
+          newFields.add(rebuildField(oldField, oldField.name() + "suffix", 
oldField.schema().getNonNullType(), oldField.schema().isNullable()));

Review Comment:
   🤖 nit: the literal `"suffix"` appended to field names is a magic string — 
it's referenced in the nearby comment but not extractable as a symbol. Could 
you pull it out as a named constant (e.g. `DISAMBIGUATION_SUFFIX`) so the 
intent is self-documenting and a grep can find all the places it matters?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/test/java/org/apache/hudi/common/schema/evolution/TestHoodieSchemaSerDe.java:
##########
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaIdAssigner;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.TreeMap;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link HoodieSchemaSerDe}: round-trip stability of structure and
+ * field ids across {@code toJson} / {@code fromJson} for primitives, arrays,
+ * maps, and nested records.
+ *
+ * <p>The on-disk format is a backward-compatibility boundary (commit 
metadata's
+ * {@code latest_schema} blob and {@code .hoodie/.schema/} history files), so
+ * the byte-format-compat net needs re-establishing as golden-file tests
+ * against captured pre-migration output. See
+ * {@code project_phase5_serde_rewrite.md} in auto-memory for status.
+ */
+public class TestHoodieSchemaSerDe {

Review Comment:
   🤖 nit: the `project_phase5_serde_rewrite.md` in "auto-memory" reference is 
an internal artifact that external contributors won't be able to find. Could 
you replace this with a JIRA ticket number or just drop the sentence?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaMerger.java:
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Read-path schema merger: combines a file's stored schema with a query schema
+ * (and tracks renamed fields) to produce the schema actually used to decode 
rows
+ * from a base file or log block.
+ *
+ * <p>This is the read-path linchpin: {@code HoodieFileGroupReader},
+ * {@code AbstractHoodieLogRecordScanner}, and the parquet readers all converge
+ * on this interface. The algorithm walks the query schema and, for each field 
id,
+ * either keeps the field as-is, projects file-side names/types onto it 
(renames
+ * and type changes), suffix-disambiguates a query-only field that collides 
with
+ * a name still present on the file side, or marks it nullable as an
+ * "added" column.
+ *
+ * <p>Implementation walks {@link HoodieSchema} directly via
+ * {@link HoodieSchema#findField(int)} / {@link 
HoodieSchema#findField(String)} /
+ * {@link HoodieSchema#findFullName(int)} / {@link 
HoodieSchema#findIdByName(String)}.
+ */
+public class HoodieSchemaMerger {
+
+  private final HoodieSchema fileSchema;
+  private final HoodieSchema querySchema;
+  // When the Spark update/merge API flips a column's nullability from optional
+  // to required, the merger should still treat it as optional. Disabled for
+  // strictly-typed callers that want the original required-ness preserved.
+  private final boolean ignoreRequiredAttribute;
+  // For columns whose type changed, prefer the file's type during decode (so
+  // the parquet/avro reader sees the on-disk type) and let downstream record
+  // rewriters apply the promotion. Disabled for log readers that promote at
+  // record-rewrite time directly via reWriteRecordWithNewSchema.
+  private final boolean useColumnTypeFromFileSchema;
+  // For renamed columns, prefer the file's name during decode so the parquet
+  // reader can locate the column. Disabled for log readers (which can read by
+  // the new name and then rewrite).
+  private final boolean useColNameFromFileSchema;
+
+  private final Map<String, String> renamedFields = new HashMap<>();
+
+  public HoodieSchemaMerger(HoodieSchema fileSchema,
+                            HoodieSchema querySchema,
+                            boolean ignoreRequiredAttribute,
+                            boolean useColumnTypeFromFileSchema,
+                            boolean useColNameFromFileSchema) {
+    this.fileSchema = fileSchema;
+    this.querySchema = querySchema;
+    this.ignoreRequiredAttribute = ignoreRequiredAttribute;
+    this.useColumnTypeFromFileSchema = useColumnTypeFromFileSchema;
+    this.useColNameFromFileSchema = useColNameFromFileSchema;
+  }
+
+  public HoodieSchemaMerger(HoodieSchema fileSchema,
+                            HoodieSchema querySchema,
+                            boolean ignoreRequiredAttribute,
+                            boolean useColumnTypeFromFileSchema) {
+    this(fileSchema, querySchema, ignoreRequiredAttribute, 
useColumnTypeFromFileSchema, true);
+  }
+
+  /**
+   * Produces the merged read schema. Field ids carry through from the query 
schema;
+   * column names and types follow the {@code useCol*FromFileSchema} flags set 
at
+   * construction time.
+   */
+  public HoodieSchema mergeSchema() {
+    HoodieSchema unwrapped = querySchema.isNullable() ? 
querySchema.getNonNullType() : querySchema;
+    HoodieSchema merged = mergeRecord(unwrapped, querySchema.getFullName());
+    if (querySchema.schemaId() >= 0) {
+      merged.setSchemaId(querySchema.schemaId());
+    }
+    if (querySchema.maxColumnId() >= 0) {
+      merged.setMaxColumnId(querySchema.maxColumnId());
+    }
+    merged.invalidateIdIndex();
+    return merged;
+  }
+
+  /**
+   * Same as {@link #mergeSchema()} but additionally returns the rename map
+   * (query-side full name → file-side leaf name) so downstream record 
rewriters
+   * can project correctly across renames.
+   */
+  public Pair<HoodieSchema, Map<String, String>> mergeSchemaGetRenamed() {
+    HoodieSchema merged = mergeSchema();
+    return Pair.of(merged, renamedFields);
+  }
+
+  public HoodieSchema getFileSchema() {
+    return fileSchema;
+  }
+
+  public HoodieSchema getQuerySchema() {
+    return querySchema;
+  }
+
+  // -------------------------------------------------------------------------
+  // Tree walk over the query schema. Each leaf id is checked against the file
+  // schema; the result is rebuilt structurally.
+  // -------------------------------------------------------------------------
+
+  /**
+   * Recursively merges the type at a given query-side id. The id is the field
+   * id at the parent record (or array element / map value id when descending
+   * into nested types). For primitives, the id picks a candidate type from
+   * the file schema; for compound types, recursion drives the structure.
+   */
+  private HoodieSchema mergeType(HoodieSchema type, int currentTypeId) {
+    HoodieSchema effective = type.isNullable() ? type.getNonNullType() : type;
+    switch (effective.getType()) {
+      case RECORD:
+        return mergeRecord(effective, effective.getFullName());
+      case ARRAY:
+        return mergeArray(effective);
+      case MAP:
+        return mergeMap(effective);
+      default:
+        return mergePrimitive(effective, currentTypeId);
+    }
+  }
+
+  private HoodieSchema mergeRecord(HoodieSchema record, String recordName) {
+    List<HoodieSchemaField> oldFields = record.getFields();
+    List<HoodieSchema> newTypes = new ArrayList<>(oldFields.size());
+    for (HoodieSchemaField queryField : oldFields) {
+      newTypes.add(mergeType(queryField.schema(), queryField.fieldId()));
+    }
+    List<HoodieSchemaField> newFields = buildRecordFields(oldFields, newTypes);
+    return HoodieSchema.createRecord(
+        recordName == null || recordName.isEmpty() ? "hoodieSchema" : 
recordName,
+        null, null, false, newFields);
+  }
+
+  private HoodieSchema mergeArray(HoodieSchema array) {
+    HoodieSchema elementType = array.getElementType();
+    int elementId = 
readIntProp(array.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP));
+    boolean elementOptional = elementType.isNullable();
+    HoodieSchema mergedElement = mergeType(elementType, elementId);
+    HoodieSchema effectiveMergedElement = elementOptional ? 
HoodieSchema.createNullable(mergedElement) : mergedElement;
+    HoodieSchema result = HoodieSchema.createArray(effectiveMergedElement);
+    if (elementId >= 0) {
+      result.getAvroSchema().addProp(HoodieSchema.ELEMENT_ID_PROP, elementId);
+    }
+    return result;
+  }
+
+  private HoodieSchema mergeMap(HoodieSchema map) {
+    HoodieSchema valueType = map.getValueType();
+    int keyId = 
readIntProp(map.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP));
+    int valueId = 
readIntProp(map.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP));
+    boolean valueOptional = valueType.isNullable();
+    HoodieSchema mergedValue = mergeType(valueType, valueId);
+    HoodieSchema effectiveMergedValue = valueOptional ? 
HoodieSchema.createNullable(mergedValue) : mergedValue;
+    HoodieSchema result = HoodieSchema.createMap(effectiveMergedValue);
+    if (keyId >= 0) {
+      result.getAvroSchema().addProp(HoodieSchema.KEY_ID_PROP, keyId);
+    }
+    if (valueId >= 0) {
+      result.getAvroSchema().addProp(HoodieSchema.VALUE_ID_PROP, valueId);
+    }
+    return result;
+  }
+
+  /**
+   * Per-id record-field rebuild. Mirrors the legacy {@code buildRecordType}
+   * algorithm: same id → check name match (rename detection), id missing on
+   * file side → check name collision (suffix disambiguator), id+name both
+   * absent → emit as a new (added) column.
+   */
+  private List<HoodieSchemaField> buildRecordFields(List<HoodieSchemaField> 
oldFields, List<HoodieSchema> newTypes) {
+    List<HoodieSchemaField> newFields = new ArrayList<>(newTypes.size());
+    for (int i = 0; i < newTypes.size(); i++) {
+      HoodieSchemaField oldField = oldFields.get(i);
+      HoodieSchema newType = newTypes.get(i);
+      int fieldId = oldField.fieldId();
+      String fullName = querySchema.findFullName(fieldId);
+      String fileFullName = fileSchema.findFullName(fieldId);
+      boolean fileHasId = !fileFullName.isEmpty();
+      if (fileHasId) {
+        if (fileFullName.equals(fullName)) {
+          // Same name on both sides — only the type may have changed; keep id 
and name.
+          newFields.add(rebuildField(oldField, oldField.name(), newType, 
oldField.schema().isNullable()));
+        } else {
+          newFields.add(dealWithRename(fieldId, newType, oldField, 
fileFullName));
+        }
+      } else {
+        // Field id not present in file. It's either truly new (added column) 
or
+        // a name collision with an old, dropped-then-readded column whose name
+        // already exists in the file with a different id. Disambiguate the
+        // latter by appending "suffix" so the parquet reader doesn't pick up
+        // the old column's bytes for the new column.
+        String normalized = normalizeFullName(fullName);
+        if (fileSchema.findIdByName(normalized) >= 0) {
+          // Use the original sub-schema (oldField.schema()) so the suffixed
+          // synthetic column carries the old shape, not the merged one.
+          newFields.add(rebuildField(oldField, oldField.name() + "suffix", 
oldField.schema().getNonNullType(), oldField.schema().isNullable()));
+        } else {
+          // New column. Honor the optional override that papers over the
+          // Spark update/merge nullability flip when ignoreRequiredAttribute 
is set.
+          boolean optional = ignoreRequiredAttribute || 
oldField.schema().isNullable();
+          newFields.add(rebuildField(oldField, oldField.name(), newType, 
optional));
+        }
+      }
+    }
+    return newFields;
+  }
+
+  private HoodieSchemaField dealWithRename(int fieldId, HoodieSchema newType, 
HoodieSchemaField oldField, String fileFullName) {
+    String nameFromFileSchema = leafOf(fileFullName);
+    String nameFromQuerySchema = oldField.name();
+    String finalFieldName = useColNameFromFileSchema ? nameFromFileSchema : 
nameFromQuerySchema;
+    // findType(int) returns the type at the id (may itself be a [null,T] union
+    // for nullable record fields). Unwrap to mirror the legacy semantics that
+    // pulled Types.Field.type() directly.
+    HoodieSchema typeFromFileSchema = fileSchema.findType(fieldId);
+    HoodieSchema typeFromFileNonNull = typeFromFileSchema == null
+        ? null
+        : (typeFromFileSchema.isNullable() ? 
typeFromFileSchema.getNonNullType() : typeFromFileSchema);
+    if (!useColNameFromFileSchema) {
+      // Use the full name as the rename key to disambiguate composite-rename
+      // scenarios. Example: original schema ROW<f_row: ROW<f1 STRING, f2 
BIGINT>, f3 STRING>;
+      // rename f_row.f1 -> f_row.f_str AND f3 -> f_str. Keying renamedFields 
by
+      // leaf name alone would have the second rename overwrite the first.
+      renamedFields.put(querySchema.findFullName(fieldId), nameFromFileSchema);
+    }
+    boolean optional = oldField.schema().isNullable();
+    // Nested-type changes aren't allowed in current schema-evolution rules, so
+    // if the new type is nested we know it's structurally identical to the 
file
+    // and we just project the merged sub-schema onto the renamed field name.
+    if (isNested(newType)) {
+      return rebuildField(oldField, finalFieldName, newType, optional);
+    }
+    HoodieSchema chosen = (useColumnTypeFromFileSchema && typeFromFileNonNull 
!= null) ? typeFromFileNonNull : newType;
+    return rebuildField(oldField, finalFieldName, chosen, optional);
+  }
+
+  /** Returns the last dot-separated component of a full name. */
+  private static String leafOf(String fullName) {
+    int dot = fullName.lastIndexOf('.');
+    return dot < 0 ? fullName : fullName.substring(dot + 1);
+  }
+
+  /**
+   * Walks each prefix of {@code fullName} and substitutes any parent name that
+   * was renamed on the file side, so a query-side full name like
+   * {@code aa.d} (where {@code aa} used to be {@code a}) resolves to
+   * {@code a.d} for the file-side lookup. Without this, dropped-and-readded
+   * leaves under a renamed parent wouldn't trigger the suffix disambiguator.
+   */
+  private String normalizeFullName(String fullName) {
+    String[] nameParts = fullName.split("\\.");
+    String[] normalizedNameParts = new String[nameParts.length];
+    System.arraycopy(nameParts, 0, normalizedNameParts, 0, nameParts.length);
+    for (int j = 0; j < nameParts.length - 1; j++) {
+      StringBuilder sb = new StringBuilder();
+      for (int k = 0; k <= j; k++) {
+        sb.append(nameParts[k]);
+      }
+      String parentName = sb.toString();
+      int parentFieldIdFromQuerySchema = querySchema.findIdByName(parentName);
+      String parentNameFromFileSchema = 
fileSchema.findFullName(parentFieldIdFromQuerySchema);
+      if (parentNameFromFileSchema.isEmpty()) {
+        break;
+      }
+      if (!parentNameFromFileSchema.equalsIgnoreCase(parentName)) {
+        String[] parentNameParts = parentNameFromFileSchema.split("\\.");
+        System.arraycopy(parentNameParts, 0, normalizedNameParts, 0, 
parentNameParts.length);
+      }
+    }
+    return StringUtils.join(normalizedNameParts, ".");
+  }
+
+  /**
+   * Picks the per-cell primitive type. When the file schema has a value at
+   * this id, prefer it (so the parquet reader sees the on-disk type) unless
+   * the caller explicitly disabled that via {@code 
useColumnTypeFromFileSchema}.
+   */
+  private HoodieSchema mergePrimitive(HoodieSchema typeFromQuerySchema, int 
currentPrimitiveTypeId) {
+    HoodieSchema typeFromFileSchema = 
fileSchema.findType(currentPrimitiveTypeId);
+    if (typeFromFileSchema == null) {
+      return typeFromQuerySchema;
+    }
+    HoodieSchema fileEffective = typeFromFileSchema.isNullable() ? 
typeFromFileSchema.getNonNullType() : typeFromFileSchema;
+    return useColumnTypeFromFileSchema ? fileEffective : typeFromQuerySchema;
+  }
+
+  /**
+   * Builds a new HoodieSchemaField at the same id as {@code oldField} but with
+   * the supplied name and (possibly nullability-wrapped) sub-schema. The
+   * field-id Avro property is preserved.
+   */
+  private static HoodieSchemaField rebuildField(HoodieSchemaField oldField,
+                                                String name,
+                                                HoodieSchema schema,
+                                                boolean optional) {
+    HoodieSchema effective = schema.isNullable() ? schema.getNonNullType() : 
schema;
+    HoodieSchema finalSchema = optional ? 
HoodieSchema.createNullable(effective) : effective;
+    HoodieSchemaField rebuilt = HoodieSchemaField.of(name, finalSchema, 
oldField.doc().orElse(null), oldField.defaultVal().orElse(null));
+    if (oldField.fieldId() >= 0) {
+      rebuilt.addProp(HoodieSchema.FIELD_ID_PROP, oldField.fieldId());

Review Comment:
   🤖 I noticed this `rebuildField` is inconsistent with 
`HoodieSchemaChangeApplier.rebuildField`, which inserts 
`HoodieSchema.NULL_VALUE` when the source has no default and the rebuilt schema 
is nullable. Here we pass `oldField.defaultVal().orElse(null)` directly. For a 
nullable "added" column whose oldField (from the parsed evolution schema) has 
no default, the result will be a nullable Avro field with no `default: null`, 
which Avro reader resolution can't fill in. Could you confirm whether the 
merger output is expected to have explicit nullable defaults?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java:
##########
@@ -139,7 +139,9 @@ public static HoodieFileGroupReader<RowData> 
createFileGroupReader(
         .withFileSlice(fileSlice)
         .withDataSchema(tableSchema)
         .withRequestedSchema(requiredSchema)
-        
.withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema()))
+        .withEvolutionSchema(schemaEvolutionManager.getQuerySchema() == null

Review Comment:
   🤖 nit: the old code already used 
`Option.ofNullable(internalSchemaManager.getQuerySchema())` — could you restore 
that pattern here? The ternary calls `getQuerySchema()` twice and is three 
lines where one suffices: 
`.withEvolutionSchema(Option.ofNullable(schemaEvolutionManager.getQuerySchema()))`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Pure-{@link HoodieSchema} JSON serializer/deserializer for evolution 
schemas.
+ *
+ * <p>The on-disk format is a hard backward-compatibility boundary: the
+ * {@code latest_schema} blob in commit metadata and the {@code 
.hoodie/.schema/}
+ * history files have a fixed JSON layout that predates this class, and old
+ * tables must remain readable. This class emits that JSON layout byte-for-byte
+ * and parses every shape the original could parse, including:
+ *
+ * <ul>
+ *   <li>Top-level record with {@code max_column_id} / {@code version_id} /
+ *       {@code type:"record"} / {@code fields:[...]}.</li>
+ *   <li>Inline records (only {@code type:"record"} + {@code fields:[...]} —
+ *       no version metadata).</li>
+ *   <li>Arrays with {@code element_id}, {@code element}, {@code 
element_optional}.</li>
+ *   <li>Maps with {@code key_id}, {@code key}, {@code value_id}, {@code 
value},
+ *       {@code value_optional}.</li>
+ *   <li>Primitives serialized as a string token: {@code "int"}, {@code 
"long"},
+ *       {@code "boolean"}, {@code "string"}, {@code "binary"}, {@code "date"},
+ *       {@code "uuid"}, {@code "time"} (micros), {@code "time-millis"},
+ *       {@code "timestamp"} (micros), {@code "timestamp-millis"},
+ *       {@code "local-timestamp-millis"}, {@code "local-timestamp-micros"},
+ *       {@code "fixed[N]"}, {@code "decimal_bytes(P,S)"}, {@code 
"decimal_fixed(P,S)[N]"},
+ *       and the legacy {@code "decimal(P, S)"} form (parsed only — emission
+ *       always picks one of the two explicit decimal forms).</li>
+ * </ul>
+ *
+ * <p>Field ids are emitted as the {@code id} key on each field object and
+ * stamped on parse onto the resulting HoodieSchemaField as the
+ * {@code field-id} Avro custom property; element / key / value ids likewise
+ * land as {@code element-id} / {@code key-id} / {@code value-id} on the
+ * corresponding nested HoodieSchema's underlying Avro schema.
+ */
+public final class HoodieSchemaSerDe {
+
+  // Wire format keys reused by callers reading commit metadata. Any change 
here
+  // breaks compatibility with on-disk tables; do not adjust.
+  public static final String LATEST_SCHEMA = "latest_schema";
+  public static final String SCHEMAS = "schemas";
+
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version_id";
+  private static final String TYPE = "type";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element_id";
+  private static final String KEY_ID = "key_id";
+  private static final String VALUE_ID = "value_id";
+  private static final String OPTIONAL = "optional";
+  private static final String ELEMENT_OPTIONAL = "element_optional";
+  private static final String VALUE_OPTIONAL = "value_optional";
+
+  // Match patterns are checked in the order: DECIMAL_FIXED first (otherwise 
its
+  // suffix fixed[N] would partially match FIXED), then FIXED, then 
DECIMAL_BYTES,
+  // and finally the legacy plain DECIMAL form. The DECIMAL pattern requires
+  // whitespace after the comma — that matches what 
Types.DecimalType.toString()
+  // historically emitted ("decimal(p, s)" with a space).
+  private static final Pattern FIXED_PATTERN = 
Pattern.compile("fixed\\[(\\d+)\\]");
+  private static final Pattern DECIMAL_PATTERN = 
Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");
+  private static final Pattern DECIMAL_BYTES_PATTERN = 
Pattern.compile("decimal_bytes\\((\\d+),\\s*(\\d+)\\)");
+  private static final Pattern DECIMAL_FIXED_PATTERN = 
Pattern.compile("decimal_fixed\\((\\d+),\\s*(\\d+)\\)\\[(\\d+)\\]");
+
+  private HoodieSchemaSerDe() {
+  }
+
+  // -------------------------------------------------------------------------
+  // Public API
+  // -------------------------------------------------------------------------
+
+  /**
+   * Serializes a single schema to the {@code latest_schema}-style JSON blob.
+   * Returns an empty string for null or empty-sentinel schemas — matching the
+   * historical semantics for the {@code latest_schema} commit metadata key.
+   */
+  public static String toJson(HoodieSchema schema) {
+    if (schema == null || schema.isEmptySchema()) {
+      return "";
+    }
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator gen = new JsonFactory().createGenerator(writer);
+      writeRecordTopLevel(schema, gen);
+      gen.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Serializes a list of versioned schemas to the {@code .hoodie/.schema/}
+   * history blob format: a single object whose {@code schemas} array contains
+   * each schema as a top-level record (carrying its own {@code version_id}
+   * and {@code max_column_id}).
+   */
+  public static String toJsonHistory(List<HoodieSchema> schemas) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator gen = new JsonFactory().createGenerator(writer);
+      gen.writeStartObject();
+      gen.writeArrayFieldStart(SCHEMAS);
+      for (HoodieSchema s : schemas) {
+        writeRecordTopLevel(s, gen);
+      }
+      gen.writeEndArray();
+      gen.writeEndObject();
+      gen.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parses a single-schema JSON blob (typically the {@code latest_schema}
+   * value from commit metadata). Returns empty for null or empty input —
+   * the history-blob entry point is {@link #parseHistorySchemas(String)}.
+   */
+  public static Option<HoodieSchema> fromJson(String json) {
+    return fromJson(json, "hoodieSchema");
+  }
+
+  /**
+   * Variant of {@link #fromJson(String)} that fixes the record name on the
+   * resulting top-level HoodieSchema. Useful for callers that need the
+   * record name to match an existing schema namespace.
+   */
+  public static Option<HoodieSchema> fromJson(String json, String recordName) {
+    if (json == null || json.isEmpty()) {
+      return Option.empty();
+    }
+    try {
+      JsonNode node = JsonUtils.getObjectMapper().readTree(json);
+      return Option.of(parseRecordTopLevel(node, recordName));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parses a history blob and returns its schemas keyed by {@code version_id}.
+   * The TreeMap ordering lets callers iterate from oldest to newest.
+   */
+  public static TreeMap<Long, HoodieSchema> parseHistorySchemas(String json) {
+    TreeMap<Long, HoodieSchema> result = new TreeMap<>();
+    try {
+      JsonNode node = JsonUtils.getObjectMapper().readTree(json);
+      if (!node.has(SCHEMAS)) {
+        throw new IllegalArgumentException(
+            String.format("cannot parser schemas from current json string, 
missing key name: %s", SCHEMAS));
+      }
+      Iterator<JsonNode> iter = node.get(SCHEMAS).elements();
+      while (iter.hasNext()) {
+        HoodieSchema s = parseRecordTopLevel(iter.next(), "hoodieSchema");
+        result.put(s.schemaId(), s);
+      }
+    } catch (IOException e) {
+      throw new HoodieException(e);
+    }
+    return result;
+  }
+
+  /**
+   * Appends a freshly-evolved schema to an existing serialized history blob.
+   * Performs string surgery on the {@code "{\"schemas\":["} prefix to avoid
+   * re-serializing the entire history. Returns an empty string when
+   * {@code newSchema} is null, and the new schema as a single-element history
+   * when {@code oldSchemas} is null/empty. Returns an empty string when
+   * {@code oldSchemas} doesn't carry the expected prefix — matching legacy
+   * permissive behavior.
+   */
+  public static String inheritHistory(HoodieSchema newSchema, String 
oldSchemas) {
+    if (newSchema == null) {
+      return "";
+    }
+    if (oldSchemas == null || oldSchemas.isEmpty()) {
+      return toJsonHistory(Collections.singletonList(newSchema));
+    }
+    String checkedString = "{\"schemas\":[";
+    if (!oldSchemas.startsWith(checkedString)) {
+      return "";
+    }
+    String oldSchemasSuffix = oldSchemas.substring(checkedString.length());
+    return checkedString + toJson(newSchema) + "," + oldSchemasSuffix;
+  }
+
+  /**
+   * Resolves the schema-history entry that applies to a given version id —
+   * exact match if present, otherwise the largest entry strictly less than
+   * {@code versionId}, otherwise {@code null}. Returns {@code null} (not an
+   * empty-schema sentinel) on miss so callers can choose their own 
empty-schema
+   * construction.
+   */
+  public static HoodieSchema searchSchema(long versionId, TreeMap<Long, 
HoodieSchema> history) {
+    if (history.containsKey(versionId)) {
+      return history.get(versionId);
+    }
+    SortedMap<Long, HoodieSchema> headMap = history.headMap(versionId);
+    return headMap.isEmpty() ? null : headMap.get(headMap.lastKey());
+  }
+
+  // -------------------------------------------------------------------------
+  // Serialization
+  // -------------------------------------------------------------------------
+
+  private static void writeRecordTopLevel(HoodieSchema schema, JsonGenerator 
gen) throws IOException {
+    HoodieSchema record = schema.isNullable() ? schema.getNonNullType() : 
schema;
+    gen.writeStartObject();
+    int maxColumnId = schema.maxColumnId();
+    if (maxColumnId >= 0) {
+      gen.writeNumberField(MAX_COLUMN_ID, maxColumnId);
+    }
+    long versionId = schema.schemaId();
+    if (versionId >= 0) {
+      gen.writeNumberField(VERSION_ID, versionId);
+    }
+    gen.writeStringField(TYPE, RECORD);
+    gen.writeArrayFieldStart(FIELDS);
+    for (HoodieSchemaField field : record.getFields()) {
+      writeField(field, gen);
+    }
+    gen.writeEndArray();
+    gen.writeEndObject();
+  }
+
+  private static void writeField(HoodieSchemaField field, JsonGenerator gen) 
throws IOException {
+    gen.writeStartObject();
+    gen.writeNumberField(ID, field.fieldId());
+    gen.writeStringField(NAME, field.name());
+    boolean optional = field.schema().isNullable();
+    gen.writeBooleanField(OPTIONAL, optional);
+    gen.writeFieldName(TYPE);
+    HoodieSchema fieldType = optional ? field.schema().getNonNullType() : 
field.schema();
+    writeType(fieldType, gen);
+    String doc = field.doc().orElse(null);
+    if (doc != null) {
+      gen.writeStringField(DOC, doc);
+    }
+    gen.writeEndObject();
+  }
+
+  private static void writeType(HoodieSchema schema, JsonGenerator gen) throws 
IOException {
+    HoodieSchemaType t = schema.getType();
+    switch (t) {
+      case RECORD:
+        gen.writeStartObject();
+        gen.writeStringField(TYPE, RECORD);
+        gen.writeArrayFieldStart(FIELDS);
+        for (HoodieSchemaField f : schema.getFields()) {
+          writeField(f, gen);
+        }
+        gen.writeEndArray();
+        gen.writeEndObject();
+        return;
+      case ARRAY: {
+        gen.writeStartObject();
+        gen.writeStringField(TYPE, ARRAY);
+        int elementId = 
readIntProp(schema.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP));
+        gen.writeNumberField(ELEMENT_ID, elementId);
+        gen.writeFieldName(ELEMENT);
+        HoodieSchema elementType = schema.getElementType();
+        boolean elemOptional = elementType.isNullable();
+        writeType(elemOptional ? elementType.getNonNullType() : elementType, 
gen);
+        gen.writeBooleanField(ELEMENT_OPTIONAL, elemOptional);
+        gen.writeEndObject();
+        return;
+      }
+      case MAP: {
+        gen.writeStartObject();
+        gen.writeStringField(TYPE, MAP);
+        int keyId = 
readIntProp(schema.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP));
+        gen.writeNumberField(KEY_ID, keyId);
+        gen.writeFieldName(KEY);
+        // Map keys in Avro/Hudi are always strings — emit "string" rather than
+        // attempting to walk a synthetic key schema. Matches legacy behavior.
+        gen.writeString("string");
+        int valueId = 
readIntProp(schema.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP));
+        gen.writeNumberField(VALUE_ID, valueId);
+        gen.writeFieldName(VALUE);
+        HoodieSchema valueType = schema.getValueType();
+        boolean valOptional = valueType.isNullable();
+        writeType(valOptional ? valueType.getNonNullType() : valueType, gen);
+        gen.writeBooleanField(VALUE_OPTIONAL, valOptional);
+        gen.writeEndObject();
+        return;
+      }
+      default:
+        gen.writeString(primitiveTypeString(schema));
+    }
+  }
+
+  private static int readIntProp(Object raw) {
+    return raw instanceof Number ? ((Number) raw).intValue() : -1;
+  }
+
+  /**
+   * Maps a primitive HoodieSchema to the legacy on-disk type-string token.
+   * Logical types take precedence (so date/uuid/time/timestamp variants emit
+   * with the Avro logical name) and decimal is split between the bytes-backed
+   * and fixed-backed forms based on the underlying Avro storage type. The
+   * {@code -micros} suffix is stripped for the bare {@code timestamp} and
+   * {@code time} tokens because the legacy
+   * {@code Types.TimestampType.toString()} / {@code Types.TimeType.toString()}
+   * both collapse to those, and any drift here would be visible on disk.
+   */
+  private static String primitiveTypeString(HoodieSchema schema) {
+    Schema avro = schema.toAvroSchema();
+    LogicalType logical = avro.getLogicalType();
+    if (logical != null) {
+      if (logical instanceof LogicalTypes.Decimal) {
+        LogicalTypes.Decimal dec = (LogicalTypes.Decimal) logical;
+        if (avro.getType() == Schema.Type.FIXED) {
+          return String.format("decimal_fixed(%d,%d)[%d]",
+              dec.getPrecision(), dec.getScale(), avro.getFixedSize());
+        }
+        return String.format("decimal_bytes(%d,%d)", dec.getPrecision(), 
dec.getScale());
+      }
+      String name = logical.getName();
+      if ("timestamp-micros".equals(name)) {
+        return "timestamp";
+      }
+      if ("time-micros".equals(name)) {
+        return "time";
+      }
+      return name;
+    }
+    switch (avro.getType()) {
+      case BOOLEAN:
+        return "boolean";
+      case INT:
+        return "int";
+      case LONG:
+        return "long";
+      case FLOAT:
+        return "float";
+      case DOUBLE:
+        return "double";
+      case STRING:
+        return "string";
+      case BYTES:
+        return "binary";
+      case FIXED:
+        return String.format("fixed[%d]", avro.getFixedSize());
+      default:
+        throw new HoodieIOException("cannot serialize primitive schema: " + 
avro);
+    }
+  }
+
+  // -------------------------------------------------------------------------
+  // Parsing
+  // -------------------------------------------------------------------------
+
+  private static HoodieSchema parseRecordTopLevel(JsonNode node, String 
recordName) {
+    long versionId = node.has(VERSION_ID) ? node.get(VERSION_ID).asLong() : -1;
+    int maxColumnId = node.has(MAX_COLUMN_ID) ? 
node.get(MAX_COLUMN_ID).asInt() : -1;
+    HoodieSchema record = parseRecord(node, recordName);
+    if (versionId >= 0) {
+      record.setSchemaId(versionId);
+    }
+    if (maxColumnId >= 0) {
+      record.setMaxColumnId(maxColumnId);
+    }
+    record.invalidateIdIndex();
+    return record;
+  }
+
+  private static HoodieSchema parseRecord(JsonNode node, String recordName) {
+    JsonNode fieldsNode = node.get(FIELDS);
+    List<HoodieSchemaField> fields = new ArrayList<>();
+    Iterator<JsonNode> iter = fieldsNode.elements();
+    while (iter.hasNext()) {
+      JsonNode fieldNode = iter.next();
+      int fieldId = fieldNode.get(ID).asInt();
+      String name = fieldNode.get(NAME).asText();
+      boolean optional = fieldNode.get(OPTIONAL).asBoolean();
+      String doc = fieldNode.has(DOC) ? fieldNode.get(DOC).asText() : null;
+      HoodieSchema typeSchema = parseType(fieldNode.get(TYPE), name);
+      HoodieSchema fieldSchema = optional ? 
HoodieSchema.createNullable(typeSchema) : typeSchema;
+      HoodieSchemaField field = HoodieSchemaField.of(name, fieldSchema, doc, 
null);
+      // The id property must live on the Avro Field — that's what
+      // HoodieSchemaField#fieldId() reads on the way out.

Review Comment:
   🤖 Could you double-check whether nullable parsed fields need 
`HoodieSchema.NULL_VALUE` here instead of plain `null`? The legacy 
`InternalSchemaConverter.convert` explicitly stamped `f.isOptional() ? 
NULL_VALUE : null`, which translates to Avro `default: null`. Passing Java 
`null` results in no default on the Avro field, so when this parsed schema is 
later used as a reader schema and a nullable field is missing from the writer's 
data, Avro resolution should fail with an `AvroTypeException`.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,23 +163,23 @@ private Option<Function<HoodieRecord, HoodieRecord>> 
composeSchemaEvolutionTrans
                                                                                
          HoodieBaseFile baseFile,
                                                                                
          HoodieWriteConfig writeConfig,
                                                                                
          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = 
HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = 
HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), 
writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
         try {
-          fileSchema = 
InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true));
+          fileSchema = tableSchemaResolver.getTableSchema(true);
         } catch (Exception e) {
-          throw new HoodieException(String.format("Failed to get 
InternalSchema for given versionId: %s", commitInstantTime), e);
+          throw new HoodieException(String.format("Failed to get evolution 
schema for given versionId: %s", commitInstantTime), e);
         }
       }
-      final InternalSchema writeInternalSchema = fileSchema;
+      final HoodieSchema writeInternalSchema = fileSchema;
       List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();

Review Comment:
   🤖 nit: this local is still named `writeInternalSchema` while everything else 
in the same method was renamed to `fileSchema` / `querySchema` / 
`evolutionSchemaOption` — could you rename it to something like 
`fileEvolutionSchema`? It would also let you collapse the `writeSchemaFromFile 
= writeInternalSchema` alias a few lines below that currently reads as a no-op.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java:
##########
@@ -222,12 +223,16 @@ void 
testGetTableInternalSchemaFromCommitMetadataFindsLatestSchemaUpdateInstant(
 
     HoodieCommitMetadata insertMetadata = new HoodieCommitMetadata();
     
insertMetadata.setOperationType(org.apache.hudi.common.model.WriteOperationType.INSERT);
-    // Create a valid InternalSchema
-    org.apache.hudi.internal.schema.InternalSchema internalSchema = new 
org.apache.hudi.internal.schema.InternalSchema(
-        org.apache.hudi.internal.schema.Types.RecordType.get(
-            org.apache.hudi.internal.schema.Types.Field.get(0, false, "id", 
org.apache.hudi.internal.schema.Types.IntType.get())));
-    
insertMetadata.addMetadata(org.apache.hudi.internal.schema.utils.SerDeHelper.LATEST_SCHEMA,
-        
org.apache.hudi.internal.schema.utils.SerDeHelper.toJson(internalSchema));
+    // Build a non-empty HoodieSchema fixture and serialize via 
HoodieSchemaSerDe.

Review Comment:
   🤖 nit: the same four-line comment block and fixture setup is copy-pasted 
verbatim at line ~313. Could you extract a small helper like 
`buildInsertMetadataWithEvolutionSchema()` to avoid the duplication?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -791,20 +796,21 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   }
 
   /**
-   * Projects provided schema by picking only required (projected) top-level 
columns from it
+   * Projects provided schema by picking only required (projected) top-level 
columns from it.
    *
-   * @param tableSchema schema to project (either of [[InternalSchema]] or 
Avro's [[Schema]])
+   * @param tableSchema     Right is the schema-on-read evolution schema (with 
field ids);
+   *                        Left is the structural HoodieSchema fallback.
    * @param requiredColumns required top-level columns to be projected

Review Comment:
   🤖 nit: now that both type parameters are `HoodieSchema`, 
`Either[HoodieSchema, HoodieSchema]` gives no compile-time hint about which 
side is the evolution schema vs the structural fallback — a reader of the call 
site `Either.cond(evolutionSchemaOpt.isDefined, evolutionSchemaOpt.get, 
sourceSchema)` has to consult the Javadoc to know which branch is which. Could 
you consider a small type alias (`type EvolutionSchema = HoodieSchema`) or a 
tiny wrapper so the distinction is visible without jumping to the docs?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/ColumnChangeID.java:
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import java.util.Locale;
+
+/**
+ * The kind of column-level schema change a DDL operation represents.
+ */
+public enum ColumnChangeID {
+  ADD, UPDATE, DELETE, PROPERTY_CHANGE, REPLACE;
+
+  public static ColumnChangeID fromValue(String value) {
+    switch (value.toLowerCase(Locale.ROOT)) {
+      case "add":
+        return ADD;
+      case "change":
+        return UPDATE;

Review Comment:
   🤖 nit: the wire value `"change"` mapping to the enum constant `UPDATE` is 
non-obvious — could you add a short inline comment like `// legacy wire value` 
so readers don't have to wonder whether this asymmetry is intentional?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -73,59 +70,48 @@ object HoodieSchemaUtils {
   }
 
   /**
-   * get latest internalSchema from table
-   *
-   * @param config          instance of {@link HoodieConfig}
-   * @param tableMetaClient instance of HoodieTableMetaClient
-   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
+   * Returns the schema-on-read evolution [[HoodieSchema]] for the table — 
carrying
+   * field ids and version metadata — or [[None]] when schema-on-read is 
disabled,
+   * no schema is in commit metadata, or any read error occurs (read failures 
are
+   * swallowed rather than propagated).
    */
-  def getLatestTableInternalSchema(config: HoodieConfig,
-                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
-    getLatestTableInternalSchema(config.getProps, tableMetaClient)
+  def getLatestTableEvolutionSchema(config: HoodieConfig,
+                                    tableMetaClient: HoodieTableMetaClient): 
Option[HoodieSchema] = {
+    getLatestTableEvolutionSchema(config.getProps, tableMetaClient)
   }

Review Comment:
   🤖 nit: the `Properties` overload lost its Javadoc in this refactoring — the 
important caveat that read failures are swallowed rather than propagated is now 
only on the `HoodieConfig` delegating overload above, so callers who go 
directly to the `Properties` version won't see it. Worth adding at least a 
one-liner or a `@see` reference.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Pure-{@link HoodieSchema} JSON serializer/deserializer for evolution 
schemas.
+ *
+ * <p>The on-disk format is a hard backward-compatibility boundary: the
+ * {@code latest_schema} blob in commit metadata and the {@code 
.hoodie/.schema/}
+ * history files have a fixed JSON layout that predates this class, and old
+ * tables must remain readable. This class emits that JSON layout byte-for-byte
+ * and parses every shape the original could parse, including:
+ *
+ * <ul>
+ *   <li>Top-level record with {@code max_column_id} / {@code version_id} /
+ *       {@code type:"record"} / {@code fields:[...]}.</li>
+ *   <li>Inline records (only {@code type:"record"} + {@code fields:[...]} —
+ *       no version metadata).</li>
+ *   <li>Arrays with {@code element_id}, {@code element}, {@code 
element_optional}.</li>
+ *   <li>Maps with {@code key_id}, {@code key}, {@code value_id}, {@code 
value},
+ *       {@code value_optional}.</li>
+ *   <li>Primitives serialized as a string token: {@code "int"}, {@code 
"long"},
+ *       {@code "boolean"}, {@code "string"}, {@code "binary"}, {@code "date"},
+ *       {@code "uuid"}, {@code "time"} (micros), {@code "time-millis"},
+ *       {@code "timestamp"} (micros), {@code "timestamp-millis"},
+ *       {@code "local-timestamp-millis"}, {@code "local-timestamp-micros"},
+ *       {@code "fixed[N]"}, {@code "decimal_bytes(P,S)"}, {@code 
"decimal_fixed(P,S)[N]"},
+ *       and the legacy {@code "decimal(P, S)"} form (parsed only — emission
+ *       always picks one of the two explicit decimal forms).</li>
+ * </ul>
+ *
+ * <p>Field ids are emitted as the {@code id} key on each field object and
+ * stamped on parse onto the resulting HoodieSchemaField as the
+ * {@code field-id} Avro custom property; element / key / value ids likewise
+ * land as {@code element-id} / {@code key-id} / {@code value-id} on the
+ * corresponding nested HoodieSchema's underlying Avro schema.
+ */
+public final class HoodieSchemaSerDe {
+
+  // Wire format keys reused by callers reading commit metadata. Any change 
here
+  // breaks compatibility with on-disk tables; do not adjust.
+  public static final String LATEST_SCHEMA = "latest_schema";
+  public static final String SCHEMAS = "schemas";
+
+  private static final String MAX_COLUMN_ID = "max_column_id";
+  private static final String VERSION_ID = "version_id";
+  private static final String TYPE = "type";
+  private static final String RECORD = "record";
+  private static final String ARRAY = "array";
+  private static final String MAP = "map";
+  private static final String FIELDS = "fields";
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+  private static final String DOC = "doc";
+  private static final String NAME = "name";
+  private static final String ID = "id";
+  private static final String ELEMENT_ID = "element_id";
+  private static final String KEY_ID = "key_id";
+  private static final String VALUE_ID = "value_id";
+  private static final String OPTIONAL = "optional";
+  private static final String ELEMENT_OPTIONAL = "element_optional";
+  private static final String VALUE_OPTIONAL = "value_optional";
+
+  // Match patterns are checked in the order: DECIMAL_FIXED first (otherwise 
its
+  // suffix fixed[N] would partially match FIXED), then FIXED, then 
DECIMAL_BYTES,
+  // and finally the legacy plain DECIMAL form. The DECIMAL pattern requires
+  // whitespace after the comma — that matches what 
Types.DecimalType.toString()
+  // historically emitted ("decimal(p, s)" with a space).
+  private static final Pattern FIXED_PATTERN = 
Pattern.compile("fixed\\[(\\d+)\\]");
+  private static final Pattern DECIMAL_PATTERN = 
Pattern.compile("decimal\\((\\d+),\\s+(\\d+)\\)");
+  private static final Pattern DECIMAL_BYTES_PATTERN = 
Pattern.compile("decimal_bytes\\((\\d+),\\s*(\\d+)\\)");
+  private static final Pattern DECIMAL_FIXED_PATTERN = 
Pattern.compile("decimal_fixed\\((\\d+),\\s*(\\d+)\\)\\[(\\d+)\\]");
+
+  private HoodieSchemaSerDe() {
+  }
+
+  // -------------------------------------------------------------------------
+  // Public API
+  // -------------------------------------------------------------------------
+
+  /**
+   * Serializes a single schema to the {@code latest_schema}-style JSON blob.
+   * Returns an empty string for null or empty-sentinel schemas — matching the
+   * historical semantics for the {@code latest_schema} commit metadata key.
+   */
+  public static String toJson(HoodieSchema schema) {
+    if (schema == null || schema.isEmptySchema()) {
+      return "";
+    }
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator gen = new JsonFactory().createGenerator(writer);
+      writeRecordTopLevel(schema, gen);
+      gen.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Serializes a list of versioned schemas to the {@code .hoodie/.schema/}
+   * history blob format: a single object whose {@code schemas} array contains
+   * each schema as a top-level record (carrying its own {@code version_id}
+   * and {@code max_column_id}).
+   */
+  public static String toJsonHistory(List<HoodieSchema> schemas) {
+    try {
+      StringWriter writer = new StringWriter();
+      JsonGenerator gen = new JsonFactory().createGenerator(writer);
+      gen.writeStartObject();
+      gen.writeArrayFieldStart(SCHEMAS);
+      for (HoodieSchema s : schemas) {
+        writeRecordTopLevel(s, gen);
+      }
+      gen.writeEndArray();
+      gen.writeEndObject();
+      gen.flush();
+      return writer.toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parses a single-schema JSON blob (typically the {@code latest_schema}
+   * value from commit metadata). Returns empty for null or empty input —
+   * the history-blob entry point is {@link #parseHistorySchemas(String)}.
+   */
+  public static Option<HoodieSchema> fromJson(String json) {
+    return fromJson(json, "hoodieSchema");
+  }
+
+  /**
+   * Variant of {@link #fromJson(String)} that fixes the record name on the
+   * resulting top-level HoodieSchema. Useful for callers that need the
+   * record name to match an existing schema namespace.
+   */
+  public static Option<HoodieSchema> fromJson(String json, String recordName) {
+    if (json == null || json.isEmpty()) {
+      return Option.empty();
+    }
+    try {
+      JsonNode node = JsonUtils.getObjectMapper().readTree(json);
+      return Option.of(parseRecordTopLevel(node, recordName));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Parses a history blob and returns its schemas keyed by {@code version_id}.
+   * The TreeMap ordering lets callers iterate from oldest to newest.
+   */
+  public static TreeMap<Long, HoodieSchema> parseHistorySchemas(String json) {
+    TreeMap<Long, HoodieSchema> result = new TreeMap<>();
+    try {
+      JsonNode node = JsonUtils.getObjectMapper().readTree(json);
+      if (!node.has(SCHEMAS)) {
+        throw new IllegalArgumentException(
+            String.format("cannot parser schemas from current json string, 
missing key name: %s", SCHEMAS));
+      }

Review Comment:
   🤖 nit: small typo — `"cannot parser schemas"` should be `"cannot parse 
schemas"`. This will show up verbatim in exception messages and logs.
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to