hudi-agent commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3183787211


##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP;
+
+/**
+ * Walks a {@link HoodieSchema} and assigns sequential integer ids to every addressable
+ * sub-schema (record fields, array elements, map keys and values), writing the result
+ * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON properties.
+ *
+ * <p>The traversal order mirrors {@link org.apache.hudi.internal.schema.InternalSchemaBuilder}
+ * (record fields in declared order; array element id assigned before recursing into the
+ * element type; map key id then value id assigned before recursing into the value type)
+ * so id assignments are stable across the InternalSchema → HoodieSchema migration and
+ * round-tripping a previously-IDed InternalSchema through HoodieSchema produces the same
+ * id mapping.</p>
+ *
+ * <p>Existing ids are preserved: if a node already has an id property, this assigner uses
+ * that id and bumps {@code nextId} past it, so reapplying the assigner to a partially
+ * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the new
+ * {@code maxColumnId} so callers can persist it on the schema root.</p>
+ */
+public final class HoodieSchemaIdAssigner {
+
+  private HoodieSchemaIdAssigner() {
+  }
+
+  /**
+   * Assigns ids onto {@code schema} starting from {@code startId} and writes the resulting
+   * {@code max-column-id} onto the schema root.
+   *
+   * @return the maximum column id present after assignment
+   */
+  public static int assign(HoodieSchema schema, int startId) {
+    int[] nextId = {startId};
+    visit(schema, nextId);
+    int maxId = nextId[0] - 1;
+    schema.setMaxColumnId(maxId);
+    // Field/element/key/value ids were stamped onto the underlying Avro
+    // Schema/Field directly (bypassing HoodieSchema.addProp's invalidation), so
+    // any cached HoodieSchemaIndex on this schema is now stale.
+    schema.invalidateIdIndex();
+    return maxId;
+  }
+
+  /**
+   * Assigns ids onto {@code schema}, starting after the highest id already present
+   * (or 0 if none). This is the entry point for first-time id assignment on a schema
+   * that has no prior IDs.
+   */
+  public static int assignFresh(HoodieSchema schema) {
+    HoodieSchemaIndex index = HoodieSchemaIndex.of(schema);
+    int start = Math.max(0, index.maxColumnIdSeen() + 1);
+    return assign(schema, start);
+  }
+
+  private static void visit(HoodieSchema schema, int[] nextId) {
+    HoodieSchema effective = schema.isNullable() ? schema.getNonNullType() : schema;
+    switch (effective.getType()) {
+      case RECORD:
+        // Mirror InternalSchemaConverter ordering: assign ids to all fields at this level
+        // before recursing into any of them.
+        for (HoodieSchemaField field : effective.getFields()) {
+          assignFieldId(field, nextId);
+        }
+        for (HoodieSchemaField field : effective.getFields()) {
+          visit(field.schema(), nextId);
+        }
+        return;
+      case ARRAY: {
+        Schema arr = effective.getAvroSchema();
+        assignContainerId(arr, ELEMENT_ID_PROP, nextId);
+        visit(effective.getElementType(), nextId);
+        return;
+      }
+      case MAP: {
+        Schema map = effective.getAvroSchema();
+        assignContainerId(map, KEY_ID_PROP, nextId);
+        assignContainerId(map, VALUE_ID_PROP, nextId);
+        visit(effective.getValueType(), nextId);
+        return;
+      }
+      default:
+        // primitive: nothing to assign
+    }
+  }
+
+  private static void assignFieldId(HoodieSchemaField field, int[] nextId) {
+    Schema.Field avroField = field.getAvroField();
+    Object existing = avroField.getObjectProp(FIELD_ID_PROP);
+    if (existing instanceof Number) {

Review Comment:
   🤖 Could `assign(schema, startId)` ever produce duplicate ids when called with a `startId` that is not strictly greater than all pre-existing ids? E.g. fields `[A(no id), B(id=0), C(id=1)]` with `assign(schema, 0)`: A is stamped with 0 and `nextId` becomes 1; B already carries id 0, which is preserved and only bumps `nextId` past 0 (a no-op), so A and B end up both holding id 0. `assignFresh` avoids this by precomputing a safe startId, but since `assign` is public, is it worth either making it private / package-private, or doing a discovery pass first to bump `nextId` past all existing ids before stamping any blanks?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -72,116 +68,120 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     // convert to delete first then add again
     val deleteChanges = changes.filter(p => 
p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn])
     val addChanges = changes.filter(p => 
p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn])
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, 
applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges)
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column replace finished")
   }
 
-  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = {
-    val addChange = TableChanges.ColumnAddChange.get(oldSchema)
+  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = {
+    var cur = oldSchema
     addChanges.foreach { addColumn =>
       val names = addColumn.fieldNames()
       val parentName = AlterTableCommand.getParentName(names)
-      // add col change
-      val colType = 
SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), 
true, new AtomicInteger(0))
-      addChange.addColumns(parentName, names.last, colType, 
addColumn.comment())
-      // add position change
-      addColumn.position() match {
+      val fullName = names.mkString(".")
+      val colType = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(),
 names.last)
+      val (positionType, positionRef) = addColumn.position() match {
         case after: TableChange.After =>
-          addChange.addPositionChange(names.mkString("."),
-            if (parentName.isEmpty) after.column() else parentName + "." + 
after.column(), "after")
+          (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() 
else parentName + "." + after.column())
         case _: TableChange.First =>
-          addChange.addPositionChange(names.mkString("."), "", "first")
+          (ColumnPositionType.FIRST, "")
         case _ =>
+          (ColumnPositionType.NO_OPERATION, "")
       }
+      cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, 
colType, addColumn.comment(), positionRef, positionType)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
+    cur
   }
 
-  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
-    val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
-    deleteChanges.foreach { c =>
-      val originalColName = c.fieldNames().mkString(".")
-      checkSchemaChange(Seq(originalColName), table)
-      deleteChange.deleteColumn(originalColName)
+  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = {
+    val colNames = deleteChanges.map { c =>

Review Comment:
   🤖 This loop now applies each add to the cumulative `cur` schema, whereas the legacy code accumulated all add+position changes into a single `ColumnAddChange` against `oldSchema` and applied them once. For multi-column adds where one references another (e.g. `ADD COLUMNS (x AFTER z, z ...)`), the legacy batch handled it because both columns existed by the time positions were resolved; here the first iteration would fail to resolve `AFTER z`. Could you confirm this isn't reachable from Spark DDL, or apply the same batch fix you did in `applyUpdateAction` (accumulate changes, apply once)?
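
   For illustration, a self-contained toy example of the ordering point (plain Java collections only, nothing from the Hudi API; an editorial example, not code from this PR): applying adds one at a time cannot resolve a position reference to a column added later in the same statement, while add-everything-then-resolve-positions can.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of "ADD COLUMNS (x AFTER z, z)" against a table with columns [a, b].
public class BatchAddIllustration {
  public static void main(String[] args) {
    List<String> schema = new ArrayList<>(Arrays.asList("a", "b"));
    Map<String, String> addsWithAfter = new LinkedHashMap<>();
    addsWithAfter.put("x", "z");   // x AFTER z -- per-column application would look up z here and miss
    addsWithAfter.put("z", null);  // z appended with no position clause

    // Pass 1: add every new column so later position references can resolve.
    addsWithAfter.keySet().forEach(schema::add);
    // Pass 2: only now move the columns that carry an AFTER reference.
    addsWithAfter.forEach((col, after) -> {
      if (after != null) {
        schema.remove(col);
        schema.add(schema.indexOf(after) + 1, col);
      }
    });
    System.out.println(schema); // [a, b, z, x]
  }
}
```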
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,44 +163,43 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans
                                                                                          HoodieBaseFile baseFile,
                                                                                          HoodieWriteConfig writeConfig,
                                                                                          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = 
HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = 
HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), 
writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
         try {
-          fileSchema = 
InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true));
+          fileSchema = tableSchemaResolver.getTableSchema(true);
         } catch (Exception e) {
           throw new HoodieException(String.format("Failed to get 
InternalSchema for given versionId: %s", commitInstantTime), e);
         }
       }
-      final InternalSchema writeInternalSchema = fileSchema;
+      final HoodieSchema fileWriteSchema = fileSchema;
       List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
+      List<String> colNamesFromWriteSchema = 
fileWriteSchema.getAllColsFullName();
       List<String> sameCols = colNamesFromWriteSchema.stream()
           .filter(f -> {
-            int writerSchemaFieldId = writeInternalSchema.findIdByName(f);
+            int writerSchemaFieldId = fileWriteSchema.findIdByName(f);
             int querySchemaFieldId = querySchema.findIdByName(f);
 
             return colNamesFromQuerySchema.contains(f)
                 && writerSchemaFieldId == querySchemaFieldId
                 && writerSchemaFieldId != -1
-                && 
Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), 
querySchema.findType(querySchemaFieldId));
+                && 
Objects.equals(fileWriteSchema.findType(writerSchemaFieldId), 
querySchema.findType(querySchemaFieldId));
           })
           .collect(Collectors.toList());
-      InternalSchema mergedSchema = new 
InternalSchemaMerger(writeInternalSchema, querySchema,
+      HoodieSchema newWriterSchema = new HoodieSchemaMerger(fileWriteSchema, 
querySchema,
           true, false, false).mergeSchema();

Review Comment:
   🤖 Could you switch this to `mergeSchema(writerSchema.getFullName())`? Legacy `InternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName())` aligned both `newWriterSchema` and `writeSchemaFromFile` to the writer's name; with the new no-arg `mergeSchema()`, `newWriterSchema` ends up named after `querySchema` (the persisted evolution schema) while `writeSchemaFromFile = fileWriteSchema` keeps the file's name — same drift you fixed for `AbstractHoodieLogRecordScanner` (line 685) and `HoodieAvroParquetReader` via the `mergeSchema(recordName)` overload.
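
   Concretely, something like this (a sketch of the suggested one-line change, assuming the `mergeSchema(String)` overload mentioned above; everything else stays as in the hunk):

```java
// Align the merged schema to the writer's record name, matching what the legacy
// InternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName()) call did.
HoodieSchema newWriterSchema = new HoodieSchemaMerger(fileWriteSchema, querySchema,
    true, false, false).mergeSchema(writerSchema.getFullName());
```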
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -210,29 +208,29 @@ protected Pair<ClosableIterator<T>, HoodieSchema> 
getRecordsIterator(HoodieDataB
   /**
    * Get final Read Schema for support evolution.
    * step1: find the fileSchema for current dataBlock.
-   * step2: determine whether fileSchema is compatible with the final read 
internalSchema.
-   * step3: merge fileSchema and read internalSchema to produce final read 
schema.
+   * step2: determine whether fileSchema is compatible with the read evolution 
schema.
+   * step3: merge fileSchema and read evolution schema to produce final read 
schema.
    *
    * @param dataBlock current processed block
    * @return final read schema.
    */
   protected Option<Pair<Function<T, T>, HoodieSchema>> 
composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
-    if (internalSchema.isEmptySchema()) {
+    if (!evolutionSchemaOpt.isPresent()) {
       return Option.empty();
     }
 
     long currentInstantTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);
-    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new 
InternalSchemaMerger(fileSchema, internalSchema,
+    HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);
+    Pair<HoodieSchema, Map<String, String>> mergedEvolutionSchema = new 
HoodieSchemaMerger(fileSchema, evolutionSchemaOpt.get(),
         true, false, false).mergeSchemaGetRenamed();
-    HoodieSchema mergedAvroSchema = 
HoodieSchema.fromAvroSchema(InternalSchemaConverter.convert(mergedInternalSchema.getLeft(),
 readerSchema.getFullName()).getAvroSchema());
+    HoodieSchema mergedAvroSchema = mergedEvolutionSchema.getLeft();

Review Comment:
   🤖 nit: this variable is a `HoodieSchema` now, not an Avro schema — `mergedAvroSchema` carried over from the legacy code where it really was an Avro `Schema`. Could you rename to `mergedSchema` here (and at the matching spots in `AbstractHoodieLogRecordScanner.composeEvolvedSchemaTransformer` and `FileGroupReaderSchemaHandler.getRequiredSchemaForFileAndRenamedColumns`)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -73,59 +70,48 @@ object HoodieSchemaUtils {
   }
 
   /**
-   * get latest internalSchema from table
-   *
-   * @param config          instance of {@link HoodieConfig}
-   * @param tableMetaClient instance of HoodieTableMetaClient
-   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
+   * Returns the schema-on-read evolution [[HoodieSchema]] for the table — carrying
+   * field ids and version metadata — or [[None]] when schema-on-read is disabled,
+   * no schema is in commit metadata, or any read error occurs (read failures are
+   * swallowed rather than propagated).
    */
-  def getLatestTableInternalSchema(config: HoodieConfig,
-                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
-    getLatestTableInternalSchema(config.getProps, tableMetaClient)
+  def getLatestTableEvolutionSchema(config: HoodieConfig,
+                                    tableMetaClient: HoodieTableMetaClient): 
Option[HoodieSchema] = {
+    getLatestTableEvolutionSchema(config.getProps, tableMetaClient)
   }
 
-  /**
-   * get latest internalSchema from table
-   *
-   * @param props           instance of {@link Properties}
-   * @param tableMetaClient instance of HoodieTableMetaClient
-   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
-   */
-  def getLatestTableInternalSchema(props: Properties,
-                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
+  def getLatestTableEvolutionSchema(props: Properties,
+                                    tableMetaClient: HoodieTableMetaClient): 
Option[HoodieSchema] = {
     if (!ConfigUtils.getBooleanWithAltKeys(props, 
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
       None
     } else {
       try {
         val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-        val internalSchemaOpt = 
tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
-        if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else 
None
+        val schemaOpt = 
tableSchemaResolver.getTableEvolutionSchemaFromCommitMetadata
+        if (schemaOpt.isPresent) Some(schemaOpt.get()) else None
       } catch {
         case _: Exception => None
       }
     }
   }
 
   /**
-   * Deduces writer's schema based on
-   * <ul>
-   *   <li>Source's schema</li>
-   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
-   * </ul>
+   * Deduces writer's schema from source schema, the table's latest Avro 
schema, and
+   * the table's schema-on-read evolution schema (if any).
    */
   def deduceWriterSchema(sourceSchema: HoodieSchema,
-                         latestTableSchemaOpt: Option[HoodieSchema],
-                         internalSchemaOpt: Option[InternalSchema],
-                         opts: Map[String, String]): HoodieSchema = {
+                                      latestTableSchemaOpt: 
Option[HoodieSchema],

Review Comment:
   🤖 nit: the continuation parameters are over-indented and no longer line up with `sourceSchema` on the previous line — looks like the rename pushed the column out. Could you re-align these (and the matching Java-friendly overload further down) to start under `sourceSchema`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -244,13 +241,13 @@ object AlterTableCommand extends Logging {
   /**
     * Generate an commit with new schema to change the table's schema.
     *
-    * @param internalSchema new schema after change
+    * @param evolutionSchema new schema after change
     * @param historySchemaStr history schemas
     * @param table The hoodie table.
     * @param sparkSession The spark session.
     */
-  def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: 
String, table: CatalogTable, sparkSession: SparkSession): Unit = {
-    val schema = InternalSchemaConverter.convert(internalSchema, 
HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table))
+  def commitWithSchema(evolutionSchema: HoodieSchema, historySchemaStr: 
String, table: CatalogTable, sparkSession: SparkSession): Unit = {
+    val schema = evolutionSchema

Review Comment:
   🤖 Legacy `commitWithSchema` forcibly renamed the schema to `HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table)`. The new code uses `evolutionSchema` as-is, so its existing `getFullName` is what gets persisted in `LATEST_SCHEMA` and used by the write client. If `evolutionSchema.getFullName` ever diverges from the table-identifier-based name (e.g. on first ALTER after migrating a table whose original schema name was set differently), the persisted record name will differ from legacy. Was this intentional, or should we still apply the canonical record name here?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * HoodieSchema-shaped façade for evolution-schema JSON serialization.
+ *
+ * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit metadata
+ * and the {@code .hoodie/.schema/} history files use the same JSON layout that
+ * {@link SerDeHelper} has always produced ({@code schemas} array containing
+ * objects with {@code version_id}, {@code max_column_id}, {@code type}, {@code fields}, etc.).
+ * Old tables must remain readable, so this façade delegates to {@link SerDeHelper}
+ * verbatim and converts at the HoodieSchema/InternalSchema boundary via
+ * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way out.</p>
+ *
+ * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in
+ * pure HoodieSchema terms behind this stable interface — but the byte-for-byte
+ * compatibility constraint stays in force.</p>
+ */
+public final class HoodieSchemaSerDe {
+
+  /**
+   * Commit-metadata key under which the latest schema's JSON is stored. 
Carried
+   * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading 
old
+   * commit metadata pick up the same blob.
+   */
+  public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA;
+
+  /**
+   * JSON object key that wraps the array of historical schemas. Same as
+   * {@link SerDeHelper#SCHEMAS}.
+   */
+  public static final String SCHEMAS = SerDeHelper.SCHEMAS;
+
+  private HoodieSchemaSerDe() {
+  }
+
+  /**
+   * Serializes a single schema to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(InternalSchema)} byte for byte.
+   */
+  public static String toJson(HoodieSchema schema) {
+    return 
SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema));
+  }
+
+  /**
+   * Serializes a history of schemas to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(List)} byte for byte.
+   */
+  public static String toJsonHistory(List<HoodieSchema> schemas) {
+    List<InternalSchema> converted = new ArrayList<>(schemas.size());
+    for (HoodieSchema s : schemas) {
+      converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s));
+    }
+    return SerDeHelper.toJson(converted);
+  }
+
+  /**
+   * Parses a single-schema JSON blob (typically the {@code latest_schema} 
value
+   * from commit metadata). Returns empty if the input is null/empty so callers
+   * can pass through optional commit metadata fields.
+   */
+  public static Option<HoodieSchema> fromJson(String json) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return 
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), 
defaultRecordName(internal.get())));
+  }
+
+  /**
+   * Variant of {@link #fromJson(String)} that lets the caller fix the record 
name
+   * on the resulting HoodieSchema. Equivalent to the legacy
+   * {@code SerDeHelper.fromJson(...).map(is -> 
InternalSchemaConverter.convert(is, recordName))}
+   * pattern, collapsed into a single call.
+   */
+  public static Option<HoodieSchema> fromJson(String json, String recordName) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return 
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), 
recordName));
+  }
+
+  /**
+   * Parses the history-schemas JSON layout (array of versioned schemas) and
+   * returns them keyed by {@code version_id}. Field ids are preserved on each
+   * returned HoodieSchema so the resulting map is directly usable by the
+   * read/merge path.
+   */
+  public static TreeMap<Long, HoodieSchema> fromJsonHistory(String json) {
+    TreeMap<Long, InternalSchema> internals = SerDeHelper.parseSchemas(json);
+    TreeMap<Long, HoodieSchema> out = new TreeMap<>();
+    for (Long versionId : internals.keySet()) {
+      InternalSchema is = internals.get(versionId);
+      HoodieSchema hs = HoodieSchemaInternalSchemaBridge.toHoodieSchema(is, 
defaultRecordName(is));
+      out.put(versionId, hs);
+    }
+    return out;
+  }
+
+  /**
+   * Appends a freshly-evolved schema to an existing serialized history blob and
+   * returns the new blob. Mirrors {@link SerDeHelper#inheritSchemas(InternalSchema, String)}
+   * — the {@code oldHistoryJson} is the prior {@code .hoodie/.schema/} contents
+   * (or empty for the first commit).
+   */
+  public static String inheritHistory(HoodieSchema newSchema, String 
oldHistoryJson) {
+    return SerDeHelper.inheritSchemas(
+        HoodieSchemaInternalSchemaBridge.toInternalSchema(newSchema), 
oldHistoryJson);
+  }
+
+  /**
+   * Resolves the schema-history entry that applies to a given version id — exact
+   * match if present, else the largest entry strictly less than {@code versionId},
+   * else {@code null}. HoodieSchema-shaped replacement for
+   * {@link org.apache.hudi.internal.schema.utils.InternalSchemaUtils#searchSchema}.
+   *
+   * <p>Note: legacy returned {@code InternalSchema.getEmptyInternalSchema()} on
+   * miss; this returns {@code null} so callers can choose their own empty
+   * sentinel via {@link HoodieSchema#empty()}. Most callers null-check + fall
+   * back, so the change is benign.</p>
+   */
+  public static HoodieSchema searchSchema(long versionId, java.util.TreeMap<Long, HoodieSchema> history) {
+    if (history.containsKey(versionId)) {
+      return history.get(versionId);
+    }
+    java.util.SortedMap<Long, HoodieSchema> headMap = history.headMap(versionId);
+    return headMap.isEmpty() ? null : headMap.get(headMap.lastKey());

Review Comment:
   🤖 nit: returning bare `null` for the miss case (and noting it in the javadoc) breaks the codebase convention of using `Option<HoodieSchema>` for absent schemas — every caller has to remember to null-check. Could you return `Option<HoodieSchema>` here instead?
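
   A minimal sketch of that variant (editorial suggestion; same lookup as above, with plain `java.util.TreeMap`/`SortedMap` imports and Hudi's `Option` as already used in this class):

```java
  public static Option<HoodieSchema> searchSchema(long versionId, TreeMap<Long, HoodieSchema> history) {
    if (history.containsKey(versionId)) {
      return Option.of(history.get(versionId));
    }
    // Largest version strictly below versionId, or empty on a complete miss.
    SortedMap<Long, HoodieSchema> headMap = history.headMap(versionId);
    return headMap.isEmpty() ? Option.empty() : Option.of(headMap.get(headMap.lastKey()));
  }
```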
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -99,72 +99,23 @@ public static HoodieSchema convert(InternalSchema internalSchema, String name) {
   }
 
   public static InternalSchema pruneHoodieSchemaToInternalSchema(HoodieSchema schema, InternalSchema originSchema) {

Review Comment:
   🤖 nit: `org.apache.hudi.common.schema.HoodieSchemaUtils.collectLeafNames(schema)` is fully qualified at both call sites — could you add an import so the body reads `HoodieSchemaUtils.collectLeafNames(schema)`? (If another `HoodieSchemaUtils` is already imported there, note that Java has no import aliasing, so keeping the fully qualified name at one call site would be the fallback.)
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
