voonhous commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3191323792


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -166,44 +163,43 @@ private Option<Function<HoodieRecord, HoodieRecord>> 
composeSchemaEvolutionTrans
                                                                                
          HoodieBaseFile baseFile,
                                                                                
          HoodieWriteConfig writeConfig,
                                                                                
          HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> querySchemaOpt = 
SerDeHelper.fromJson(writeConfig.getInternalSchema());
+    Option<HoodieSchema> querySchemaOpt = 
HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema());
     // TODO support bootstrap
     if (querySchemaOpt.isPresent() && 
!baseFile.getBootstrapBaseFile().isPresent()) {
       // check implicitly add columns, and position reorder(spark sql may 
change cols order)
-      InternalSchema querySchema = 
AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(),
+      HoodieSchema querySchema = 
HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema,
           querySchemaOpt.get(), 
writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       long commitInstantTime = Long.parseLong(baseFile.getCommitTime());
-      InternalSchema fileSchema = 
InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient);
-      if (fileSchema.isEmptySchema() && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
+      HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient);
+      if ((fileSchema == null || fileSchema.isEmptySchema()) && 
writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) {
         TableSchemaResolver tableSchemaResolver = new 
TableSchemaResolver(metaClient);
         try {
-          fileSchema = 
InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true));
+          fileSchema = tableSchemaResolver.getTableSchema(true);
         } catch (Exception e) {
           throw new HoodieException(String.format("Failed to get 
InternalSchema for given versionId: %s", commitInstantTime), e);
         }
       }
-      final InternalSchema writeInternalSchema = fileSchema;
+      final HoodieSchema fileWriteSchema = fileSchema;
       List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = 
writeInternalSchema.getAllColsFullName();
+      List<String> colNamesFromWriteSchema = 
fileWriteSchema.getAllColsFullName();
       List<String> sameCols = colNamesFromWriteSchema.stream()
           .filter(f -> {
-            int writerSchemaFieldId = writeInternalSchema.findIdByName(f);
+            int writerSchemaFieldId = fileWriteSchema.findIdByName(f);
             int querySchemaFieldId = querySchema.findIdByName(f);
 
             return colNamesFromQuerySchema.contains(f)
                 && writerSchemaFieldId == querySchemaFieldId
                 && writerSchemaFieldId != -1
-                && 
Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), 
querySchema.findType(querySchemaFieldId));
+                && 
Objects.equals(fileWriteSchema.findType(writerSchemaFieldId), 
querySchema.findType(querySchemaFieldId));
           })
           .collect(Collectors.toList());
