hudi-agent commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3191650510


##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaEvolutionUtils.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.types.Type;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * HoodieSchema-shaped façade for write-path schema evolution: reconciling an
+ * incoming write schema against the table's current schema, including missing
+ * columns, added columns, type promotions, and nullability adjustments.
+ *
+ * <p>Mirrors the two entry points of {@link AvroSchemaEvolutionUtils} but consumes
+ * and produces {@link HoodieSchema} so callers can stay off direct
+ * {@code InternalSchema} usage. During the InternalSchema → HoodieSchema migration
+ * this delegates to the legacy implementation via
+ * {@link HoodieSchemaInternalSchemaBridge}, which preserves field ids end-to-end.
+ * Phase 5 rewrites the implementation in pure HoodieSchema terms behind this stable
+ * interface.</p>
+ */
+public final class HoodieSchemaEvolutionUtils {
+
+  private HoodieSchemaEvolutionUtils() {
+  }
+
+  /**
+   * Reconciles an incoming write schema against the existing table schema, adding
+   * any new columns, promoting types where allowed, and (optionally) marking
+   * missing columns as nullable.
+   *
+   * <p>Semantics match {@link AvroSchemaEvolutionUtils#reconcileSchema(org.apache.avro.Schema, InternalSchema, boolean)}:
+   * the incoming schema is assumed to have <i>missing</i> columns rather than
+   * <i>deleted</i> columns. Renames and explicit deletes are not inferred here;
+   * those are handled by the explicit DDL path through
+   * {@link HoodieSchemaChangeApplier}.</p>
+   *
+   * @param incomingSchema             incoming write schema
+   * @param oldTableSchema             current table schema (with field ids)
+   * @param makeMissingFieldsNullable  when true, table fields absent from the
+   *                                   incoming schema are marked nullable in the
+   *                                   reconciled result
+   * @return reconciled HoodieSchema with field ids preserved on unchanged columns
+   */
+  public static HoodieSchema reconcileSchema(HoodieSchema incomingSchema,
+                                             HoodieSchema oldTableSchema,
+                                             boolean makeMissingFieldsNullable) {
+    InternalSchema oldInternal = HoodieSchemaInternalSchemaBridge.toInternalSchema(oldTableSchema);
+    InternalSchema reconciled = AvroSchemaEvolutionUtils.reconcileSchema(
+        incomingSchema.getAvroSchema(), oldInternal, makeMissingFieldsNullable);
+    return HoodieSchemaInternalSchemaBridge.toHoodieSchema(reconciled, oldTableSchema.getFullName());
+  }
+
+  /**
+   * Avro-only sibling of {@link #reconcileSchema(HoodieSchema, HoodieSchema, boolean)}
+   * that does <em>not</em> route through the InternalSchema bridge: field ids are
+   * neither read from the inputs nor stamped on the output. Use this from the
+   * write-path's structural reconciliation (e.g. {@code deduceWriterSchema}) where
+   * carrying ids over from the table's evolution-schema would leak them into
+   * commit metadata and Parquet writes that historically didn't include them.
+   */
+  public static HoodieSchema reconcileSchemaStructural(HoodieSchema incomingSchema,
+                                                       HoodieSchema oldTableSchema,
+                                                       boolean makeMissingFieldsNullable) {
+    org.apache.avro.Schema reconciled = AvroSchemaEvolutionUtils.reconcileSchema(
+        incomingSchema.getAvroSchema(), oldTableSchema.getAvroSchema(), makeMissingFieldsNullable);
+    return HoodieSchema.fromAvroSchema(reconciled);
+  }
+
+  /**
+   * Reconciles nullability and type-promotion requirements between a source
+   * (incoming) schema and a target (existing) schema, adjusting the source to be
+   * in line with the target's nullability and promotable types.
+   *
+   * <p>Semantics match
+   * {@link AvroSchemaEvolutionUtils#reconcileSchemaRequirements(org.apache.avro.Schema, org.apache.avro.Schema, boolean)}.
+   * If {@code shouldReorderColumns} is true, the source's fields are ordered to match
+   * the target's positional layout, preserving inter-commit field ordering.</p>
+   *
+   * @param sourceSchema         incoming source schema to be reconciled
+   * @param targetSchema         target schema to reconcile against
+   * @param shouldReorderColumns if true, fields in the result follow the target's order
+   * @return source-shaped HoodieSchema with nullability and types reconciled
+   */
+  public static HoodieSchema reconcileSchemaRequirements(HoodieSchema sourceSchema,
+                                                         HoodieSchema targetSchema,
+                                                         boolean shouldReorderColumns) {
+    org.apache.avro.Schema reconciled = AvroSchemaEvolutionUtils.reconcileSchemaRequirements(
+        sourceSchema == null ? null : sourceSchema.getAvroSchema(),
+        targetSchema == null ? null : targetSchema.getAvroSchema(),
+        shouldReorderColumns);
+    return HoodieSchema.fromAvroSchema(reconciled);
+  }
+
+  /**
+   * Normalizes union ordering so {@code null} sits first within nullable union
+   * branches, matching the ordering Hudi has historically written to disk. Returns
+   * {@code HoodieSchema.NULL_SCHEMA} for a {@code null} input and the schema
+   * unchanged for non-record types. Wraps the legacy
+   * {@code InternalSchemaConverter.fixNullOrdering} during the migration.
+   */
+  public static HoodieSchema fixNullOrdering(HoodieSchema schema) {
+    return InternalSchemaConverter.fixNullOrdering(schema);
+  }
+
+  /**
+   * Collects top-level columns whose primitive type differs between two schemas,
+   * keyed by the column's index in {@code schema}. The pair holds (newType,
+   * oldType) so callers can build a cast plan from {@code oldType} to
+   * {@code newType}. HoodieSchema-direct replacement for
+   * {@link InternalSchemaUtils#collectTypeChangedCols(InternalSchema, InternalSchema)}.
+   *
+   * <p>Walks ids on the HoodieSchema accessors directly; only converts to
+   * {@link Type} at the result construction (callers expect Type pairs for the
+   * cast-plan downstream).</p>
+   */
+  public static Map<Integer, Pair<Type, Type>> collectTypeChangedCols(HoodieSchema schema, HoodieSchema oldSchema) {
+    Set<Integer> ids = schema.getAllIds();
+    Set<Integer> otherIds = oldSchema.getAllIds();
+    Map<Integer, Pair<Type, Type>> result = new HashMap<>();
+    List<Integer> topLevelIds = schema.getFields().stream()
+        .map(HoodieSchemaField::fieldId).collect(Collectors.toList());
+    for (Integer id : ids) {
+      if (!otherIds.contains(id)) {
+        continue;
+      }
+      HoodieSchema thisType = schema.findType(id);

Review Comment:
   🤖 I think this comparison is now stricter than the legacy `InternalSchemaUtils.collectTypeChangedCols`. Legacy `InternalSchema.findType(id)` returned the unwrapped `Type` (no nullability info), so a field that only flipped nullability (e.g. `int?` → `int`) compared equal and was skipped. New `HoodieSchema.findType(id)` returns `field.schema()` per `HoodieSchemaIndex.walk`, which is the `[null, T]` union for nullable fields, so `thisType.equals(otherType)` is now false for a nullability-only change and the column is reported as type-changed. Was that intentional, or should the comparison call `getNonNullType()` on both sides first to match legacy behavior?
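   For concreteness, an untested sketch of the legacy-compatible comparison; it only uses accessors already visible in this hunk and this PR (`findType`, `isNullable`, `getNonNullType`):

   ```java
   // Unwrap nullable unions before comparing, so a nullability-only flip
   // (e.g. int? -> int) compares equal and is skipped, as in the legacy
   // InternalSchemaUtils.collectTypeChangedCols.
   HoodieSchema thisType = schema.findType(id);
   HoodieSchema otherType = oldSchema.findType(id);
   HoodieSchema thisNonNull = thisType.isNullable() ? thisType.getNonNullType() : thisType;
   HoodieSchema otherNonNull = otherType.isNullable() ? otherType.getNonNullType() : otherType;
   if (thisNonNull.equals(otherNonNull)) {
     continue; // same type modulo nullability: not a type change
   }
   ```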
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java:
##########
@@ -210,29 +208,29 @@ protected Pair<ClosableIterator<T>, HoodieSchema> getRecordsIterator(HoodieDataB
   /**
    * Get final Read Schema for support evolution.

Review Comment:
   🤖 Same record-name drift you fixed in `AbstractHoodieLogRecordScanner` / `HoodieMergeHelper` / `HoodieAvroParquetReader`: legacy passed `readerSchema.getFullName()` to `InternalSchemaConverter.convert(...)` here, but `mergeSchemaGetRenamed()` (no-arg) defaults to `querySchema.getFullName()` (i.e. `evolutionSchemaOpt.get().getFullName()`). Could you switch to `mergeSchemaGetRenamed(readerSchema.getFullName())` to preserve the legacy name on the projection target?
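   Something like the following, assuming `mergeSchemaGetRenamed` has the `String` overload taking the target record name; the return type here is a guess modeled on `getRequiredSchemaForFileAndRenamedColumns` and is not verified against the PR:

   ```java
   // Pin the projection target's record name to the reader schema's, matching the
   // legacy InternalSchemaConverter.convert(..., readerSchema.getFullName()).
   Pair<HoodieSchema, Map<String, String>> mergedAndRenamed =
       mergeSchemaGetRenamed(readerSchema.getFullName());
   ```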
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java:
##########
@@ -132,35 +134,39 @@ public Option<UnaryOperator<T>> getOutputConverter() {
   }
 
   public Pair<HoodieSchema, Map<String, String>> getRequiredSchemaForFileAndRenamedColumns(StoragePath path) {
-    if (internalSchema.isEmptySchema()) {
+    if (!evolutionSchemaOpt.isPresent()) {

Review Comment:
   🤖 Same record-name drift here: legacy passed `requiredSchema.getFullName()` to `InternalSchemaConverter.convert(...)`, but `mergeSchemaGetRenamed()` (no-arg) defaults to `evolutionSchemaOpt.get().getFullName()`. Could you switch to `mergeSchemaGetRenamed(requiredSchema.getFullName())` so the interned merged schema keeps the required-schema's full name?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1687,43 +1694,48 @@ public void updateColumnType(String colName, Type newType) {
    * @param doc     .
    */
   public void updateColumnComment(String colName, String doc) {
-    Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
-    InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft()).applyColumnCommentChange(colName, doc);
-    commitTableChange(newSchema, pair.getRight());
+    Pair<HoodieSchema, HoodieTableMetaClient> pair = getEvolutionSchemaAndMetaClient();
+    HoodieSchema evolved = new HoodieSchemaChangeApplier(pair.getLeft()).applyColumnCommentChange(colName, doc);
+    commitTableChange(evolved, pair.getRight());
   }
 
   /**
-   * reorder the position of col.
+   * Reorder the position of col.
    *
    * @param colName column which need to be reordered. if we want to change col from a nested filed, the fullName should be specified.
    * @param referColName reference position.
    * @param orderType col position change type. now support three change types: first/after/before
    */
-  public void reOrderColPosition(String colName, String referColName, TableChange.ColumnPositionChange.ColumnPositionType orderType) {
+  public void reOrderColPosition(String colName, String referColName, ColumnPositionType orderType) {
     if (colName == null || orderType == null || referColName == null) {
       return;
     }
-    //get internalSchema
-    Pair<InternalSchema, HoodieTableMetaClient> pair = getInternalSchemaAndMetaClient();
-    InternalSchema newSchema = new InternalSchemaChangeApplier(pair.getLeft())
+    Pair<HoodieSchema, HoodieTableMetaClient> pair = getEvolutionSchemaAndMetaClient();
+    HoodieSchema evolved = new HoodieSchemaChangeApplier(pair.getLeft())
         .applyReOrderColPositionChange(colName, referColName, orderType);
-    commitTableChange(newSchema, pair.getRight());
+    commitTableChange(evolved, pair.getRight());
   }
 
-  public Pair<InternalSchema, HoodieTableMetaClient> getInternalSchemaAndMetaClient() {
+  /**
+   * Resolves the table's evolution schema (or assigns fresh ids to the data schema
+   * as a fallback) and pairs it with a fresh metaClient.
+   */
+  public Pair<HoodieSchema, HoodieTableMetaClient> getEvolutionSchemaAndMetaClient() {
     HoodieTableMetaClient metaClient = createMetaClient(true);
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
-    return Pair.of(getInternalSchema(schemaUtil), metaClient);
+    return Pair.of(getEvolutionSchema(schemaUtil), metaClient);
   }
 
-  public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient metaClient) {
+  /**
+   * Persists the post-DDL evolution schema to commit metadata and the history file.
+   */
+  public void commitTableChange(HoodieSchema newEvolutionSchema, HoodieTableMetaClient metaClient) {
     TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
     String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet(
-        () -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), ""));
-    HoodieSchema schema = InternalSchemaConverter.convert(newSchema, HoodieSchemaUtils.getRecordQualifiedName(config.getTableName()));
+        () -> HoodieSchemaSerDe.inheritHistory(getEvolutionSchema(schemaUtil), ""));
     String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
     String instantTime = startCommit(commitActionType, metaClient);
-    config.setSchema(schema.toString());
+    config.setSchema(newEvolutionSchema.toString());

Review Comment:
   🤖 The legacy `commitTableChange` forcibly renamed the schema to `HoodieSchemaUtils.getRecordQualifiedName(config.getTableName())` before `config.setSchema(...)`. The new code stores `newEvolutionSchema.toString()` directly, so the record name now drifts to whatever the input HoodieSchema carried. You already applied the canonical-name fix in `AlterTableCommand.commitWithSchema` (lines 256-264); should the same normalization be done here? The Flink `HoodieCatalogUtil.alterTable` path goes through this method and would otherwise diverge from the Spark path.
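   A minimal sketch of that normalization, reusing only names already present in this diff (`HoodieSchemaUtils.getRecordQualifiedName` from the removed line, `withRecordName` from the bridge); untested:

   ```java
   // Pin the persisted record name to the canonical <namespace>.<tableName> form,
   // as the legacy InternalSchemaConverter.convert(newSchema, getRecordQualifiedName(...)) did.
   String canonicalName = HoodieSchemaUtils.getRecordQualifiedName(config.getTableName());
   HoodieSchema toPersist = canonicalName.equals(newEvolutionSchema.getFullName())
       ? newEvolutionSchema
       : HoodieSchemaInternalSchemaBridge.withRecordName(newEvolutionSchema, canonicalName);
   config.setSchema(toPersist.toString());
   ```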
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala:
##########
@@ -244,13 +247,22 @@ object AlterTableCommand extends Logging {
   /**
     * Generate an commit with new schema to change the table's schema.
     *
-    * @param internalSchema new schema after change
+    * @param evolutionSchema new schema after change
     * @param historySchemaStr history schemas
     * @param table The hoodie table.
     * @param sparkSession The spark session.
     */
-  def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = {
-    val schema = InternalSchemaConverter.convert(internalSchema, HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table))
+  def commitWithSchema(rawEvolutionSchema: HoodieSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = {
+    // Force the persisted schema's record name to the canonical
+    // <namespace>.<tableName> form, matching the legacy
+    // InternalSchemaConverter.convert(merged, getRecordQualifiedName(table.identifier.table))
+    // behavior. Without this, an evolution schema parsed from prior commit metadata would
+    // round-trip its existing name, which can drift from the table-identifier-based name
+    // and end up persisted in LATEST_SCHEMA / consumed by the write client.
+    val canonicalName = HoodieCommonSchemaUtils.getRecordQualifiedName(table.identifier.table)
+    val evolutionSchema = if (rawEvolutionSchema.getFullName == canonicalName) rawEvolutionSchema
+      else HoodieSchemaInternalSchemaBridge.withRecordName(rawEvolutionSchema, canonicalName)
+    val schema = evolutionSchema
     val path = getTableLocation(table, sparkSession)

Review Comment:
   🤖 nit: `val schema = evolutionSchema` is a redundant alias; `schema` isn't used anywhere that `evolutionSchema` couldn't be used directly. Could you drop this line and use `evolutionSchema` consistently throughout the method?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java:
##########
@@ -200,26 +207,26 @@ public void doEvolutionForRealtimeInputFormat(AbstractRealtimeRecordReader realt
       LOG.warn("Expected realtime split for mor table. Found split: {}", split);
       return;
     }
-    if (internalSchemaOption.isPresent()) {
+    if (evolutionSchemaOption.isPresent()) {
       HoodieSchema tableSchema = getSchemaFromCache();
       List<String> requiredColumns = getRequireColumn(job);
-      InternalSchema prunedInternalSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(),
-          requiredColumns);
+      HoodieSchema writerSchema = HoodieSchemaInternalSchemaBridge.withRecordName(
+          evolutionSchemaOption.get(), tableSchema.getName());
+      HoodieSchema prunedEvolutionSchema = HoodieSchemaInternalSchemaBridge.pruneByLeafNames(

Review Comment:
   🤖 I think the reader schema's record name drifts from the writer schema's here. Legacy did `InternalSchemaConverter.convert(prunedInternalSchema, tableSchema.getName())` for both, but `pruneByLeafNames` preserves the source's full name, and `HoodieSchemaSerDe.fromJson` defaults that to `"hoodieSchema"` (see `defaultRecordName`), so `prunedEvolutionSchema.getFullName()` will typically be `"hoodieSchema"` while the writer is `tableSchema.getName()`. Could you wrap the pruned result in `HoodieSchemaInternalSchemaBridge.withRecordName(..., tableSchema.getName())` to match the writer (same fix you applied in `HoodieAvroParquetReader` via `mergeSchema(baseSchema.getFullName())`)?
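   Untested sketch of the wrap, using only names from this hunk:

   ```java
   HoodieSchema prunedEvolutionSchema = HoodieSchemaInternalSchemaBridge.withRecordName(
       HoodieSchemaInternalSchemaBridge.pruneByLeafNames(evolutionSchemaOption.get(), requiredColumns),
       tableSchema.getName());
   // Reader and writer now agree on tableSchema.getName(), matching the legacy
   // InternalSchemaConverter.convert(prunedInternalSchema, tableSchema.getName()).
   ```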
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaHistoryCache.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.TimelineLayout;
+import org.apache.hudi.common.util.InternalSchemaCache;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.storage.HoodieStorage;
+
+/**
+ * HoodieSchema-shaped façade over the global schema-history cache. Delegates to
+ * {@link InternalSchemaCache} so the underlying segment-locked Caffeine cache is
+ * shared across the legacy and new code paths during migration: there's only one
+ * cache per JVM, regardless of which API the caller used.
+ *
+ * <p>String-based entry points (commit-metadata blob lookup) pass through
+ * unchanged; methods that previously returned {@link InternalSchema} now return
+ * {@link HoodieSchema} via {@link HoodieSchemaInternalSchemaBridge}, with field
+ * ids preserved.</p>
+ */
+public final class HoodieSchemaHistoryCache {
+
+  private HoodieSchemaHistoryCache() {
+  }
+
+  /**
+   * Resolves a schema by version id, hitting the cache when possible and falling
+   * back to the schema-history files on disk. Returns null when no schema with
+   * that id exists for the table.
+   */
+  public static HoodieSchema searchSchemaAndCache(long versionId, HoodieTableMetaClient metaClient) {
+    InternalSchema internal = InternalSchemaCache.searchSchemaAndCache(versionId, metaClient);
+    return wrap(internal);
+  }

Review Comment:
   🤖 nit: this façade still exposes a pass-through whose name says `InternalSchema`, even though the new class deliberately drops that vocabulary. Could you rename it to something like `getEvolutionSchemaAndAvroSchemaForClusteringAndCompaction` (and update the one caller in `RunCompactionActionExecutor`) so the façade reads consistently?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkInternalSchemaConverter.java:
##########
@@ -212,27 +208,30 @@ private static void collectColNamesFromStructType(DataType sparkType, Deque<Stri
 
   private static void addFullName(DataType sparkType, String name, Deque<String> fieldNames, List<String> resultSet) {
     if (!(sparkType instanceof StructType) && !(sparkType instanceof ArrayType) && !(sparkType instanceof MapType)) {
-      resultSet.add(InternalSchemaUtils.createFullName(name, fieldNames));
+      if (fieldNames.isEmpty()) {

Review Comment:
   🤖 nit: this inlined `addFullName` re-implements what `HoodieSchemaIndex.createFullName(String, Deque<String>)` already does. Could you call that helper (or extract a shared util) instead of duplicating the descending-iterator logic?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java:
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
+import org.apache.hudi.common.schema.types.Type;
+import org.apache.hudi.common.schema.types.Types;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Two-way bridge between {@link InternalSchema} and {@link HoodieSchema} that preserves
+ * column ids by stamping them as Avro custom properties on the HoodieSchema's
+ * underlying schema tree.
+ *
+ * <p>This exists during the InternalSchema → HoodieSchema migration. The existing
+ * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a
+ * structurally-correct HoodieSchema but discards field ids. Downstream code in
+ * the new evolution layer relies on {@code field-id} / {@code element-id} /
+ * {@code key-id} / {@code value-id} properties being present, so we walk the
+ * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p>
+ *
+ * <p>The walk order matches {@code InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema}
+ * (record fields in declared order; array element after array; map key + value after map),
+ * so positional pairing is exact.</p>
+ *
+ * <p>Public for the migration period only: Phase 4 callsite migrations across
+ * different packages need access to the conversion. Once Phase 5 rewrites the
+ * action algebra on pure HoodieSchema, this bridge and its dependency on
+ * {@code InternalSchema} go away.</p>
+ */
+public final class HoodieSchemaInternalSchemaBridge {
+
+  private HoodieSchemaInternalSchemaBridge() {
+  }
+
+  /**
+   * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving column
+   * ids carried as {@code field-id} / {@code element-id} / {@code key-id} /
+   * {@code value-id} Avro custom properties. This is the inverse of
+   * {@link #toHoodieSchema(InternalSchema, String)} and exists so the façade can
+   * round-trip a HoodieSchema through the legacy applier without renumbering ids on
+   * every call.
+   *
+   * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly parsed
+   * input), this falls back to the existing
+   * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh ids.</p>
+   */
+  public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
+    if (hoodieSchema == null || hoodieSchema.isEmptySchema()) {
+      return InternalSchema.getEmptyInternalSchema();
+    }
+    // Take the structurally-correct InternalSchema produced by the existing converter,
+    // then walk both schemas in parallel and overwrite the InternalSchema's freshly-minted
+    // ids with the ids carried as Avro properties on the HoodieSchema (where present).
+    InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, hoodieSchema.getNameToPosition());
+    Types.RecordType originalRecord = fresh.getRecord();
+    Types.RecordType reidentified = (Types.RecordType) reidentify(hoodieSchema, originalRecord);
+    InternalSchema result = (originalRecord == reidentified)
+        ? fresh
+        : new InternalSchema(reidentified);
+    long schemaId = hoodieSchema.schemaId();
+    if (schemaId >= 0) {
+      result.setSchemaId(schemaId);
+    }
+    int maxColumnId = hoodieSchema.maxColumnId();
+    if (maxColumnId >= 0) {
+      result.setMaxColumnId(maxColumnId);
+    }
+    return result;
+  }
+
+  /**
+   * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in parallel
+   * and produces a {@link Type} where each addressable id matches the HoodieSchema's
+   * Avro custom property (when present). Returns the original {@code internalType}
+   * unchanged when no overrides apply, so callers can short-circuit.
+   */
+  private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (internalType.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) internalType;
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return internalType;
+        }
+        List<Types.Field> originalFields = record.fields();
+        List<Types.Field> rebuilt = new ArrayList<>(originalFields.size());
+        boolean anyChange = false;
+        for (int i = 0; i < originalFields.size(); i++) {
+          Types.Field original = originalFields.get(i);
+          HoodieSchemaField hf = effective.getFields().get(i);
+          int overrideId = hf.fieldId();
+          Type childType = reidentify(hf.schema(), original.type());
+          int finalId = overrideId >= 0 ? overrideId : original.fieldId();
+          if (finalId == original.fieldId() && childType == original.type()) {
+            rebuilt.add(original);
+          } else {
+            rebuilt.add(Types.Field.get(finalId, original.isOptional(), original.name(), childType, original.doc()));
+            anyChange = true;
+          }
+        }
+        return anyChange ? Types.RecordType.get(rebuilt, record.name()) : record;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) internalType;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return internalType;
+        }
+        int overrideElementId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP), -1);
+        Type newElement = reidentify(effective.getElementType(), array.elementType());
+        int finalElementId = overrideElementId >= 0 ? overrideElementId : array.elementId();
+        if (finalElementId == array.elementId() && newElement == array.elementType()) {
+          return array;
+        }
+        return Types.ArrayType.get(finalElementId, array.isElementOptional(), newElement);
+      }
+      case MAP: {
+        Types.MapType map = (Types.MapType) internalType;
+        if (effective.getType() != HoodieSchemaType.MAP) {
+          return internalType;
+        }
+        int overrideKeyId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), -1);
+        int overrideValueId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP), -1);
+        Type newValue = reidentify(effective.getValueType(), map.valueType());
+        int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId();
+        int finalValueId = overrideValueId >= 0 ? overrideValueId : map.valueId();
+        if (finalKeyId == map.keyId() && finalValueId == map.valueId() && newValue == map.valueType()) {
+          return map;
+        }
+        return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), newValue, map.isValueOptional());
+      }
+      default:
+        return internalType;
+    }
+  }
+
+  private static int readIntProp(Object raw, int fallback) {
+    return raw instanceof Number ? ((Number) raw).intValue() : fallback;
+  }
+
+  /**
+   * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps every
+   * sub-schema with the corresponding field id from the source. The schema-level
+   * version id and max column id are also propagated.
+   */
+  public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, String recordName) {
+    if (internalSchema == null || internalSchema.isEmptySchema()) {
+      return HoodieSchema.empty();
+    }
+    HoodieSchema hoodieSchema = InternalSchemaConverter.convert(internalSchema, recordName);
+    stampIds(hoodieSchema, internalSchema.getRecord());
+    hoodieSchema.setSchemaId(internalSchema.schemaId());
+    hoodieSchema.setMaxColumnId(internalSchema.getMaxColumnId());
+    hoodieSchema.invalidateIdIndex();
+    return hoodieSchema;
+  }
+
+  /**
+   * Prunes {@code source} to the supplied leaf-name list, preserving field ids.
+   * The returned HoodieSchema's record name is taken from {@code source}.
+   *
+   * <p>Single entry point for the bridge round-trip pattern that several call
+   * sites had open-coded: bridge to {@link InternalSchema}, prune via
+   * {@link InternalSchemaUtils#pruneInternalSchema}, then bridge back.</p>
+   */
+  public static HoodieSchema pruneByLeafNames(HoodieSchema source, List<String> leafNames) {
+    InternalSchema pruned = InternalSchemaUtils.pruneInternalSchema(toInternalSchema(source), leafNames);
+    return toHoodieSchema(pruned, source.getFullName());
+  }
+
+  /**
+   * Prunes {@code source} down to the leaves of {@code requiredSchema}, preserving
+   * field ids. Returns a HoodieSchema named after {@code requiredSchema}.
+   */
+  public static HoodieSchema pruneByRequiredSchema(HoodieSchema source, HoodieSchema requiredSchema) {
+    InternalSchema pruned = InternalSchemaUtils.pruneInternalSchema(
+        toInternalSchema(source), HoodieSchemaUtils.collectLeafNames(requiredSchema));
+    return toHoodieSchema(pruned, requiredSchema.getFullName());
+  }
+
+  /**
+   * Returns a HoodieSchema with the same fields and ids as {@code source} but with
+   * its record name set to {@code recordName}. Implemented as a bridge round-trip
+   * since {@link HoodieSchema} has no in-place rename API.
+   */
+  public static HoodieSchema withRecordName(HoodieSchema source, String recordName) {
+    return toHoodieSchema(toInternalSchema(source), recordName);
+  }
+
+  private static void stampIds(HoodieSchema hoodieSchema, Type type) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (type.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) type;
+        // The HoodieSchema produced by InternalSchemaConverter preserves the declared
+        // field order, so positional pairing with InternalSchema is exact.
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return;
+        }
+        for (int i = 0; i < record.fields().size(); i++) {
+          Types.Field internalField = record.fields().get(i);
+          HoodieSchemaField hoodieField = effective.getFields().get(i);
+          stampPropIfAbsent(hoodieField.getAvroField()::getObjectProp,
+              hoodieField.getAvroField()::addProp, HoodieSchema.FIELD_ID_PROP, internalField.fieldId());
+          stampIds(hoodieField.schema(), internalField.type());
+        }
+        return;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) type;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return;
+        }
+        stampPropIfAbsent(effective.getAvroSchema()::getObjectProp, effective.getAvroSchema()::addProp, HoodieSchema.ELEMENT_ID_PROP, array.elementId());
+        stampIds(effective.getElementType(), array.elementType());
+        return;
+      }
+      case MAP: {
+        Types.MapType map = (Types.MapType) type;
+        if (effective.getType() != HoodieSchemaType.MAP) {
+          return;
+        }
+        stampPropIfAbsent(effective.getAvroSchema()::getObjectProp, effective.getAvroSchema()::addProp, HoodieSchema.KEY_ID_PROP, map.keyId());
+        stampPropIfAbsent(effective.getAvroSchema()::getObjectProp, effective.getAvroSchema()::addProp, HoodieSchema.VALUE_ID_PROP, map.valueId());
+        stampIds(effective.getValueType(), map.valueType());
+        return;
+      }
+      default:
+        // primitives have no addressable child id
+    }
+  }
+
+  /**
+   * Stamps {@code value} under {@code key} only if no value is already present, since
+   * Avro's JsonProperties is set-once and would throw on a second {@code addProp}.
+   * The same HoodieSchema instance can be reached more than once when an
+   * InternalSchema's {@link Types.RecordType} is shared across paths (the
+   * InternalSchemaConverter caches by Type identity), so this guard makes
+   * {@link #stampIds} idempotent. Mismatched re-stamps surface as a hard failure
+   * since the field-id is the schema's identity and a divergence here means
Review Comment:
   🤖 nit: `stampPropIfAbsent` declares its parameters as `java.util.function.Function<...>` / `java.util.function.BiConsumer<...>` inline; could you import these and use unqualified names? It would also let you simplify the call sites (`effective.getAvroSchema()::getObjectProp` etc.), which are a bit dense as-is.
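   For illustration only; the parameter names and exact generics are guesses from the call sites above, since the method body isn't in this hunk:

   ```java
   import java.util.function.BiConsumer;
   import java.util.function.Function;

   // Reader looks up an existing prop; writer stamps a new one.
   private static void stampPropIfAbsent(Function<String, Object> getProp,
                                         BiConsumer<String, Object> addProp,
                                         String key, int value) {
     Object existing = getProp.apply(key);
     if (existing == null) {
       addProp.accept(key, value); // Avro JsonProperties is set-once
     }
     // else: per the javadoc above, a mismatched re-stamp should fail hard;
     // that check is omitted from this sketch.
   }
   ```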
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -99,72 +100,23 @@ public static HoodieSchema convert(InternalSchema internalSchema, String name) {
   }
 
   public static InternalSchema pruneHoodieSchemaToInternalSchema(HoodieSchema schema, InternalSchema originSchema) {
-    List<String> pruneNames = collectColNamesFromSchema(schema);
-    return InternalSchemaUtils.pruneInternalSchema(originSchema, pruneNames);
+    return InternalSchemaUtils.pruneInternalSchema(originSchema,
+        HoodieSchemaUtils.collectLeafNames(schema));
   }
 
   /**
    * Collect all the leaf nodes names.
    *
+   * <p>Delegates to {@link org.apache.hudi.common.schema.HoodieSchemaUtils#collectLeafNames},
+   * the HoodieSchema-direct walker that moved out of internal.schema in the
+   * Phase 5 ramp-up. Kept here for {@link TestInternalSchemaConverter} visibility.</p>

Review Comment:
   πŸ€– nit: "Phase 5 ramp-up" is internal migration jargon that won't mean 
anything to a future reader. Could you drop the phase reference and just say 
something like "delegates to HoodieSchemaUtils#collectLeafNames; kept here only 
so TestInternalSchemaConverter can exercise the wrapper"?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java:
##########
@@ -64,14 +62,14 @@ public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf, Option
       MessageType messageType = fileFooter.getFileMetaData().getSchema();
       baseSchema = getAvroSchemaConverter(conf).convert(messageType);
 
-      if (internalSchemaOption.isPresent()) {
+      if (evolutionSchemaOption.isPresent()) {
        // do schema reconciliation in case there exists read column which is not in the file schema.
-        InternalSchema mergedInternalSchema = new InternalSchemaMerger(
-            InternalSchemaConverter.convert(baseSchema),
-            internalSchemaOption.get(),
-            true,
-            true).mergeSchema();
-        baseSchema = InternalSchemaConverter.convert(mergedInternalSchema, baseSchema.getFullName());
+        // Preserve the file schema's record name on the merged result, matching the legacy
+        // InternalSchemaConverter.convert(merged, baseSchema.getFullName()) behavior: the
+        // hardcoded "schema" name from getEvolutionSchemaOption("schema") would otherwise leak

Review Comment:
   🤖 nit: the comment refers to `getEvolutionSchemaOption("schema")`, but there's no such method/call here; I think this is meant to describe the legacy `InternalSchemaConverter.convert(merged, "schema")` default. Could you rephrase so a reader doesn't go looking for that method?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1353,172 @@ public Object getProp(String key) {
   public void addProp(String key, Object value) {
     ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
     avroSchema.addProp(key, value);
+    this.idIndex = null;
+  }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()};
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel, i.e. a record with
+   * no fields (the {@link #empty()} placeholder). Replaces
+   * {@code InternalSchema#isEmptySchema()}.
+   *
+   * <p>Note: the legacy {@code InternalSchema.isEmptySchema()} keyed off
+   * {@code versionId < 0}, which was effectively a no-fields check (only the
+   * EMPTY_SCHEMA singleton ever had a negative version id). We mirror the
+   * structural intent here so any HoodieSchema built via {@code parse()} /
+   * {@code fromAvroSchema()} / {@code createRecord()} without a subsequent
+   * {@link #setSchemaId(long)} is not mis-classified as empty.</p>
+   */
+  public boolean isEmptySchema() {
+    return type != HoodieSchemaType.RECORD || avroSchema.getFields().isEmpty();
+  }
+
+  /**
+   * Returns the schema version id (replaces InternalSchema#schemaId).
+   * Returns -1 if no version has been assigned.
+   */
+  public long schemaId() {
+    return schemaId;
+  }
+
+  /**
+   * Sets the schema version id. Typically derived from a commit instant timestamp.
+   * Mutable: callers that need to re-stamp the version (e.g. after an evolution
+   * operation) can call this multiple times.
+   */
+  public HoodieSchema setSchemaId(long schemaId) {
+    this.schemaId = schemaId;
+    return this;
+  }
+
+  /**
+   * Returns the highest column id assigned to any sub-schema. If an explicit
+   * max-column-id has been recorded (e.g. preserved across a column deletion),
+   * that value is returned; otherwise the highest id currently present in the
+   * schema is returned. Replaces InternalSchema#maxColumnId.
+   *
+   * <p>Returns -1 if no ids have been assigned anywhere in the schema.</p>
+   */
+  public int maxColumnId() {
+    if (explicitMaxColumnId >= 0) {
+      return explicitMaxColumnId;
+    }
+    return index().maxColumnIdSeen();
+  }
+
+  /**
+   * Records the highest column id explicitly. Useful after column deletions, where
+   * the highest currently-present id is less than the highest ever assigned and
+   * subsequent column additions must avoid colliding with previously-used ids.
+   */
+  public HoodieSchema setMaxColumnId(int maxColumnId) {
+    this.explicitMaxColumnId = maxColumnId;
+    return this;
+  }
+
+  /**
+   * Returns the dot-joined full name of the field that owns column id {@code id},
+   * or empty string if none. Replaces InternalSchema#findFullName.
+   */
+  public String findFullName(int id) {
+    String result = index().idToName().get(id);
+    return result == null ? "" : result;
+  }
+
+  /**
+   * Returns the column id assigned to the field at {@code fullName}, or -1 if not found.
+   * Replaces InternalSchema#findIdByName.
+   */
+  public int findIdByName(String fullName) {
+    if (fullName == null || fullName.isEmpty()) {
+      return -1;
+    }
+    Integer id = index().nameToId().get(fullName);
+    return id == null ? -1 : id;
+  }
+
+  /**
+   * Returns all column ids in this schema. Replaces InternalSchema#getAllIds.
+   */
+  public java.util.Set<Integer> getAllIds() {

Review Comment:
   🤖 nit: the new methods (`getAllIds`, `getAllColsFullName`, `idIndex`, etc.) use fully-qualified `java.util.Set` / `java.util.List` / `java.util.ArrayList` / `java.util.Map` inline. Could you add the imports and drop the prefixes to match the rest of the file?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -791,20 +796,21 @@ object HoodieBaseRelation extends SparkAdapterSupport {
   }
 
   /**
-   * Projects provided schema by picking only required (projected) top-level columns from it
+   * Projects provided schema by picking only required (projected) top-level columns from it.
    *
-   * @param tableSchema schema to project (either of [[InternalSchema]] or Avro's [[Schema]])
+   * @param tableSchema     Right is the schema-on-read evolution schema (with field ids);
+   *                        Left is the structural HoodieSchema fallback.
    * @param requiredColumns required top-level columns to be projected
    */
-  def projectSchema(tableSchema: Either[HoodieSchema, InternalSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, InternalSchema) = {
+  def projectSchema(tableSchema: Either[HoodieSchema, HoodieSchema], requiredColumns: Array[String]): (HoodieSchema, StructType, Option[HoodieSchema]) = {

Review Comment:
   🤖 nit: now that both arms of the `Either` are `HoodieSchema`, this signature is hard to read at the call site; `Either[HoodieSchema, HoodieSchema]` doesn't communicate which arm means what. Could you replace it with two separate parameters (e.g. `evolutionSchema: Option[HoodieSchema], fallbackSchema: HoodieSchema`) or a small sealed wrapper, so the intent doesn't depend on which side of the Either you happen to land on?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