-      InternalSchema mergedSchema = new 
InternalSchemaMerger(writeInternalSchema, querySchema,
+      HoodieSchema newWriterSchema = new HoodieSchemaMerger(fileWriteSchema, 
querySchema,
           true, false, false).mergeSchema();

Review Comment:
   Fixed by passing writerSchema.getFullName() into mergeSchema. Same drift as 
the other call sites.



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP;
+
+/**
+ * Walks a {@link HoodieSchema} and assigns sequential integer ids to every 
addressable
+ * sub-schema (record fields, array elements, map keys and values), writing 
the result
+ * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON 
properties.
+ *
+ * <p>The traversal order mirrors {@link 
org.apache.hudi.internal.schema.InternalSchemaBuilder}
+ * (record fields in declared order; array element id assigned before 
recursing into the
+ * element type; map key id then value id assigned before recursing into the 
value type)
+ * so id assignments are stable across the InternalSchema → HoodieSchema 
migration and
+ * round-tripping a previously-IDed InternalSchema through HoodieSchema 
produces the same
+ * id mapping.</p>
+ *
+ * <p>Existing ids are preserved: if a node already has an id property, this 
assigner uses
+ * that id and bumps {@code nextId} past it, so reapplying the assigner to a 
partially
+ * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the 
new
+ * {@code maxColumnId} so callers can persist it on the schema root.</p>
+ */
+public final class HoodieSchemaIdAssigner {
+
+  private HoodieSchemaIdAssigner() {
+  }
+
+  /**
+   * Assigns ids onto {@code schema} starting from {@code startId} and writes 
the resulting
+   * {@code max-column-id} onto the schema root.
+   *
+   * @return the maximum column id present after assignment
+   */
+  public static int assign(HoodieSchema schema, int startId) {
+    int[] nextId = {startId};
+    visit(schema, nextId);
+    int maxId = nextId[0] - 1;
+    schema.setMaxColumnId(maxId);
+    // Field/element/key/value ids were stamped onto the underlying Avro
+    // Schema/Field directly (bypassing HoodieSchema.addProp's invalidation), 
so
+    // any cached HoodieSchemaIndex on this schema is now stale.
+    schema.invalidateIdIndex();
+    return maxId;
+  }
+
+  /**
+   * Assigns ids onto {@code schema}, starting after the highest id already 
present
+   * (or 0 if none). This is the entry point for first-time id assignment on a 
schema
+   * that has no prior IDs.
+   */
+  public static int assignFresh(HoodieSchema schema) {
+    HoodieSchemaIndex index = HoodieSchemaIndex.of(schema);
+    int start = Math.max(0, index.maxColumnIdSeen() + 1);
+    return assign(schema, start);
+  }
+
+  private static void visit(HoodieSchema schema, int[] nextId) {
+    HoodieSchema effective = schema.isNullable() ? schema.getNonNullType() : 
schema;
+    switch (effective.getType()) {
+      case RECORD:
+        // Mirror InternalSchemaConverter ordering: assign ids to all fields 
at this level
+        // before recursing into any of them.
+        for (HoodieSchemaField field : effective.getFields()) {
+          assignFieldId(field, nextId);
+        }
+        for (HoodieSchemaField field : effective.getFields()) {
+          visit(field.schema(), nextId);
+        }
+        return;
+      case ARRAY: {
+        Schema arr = effective.getAvroSchema();
+        assignContainerId(arr, ELEMENT_ID_PROP, nextId);
+        visit(effective.getElementType(), nextId);
+        return;
+      }
+      case MAP: {
+        Schema map = effective.getAvroSchema();
+        assignContainerId(map, KEY_ID_PROP, nextId);
+        assignContainerId(map, VALUE_ID_PROP, nextId);
+        visit(effective.getValueType(), nextId);
+        return;
+      }
+      default:
+        // primitive: nothing to assign
+    }
+  }
+
+  private static void assignFieldId(HoodieSchemaField field, int[] nextId) {
+    Schema.Field avroField = field.getAvroField();
+    Object existing = avroField.getObjectProp(FIELD_ID_PROP);
+    if (existing instanceof Number) {

Review Comment:
   Fixed by adding a pre-scan in assign that bumps the start past every 
existing id (uses HoodieSchemaIndex.maxColumnIdSeen()). Tests calling 
assign(schema, 0) on partially-IDed fixtures stay correct, and assignFresh now 
just delegates to assign(schema, 0).



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -72,116 +68,120 @@ case class AlterTableCommand(table: CatalogTable, 
changes: Seq[TableChange], cha
     // convert to delete first then add again
     val deleteChanges = changes.filter(p => 
p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn])
     val addChanges = changes.filter(p => 
p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn])
-    val (oldSchema, historySchema) = 
getInternalSchemaAndHistorySchemaStr(sparkSession)
+    val (oldSchema, historySchema) = 
getEvolutionSchemaAndHistorySchemaStr(sparkSession)
     val newSchema = applyAddAction2Schema(sparkSession, 
applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges)
-    val verifiedHistorySchema = if (historySchema == null || 
historySchema.isEmpty) {
-      SerDeHelper.inheritSchemas(oldSchema, "")
-    } else {
-      historySchema
-    }
+    val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema)
     AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, 
table, sparkSession)
     logInfo("column replace finished")
   }
 
-  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = {
-    val addChange = TableChanges.ColumnAddChange.get(oldSchema)
+  def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = {
+    var cur = oldSchema
     addChanges.foreach { addColumn =>
       val names = addColumn.fieldNames()
       val parentName = AlterTableCommand.getParentName(names)
-      // add col change
-      val colType = 
SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), 
true, new AtomicInteger(0))
-      addChange.addColumns(parentName, names.last, colType, 
addColumn.comment())
-      // add position change
-      addColumn.position() match {
+      val fullName = names.mkString(".")
+      val colType = 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(),
 names.last)
+      val (positionType, positionRef) = addColumn.position() match {
         case after: TableChange.After =>
-          addChange.addPositionChange(names.mkString("."),
-            if (parentName.isEmpty) after.column() else parentName + "." + 
after.column(), "after")
+          (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() 
else parentName + "." + after.column())
         case _: TableChange.First =>
-          addChange.addPositionChange(names.mkString("."), "", "first")
+          (ColumnPositionType.FIRST, "")
         case _ =>
+          (ColumnPositionType.NO_OPERATION, "")
       }
+      cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, 
colType, addColumn.comment(), positionRef, positionType)
     }
-    SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange)
+    cur
   }
 
-  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = {
-    val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema)
-    deleteChanges.foreach { c =>
-      val originalColName = c.fieldNames().mkString(".")
-      checkSchemaChange(Seq(originalColName), table)
-      deleteChange.deleteColumn(originalColName)
+  private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: 
HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = {
+    val colNames = deleteChanges.map { c =>

Review Comment:
   Good catch, fixed. applyAddAction2Schema now accumulates all add+position 
changes into a single ColumnAddChange against the original schema, same pattern 
as the applyUpdateAction fix.



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -244,13 +241,13 @@ object AlterTableCommand extends Logging {
   /**
     * Generate an commit with new schema to change the table's schema.
     *
-    * @param internalSchema new schema after change
+    * @param evolutionSchema new schema after change
     * @param historySchemaStr history schemas
     * @param table The hoodie table.
     * @param sparkSession The spark session.
     */
-  def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: 
String, table: CatalogTable, sparkSession: SparkSession): Unit = {
-    val schema = InternalSchemaConverter.convert(internalSchema, 
HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table))
+  def commitWithSchema(evolutionSchema: HoodieSchema, historySchemaStr: 
String, table: CatalogTable, sparkSession: SparkSession): Unit = {
+    val schema = evolutionSchema

Review Comment:
   Confirmed unintentional. commitWithSchema now forces evolutionSchema's 
record name to HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table) 
before persisting, restoring the legacy canonical-name behavior.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -210,29 +208,29 @@ protected Pair<ClosableIterator<T>, HoodieSchema> 
getRecordsIterator(HoodieDataB
   /**
    * Get final Read Schema for support evolution.
    * step1: find the fileSchema for current dataBlock.
-   * step2: determine whether fileSchema is compatible with the final read 
internalSchema.
-   * step3: merge fileSchema and read internalSchema to produce final read 
schema.
+   * step2: determine whether fileSchema is compatible with the read evolution 
schema.
+   * step3: merge fileSchema and read evolution schema to produce final read 
schema.
    *
    * @param dataBlock current processed block
    * @return final read schema.
    */
   protected Option<Pair<Function<T, T>, HoodieSchema>> 
composeEvolvedSchemaTransformer(
       HoodieDataBlock dataBlock) {
-    if (internalSchema.isEmptySchema()) {
+    if (!evolutionSchemaOpt.isPresent()) {
       return Option.empty();
     }
 
     long currentInstantTime = 
Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME));
-    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);
-    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new 
InternalSchemaMerger(fileSchema, internalSchema,
+    HoodieSchema fileSchema = 
HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime, 
hoodieTableMetaClient);
+    Pair<HoodieSchema, Map<String, String>> mergedEvolutionSchema = new 
HoodieSchemaMerger(fileSchema, evolutionSchemaOpt.get(),
         true, false, false).mergeSchemaGetRenamed();
-    HoodieSchema mergedAvroSchema = 
HoodieSchema.fromAvroSchema(InternalSchemaConverter.convert(mergedInternalSchema.getLeft(),
 readerSchema.getFullName()).getAvroSchema());
+    HoodieSchema mergedAvroSchema = mergedEvolutionSchema.getLeft();

Review Comment:
   Renamed to mergedSchema in all three locations.



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * HoodieSchema-shaped façade for evolution-schema JSON serialization.
+ *
+ * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit 
metadata
+ * and the {@code .hoodie/.schema/} history files use the same JSON layout that
+ * {@link SerDeHelper} has always produced ({@code schemas} array containing
+ * objects with {@code version_id}, {@code max_column_id}, {@code type}, 
{@code fields}, etc.).
+ * Old tables must remain readable, so this façade delegates to {@link 
SerDeHelper}
+ * verbatim and converts at the HoodieSchema/InternalSchema boundary via
+ * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way 
out.</p>
+ *
+ * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in
+ * pure HoodieSchema terms behind this stable interface — but the byte-for-byte
+ * compatibility constraint stays in force.</p>
+ */
+public final class HoodieSchemaSerDe {
+
+  /**
+   * Commit-metadata key under which the latest schema's JSON is stored. 
Carried
+   * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading 
old
+   * commit metadata pick up the same blob.
+   */
+  public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA;
+
+  /**
+   * JSON object key that wraps the array of historical schemas. Same as
+   * {@link SerDeHelper#SCHEMAS}.
+   */
+  public static final String SCHEMAS = SerDeHelper.SCHEMAS;
+
+  private HoodieSchemaSerDe() {
+  }
+
+  /**
+   * Serializes a single schema to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(InternalSchema)} byte for byte.
+   */
+  public static String toJson(HoodieSchema schema) {
+    return 
SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema));
+  }
+
+  /**
+   * Serializes a history of schemas to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(List)} byte for byte.
+   */
+  public static String toJsonHistory(List<HoodieSchema> schemas) {
+    List<InternalSchema> converted = new ArrayList<>(schemas.size());
+    for (HoodieSchema s : schemas) {
+      converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s));
+    }
+    return SerDeHelper.toJson(converted);
+  }
+
+  /**
+   * Parses a single-schema JSON blob (typically the {@code latest_schema} 
value
+   * from commit metadata). Returns empty if the input is null/empty so callers
+   * can pass through optional commit metadata fields.
+   */
+  public static Option<HoodieSchema> fromJson(String json) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return 
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), 
defaultRecordName(internal.get())));
+  }
+
+  /**
+   * Variant of {@link #fromJson(String)} that lets the caller fix the record 
name
+   * on the resulting HoodieSchema. Equivalent to the legacy
+   * {@code SerDeHelper.fromJson(...).map(is -> 
InternalSchemaConverter.convert(is, recordName))}
+   * pattern, collapsed into a single call.
+   */
+  public static Option<HoodieSchema> fromJson(String json, String recordName) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return 
Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), 
recordName));
+  }
+
+  /**
+   * Parses the history-schemas JSON layout (array of versioned schemas) and
+   * returns them keyed by {@code version_id}. Field ids are preserved on each
+   * returned HoodieSchema so the resulting map is directly usable by the
+   * read/merge path.
+   */
+  public static TreeMap<Long, HoodieSchema> fromJsonHistory(String json) {
+    TreeMap<Long, InternalSchema> internals = SerDeHelper.parseSchemas(json);
+    TreeMap<Long, HoodieSchema> out = new TreeMap<>();
+    for (Long versionId : internals.keySet()) {
+      InternalSchema is = internals.get(versionId);
+      HoodieSchema hs = HoodieSchemaInternalSchemaBridge.toHoodieSchema(is, 
defaultRecordName(is));
+      out.put(versionId, hs);
+    }
+    return out;
+  }
+
+  /**
+   * Appends a freshly-evolved schema to an existing serialized history blob 
and
+   * returns the new blob. Mirrors {@link 
SerDeHelper#inheritSchemas(InternalSchema, String)}
+   * — the {@code oldHistoryJson} is the prior {@code .hoodie/.schema/} 
contents
+   * (or empty for the first commit).
+   */
+  public static String inheritHistory(HoodieSchema newSchema, String 
oldHistoryJson) {
+    return SerDeHelper.inheritSchemas(
+        HoodieSchemaInternalSchemaBridge.toInternalSchema(newSchema), 
oldHistoryJson);
+  }
+
+  /**
+   * Resolves the schema-history entry that applies to a given version id — 
exact
+   * match if present, else the largest entry strictly less than {@code 
versionId},
+   * else {@code null}. HoodieSchema-shaped replacement for
+   * {@link 
org.apache.hudi.internal.schema.utils.InternalSchemaUtils#searchSchema}.
+   *
+   * <p>Note: legacy returned {@code InternalSchema.getEmptyInternalSchema()} 
on
+   * miss; this returns {@code null} so callers can choose their own empty
+   * sentinel via {@link HoodieSchema#empty()}. Most callers null-check + fall
+   * back, so the change is benign.</p>
+   */
+  public static HoodieSchema searchSchema(long versionId, 
java.util.TreeMap<Long, HoodieSchema> history) {
+    if (history.containsKey(versionId)) {
+      return history.get(versionId);
+    }
+    java.util.SortedMap<Long, HoodieSchema> headMap = 
history.headMap(versionId);
+    return headMap.isEmpty() ? null : headMap.get(headMap.lastKey());

Review Comment:
   Changed return type to Option<HoodieSchema>. Updated BaseHoodieWriteClient 
call site to .orElse(HoodieSchema.empty()).



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -73,59 +70,48 @@ object HoodieSchemaUtils {
   }
 
   /**
-   * get latest internalSchema from table
-   *
-   * @param config          instance of {@link HoodieConfig}
-   * @param tableMetaClient instance of HoodieTableMetaClient
-   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
+   * Returns the schema-on-read evolution [[HoodieSchema]] for the table — 
carrying
+   * field ids and version metadata — or [[None]] when schema-on-read is 
disabled,
+   * no schema is in commit metadata, or any read error occurs (read failures 
are
+   * swallowed rather than propagated).
    */
-  def getLatestTableInternalSchema(config: HoodieConfig,
-                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
-    getLatestTableInternalSchema(config.getProps, tableMetaClient)
+  def getLatestTableEvolutionSchema(config: HoodieConfig,
+                                    tableMetaClient: HoodieTableMetaClient): 
Option[HoodieSchema] = {
+    getLatestTableEvolutionSchema(config.getProps, tableMetaClient)
   }
 
-  /**
-   * get latest internalSchema from table
-   *
-   * @param props           instance of {@link Properties}
-   * @param tableMetaClient instance of HoodieTableMetaClient
-   * @return Option of InternalSchema. Will always be empty if schema on read 
is disabled
-   */
-  def getLatestTableInternalSchema(props: Properties,
-                                   tableMetaClient: HoodieTableMetaClient): 
Option[InternalSchema] = {
+  def getLatestTableEvolutionSchema(props: Properties,
+                                    tableMetaClient: HoodieTableMetaClient): 
Option[HoodieSchema] = {
     if (!ConfigUtils.getBooleanWithAltKeys(props, 
DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
       None
     } else {
       try {
         val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)
-        val internalSchemaOpt = 
tableSchemaResolver.getTableInternalSchemaFromCommitMetadata
-        if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else 
None
+        val schemaOpt = 
tableSchemaResolver.getTableEvolutionSchemaFromCommitMetadata
+        if (schemaOpt.isPresent) Some(schemaOpt.get()) else None
       } catch {
         case _: Exception => None
       }
     }
   }
 
   /**
-   * Deduces writer's schema based on
-   * <ul>
-   *   <li>Source's schema</li>
-   *   <li>Target table's schema (including Hudi's [[InternalSchema]] 
representation)</li>
-   * </ul>
+   * Deduces writer's schema from source schema, the table's latest Avro 
schema, and
+   * the table's schema-on-read evolution schema (if any).
    */
   def deduceWriterSchema(sourceSchema: HoodieSchema,
-                         latestTableSchemaOpt: Option[HoodieSchema],
-                         internalSchemaOpt: Option[InternalSchema],
-                         opts: Map[String, String]): HoodieSchema = {
+                                      latestTableSchemaOpt: 
Option[HoodieSchema],

Review Comment:
   Re-aligned both overloads.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -99,72 +99,23 @@ public static HoodieSchema convert(InternalSchema 
internalSchema, String name) {
   }
 
   public static InternalSchema pruneHoodieSchemaToInternalSchema(HoodieSchema 
schema, InternalSchema originSchema) {

Review Comment:
   Added the import; both call sites now use the unqualified 
HoodieSchemaUtils.collectLeafNames.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to