hudi-agent commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3178795094


##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that preserves
+ * column ids by stamping them as Avro custom properties on the HoodieSchema's
+ * underlying schema tree.
+ *
+ * <p>This exists during the InternalSchema → HoodieSchema migration. The existing
+ * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a
+ * structurally-correct HoodieSchema but discards field ids. Downstream code in
+ * the new evolution layer relies on {@code field-id} / {@code element-id} /
+ * {@code key-id} / {@code value-id} properties being present, so we walk the
+ * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p>
+ *
+ * <p>The walk order matches {@code InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema}
+ * (record fields in declared order; array element after array; map key + value after map),
+ * so positional pairing is exact.</p>
+ *
+ * <p>Public for the migration period only: Phase 4 callsite migrations across
+ * different packages need access to the conversion. Once Phase 5 rewrites the
+ * action algebra on pure HoodieSchema, this bridge and its dependency on
+ * {@code InternalSchema} go away.</p>
+ */
+public final class HoodieSchemaInternalSchemaBridge {
+
+  private HoodieSchemaInternalSchemaBridge() {
+  }
+
+  /**
+   * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving column
+   * ids carried as {@code field-id} / {@code element-id} / {@code key-id} /
+   * {@code value-id} Avro custom properties. This is the inverse of
+   * {@link #toHoodieSchema(InternalSchema, String)} and exists so the façade can
+   * round-trip a HoodieSchema through the legacy applier without renumbering ids on
+   * every call.
+   *
+   * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly parsed
+   * input), this falls back to the existing
+   * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh ids.</p>
+   */
+  public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
+    // Take the structurally-correct InternalSchema produced by the existing converter,
+    // then walk both schemas in parallel and overwrite the InternalSchema's freshly-minted
+    // ids with the ids carried as Avro properties on the HoodieSchema (where present).
+    InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, hoodieSchema.getNameToPosition());
+    Types.RecordType originalRecord = fresh.getRecord();
+    Types.RecordType reidentified = (Types.RecordType) reidentify(hoodieSchema, originalRecord);
+    InternalSchema result = (originalRecord == reidentified)
+        ? fresh
+        : new InternalSchema(reidentified);
+    long schemaId = hoodieSchema.schemaId();
+    if (schemaId >= 0) {
+      result.setSchemaId(schemaId);
+    }
+    int maxColumnId = hoodieSchema.maxColumnId();
+    if (maxColumnId >= 0) {
+      result.setMaxColumnId(maxColumnId);
+    }
+    return result;
+  }
+
+  /**
+   * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in parallel
+   * and produces a {@link Type} where each addressable id matches the HoodieSchema's
+   * Avro custom property (when present). Returns the original {@code internalType}
+   * unchanged when no overrides apply, so callers can short-circuit.
+   */
+  private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (internalType.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) internalType;
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return internalType;
+        }
+        List<Types.Field> originalFields = record.fields();
+        List<Types.Field> rebuilt = new ArrayList<>(originalFields.size());
+        boolean anyChange = false;
+        for (int i = 0; i < originalFields.size(); i++) {
+          Types.Field original = originalFields.get(i);
+          HoodieSchemaField hf = effective.getFields().get(i);
+          int overrideId = hf.fieldId();
+          Type childType = reidentify(hf.schema(), original.type());
+          int finalId = overrideId >= 0 ? overrideId : original.fieldId();
+          if (finalId == original.fieldId() && childType == original.type()) {
+            rebuilt.add(original);
+          } else {
+            rebuilt.add(Types.Field.get(finalId, original.isOptional(), original.name(), childType, original.doc()));
+            anyChange = true;
+          }
+        }
+        return anyChange ? Types.RecordType.get(rebuilt, record.name()) : record;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) internalType;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return internalType;
+        }
+        int overrideElementId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP), -1);
+        Type newElement = reidentify(effective.getElementType(), array.elementType());
+        int finalElementId = overrideElementId >= 0 ? overrideElementId : array.elementId();
+        if (finalElementId == array.elementId() && newElement == array.elementType()) {
+          return array;
+        }
+        return Types.ArrayType.get(finalElementId, array.isElementOptional(), newElement);
+      }
+      case MAP: {
+        Types.MapType map = (Types.MapType) internalType;
+        if (effective.getType() != HoodieSchemaType.MAP) {
+          return internalType;
+        }
+        int overrideKeyId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), -1);
+        int overrideValueId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP), -1);
+        Type newValue = reidentify(effective.getValueType(), map.valueType());
+        int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId();
+        int finalValueId = overrideValueId >= 0 ? overrideValueId : map.valueId();
+        if (finalKeyId == map.keyId() && finalValueId == map.valueId() && newValue == map.valueType()) {
+          return map;
+        }
+        return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), newValue, map.isValueOptional());
+      }
+      default:
+        return internalType;
+    }
+  }
+
+  private static int readIntProp(Object raw, int fallback) {
+    return raw instanceof Number ? ((Number) raw).intValue() : fallback;
+  }
+
+  /**
+   * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps every
+   * sub-schema with the corresponding field id from the source. The schema-level
+   * version id and max column id are also propagated.
+   */
+  public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, String recordName) {
+    HoodieSchema hoodieSchema = InternalSchemaConverter.convert(internalSchema, recordName);
+    stampIds(hoodieSchema, internalSchema.getRecord());
+    hoodieSchema.setSchemaId(internalSchema.schemaId());
+    hoodieSchema.setMaxColumnId(internalSchema.getMaxColumnId());
+    hoodieSchema.invalidateIdIndex();
+    return hoodieSchema;
+  }
+
+  private static void stampIds(HoodieSchema hoodieSchema, Type type) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (type.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) type;
+        // The HoodieSchema produced by InternalSchemaConverter preserves the declared
+        // field order, so positional pairing with InternalSchema is exact.
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return;
+        }
+        for (int i = 0; i < record.fields().size(); i++) {
+          Types.Field internalField = record.fields().get(i);
+          HoodieSchemaField hoodieField = effective.getFields().get(i);
+          hoodieField.getAvroField().addProp(HoodieSchema.FIELD_ID_PROP, internalField.fieldId());
+          stampIds(hoodieField.schema(), internalField.type());
+        }
+        return;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) type;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return;
+        }
+        effective.getAvroSchema().addProp(HoodieSchema.ELEMENT_ID_PROP, array.elementId());
+        stampIds(effective.getElementType(), array.elementType());

Review Comment:
   🤖 `InternalSchemaConverter.convert` caches by `Types.RecordType` and returns
the same `HoodieSchema` instance for equal record types. For an InternalSchema
where the same record sub-type is referenced from multiple paths, two
`stampIds` walks would visit the same `Schema.Field` and call
`addProp(FIELD_ID_PROP, ...)` twice. Avro's JsonProperties is set-once
(the comment on lines 327-329 of HoodieSchema.java itself notes this), so the
second `addProp` throws `AvroRuntimeException("Can't overwrite property:
field-id")`. Could you guard with a `getObjectProp` check (mirroring the pattern
in `HoodieSchemaIdAssigner.assignFieldId`) before each `addProp` call here, or
otherwise short-circuit when ids are already stamped?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
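
The guard requested above could look roughly like the following. This is a minimal sketch under stated assumptions: `SetOnceProps` is a hypothetical strict set-once property bag that only stands in for the behaviour described in the comment (it is not Avro's `JsonProperties`), and `IdStamper.stampIfAbsent` is an illustrative helper name, not a method from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical strict set-once property bag. It only stands in for the
// behaviour described in the review comment and is not Avro's JsonProperties.
final class SetOnceProps {
  private final Map<String, Object> props = new HashMap<>();

  Object getObjectProp(String key) {
    return props.get(key);
  }

  void addProp(String key, Object value) {
    if (props.containsKey(key)) {
      throw new IllegalStateException("Can't overwrite property: " + key);
    }
    props.put(key, value);
  }
}

// Hypothetical helper illustrating the "check before addProp" guard.
final class IdStamper {
  private IdStamper() {
  }

  // Writes the id only when the node carries none yet, so a node reached
  // twice through a shared sub-schema is stamped once and then skipped.
  // Returns true if this call wrote the property.
  static boolean stampIfAbsent(SetOnceProps node, String key, int id) {
    if (node.getObjectProp(key) != null) {
      return false;
    }
    node.addProp(key, id);
    return true;
  }
}
```

With this shape the second visit becomes a no-op instead of an exception. Whether a silently skipped mismatch (same node, different id) should instead be reported is a separate design decision the sketch does not settle.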



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1333,127 @@ public Object getProp(String key) {
   public void addProp(String key, Object value) {
     ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
     avroSchema.addProp(key, value);
+    this.idIndex = null;
+  }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()},
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel, i.e. has no schema id
+   * assigned. Replaces {@code InternalSchema#isEmptySchema()}.
+   */
+  public boolean isEmptySchema() {

Review Comment:
   🤖 The default `schemaId = -1L` means any HoodieSchema built via
`parse()`/`fromAvroSchema()`/`createRecord()` without a subsequent 
`setSchemaId()` will report as "empty" here. The legacy 
`InternalSchema.isEmptySchema()` returns true only for the EMPTY_SCHEMA 
singleton (`versionId = -1`); regular schemas use `DEFAULT_VERSION_ID = 0` and 
are non-empty. When the existing `internalSchema.isEmptySchema()` call sites 
(FileGroupReaderSchemaHandler, HoodieLogFileReader, 
AbstractHoodieLogRecordScanner, HoodieAvroDataBlock, FileGroupRecordBuffer, 
etc.) get migrated, this semantics shift could change behavior. Could you align 
the default (e.g. `schemaId = 0`) or detect emptiness via 
`getFields().isEmpty()` like the singleton's invariant?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
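
To make the semantic difference concrete, here is a small sketch that contrasts the two emptiness checks mentioned above. `SchemaStub` is a hypothetical stand-in that models only a schema id and a field list; it is not `HoodieSchema`, and the method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stub holding only the two attributes the comment discusses.
final class SchemaStub {
  private final long schemaId;
  private final List<String> fieldNames;

  SchemaStub(long schemaId, List<String> fieldNames) {
    this.schemaId = schemaId;
    this.fieldNames = new ArrayList<>(fieldNames);
  }

  // Id-based check: true for every schema whose id was never set.
  boolean isEmptyById() {
    return schemaId < 0;
  }

  // Structural check: true only when the record has no fields.
  boolean isEmptyByFields() {
    return fieldNames.isEmpty();
  }
}
```

A stub built with id `-1` and two fields is "empty" under the id-based check but not under the structural one, which is the behaviour shift the comment warns about for schemas that were parsed but never given an id.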



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+/**
+ * HoodieSchema-shaped façade for evolution-schema JSON serialization.
+ *
+ * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit metadata
+ * and the {@code .hoodie/.schema/} history files use the same JSON layout that
+ * {@link SerDeHelper} has always produced ({@code schemas} array containing
+ * objects with {@code version_id}, {@code max_column_id}, {@code type}, {@code fields}, etc.).
+ * Old tables must remain readable, so this façade delegates to {@link SerDeHelper}
+ * verbatim and converts at the HoodieSchema/InternalSchema boundary via
+ * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way out.</p>
+ *
+ * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in
+ * pure HoodieSchema terms behind this stable interface, but the byte-for-byte
+ * compatibility constraint stays in force.</p>
+ */
+public final class HoodieSchemaSerDe {
+
+  /**
+   * Commit-metadata key under which the latest schema's JSON is stored. Carried
+   * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading old
+   * commit metadata pick up the same blob.
+   */
+  public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA;
+
+  /**
+   * JSON object key that wraps the array of historical schemas. Same as
+   * {@link SerDeHelper#SCHEMAS}.
+   */
+  public static final String SCHEMAS = SerDeHelper.SCHEMAS;
+
+  private HoodieSchemaSerDe() {
+  }
+
+  /**
+   * Serializes a single schema to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(InternalSchema)} byte for byte.
+   */
+  public static String toJson(HoodieSchema schema) {
+    return SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema));
+  }
+
+  /**
+   * Serializes a history of schemas to JSON. Output format matches the legacy
+   * {@link SerDeHelper#toJson(List)} byte for byte.
+   */
+  public static String toJsonHistory(List<HoodieSchema> schemas) {
+    List<InternalSchema> converted = new ArrayList<>(schemas.size());
+    for (HoodieSchema s : schemas) {
+      converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s));
+    }
+    return SerDeHelper.toJson(converted);
+  }
+
+  /**
+   * Parses a single-schema JSON blob (typically the {@code latest_schema} value
+   * from commit metadata). Returns empty if the input is null/empty so callers
+   * can pass through optional commit metadata fields.
+   */
+  public static Option<HoodieSchema> fromJson(String json) {
+    Option<InternalSchema> internal = SerDeHelper.fromJson(json);
+    if (!internal.isPresent()) {
+      return Option.empty();
+    }
+    return Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), defaultRecordName(internal.get())));
+  }
+
+  /**
+   * Parses the history-schemas JSON layout (array of versioned schemas) and
+   * returns them keyed by {@code version_id}. Field ids are preserved on each
+   * returned HoodieSchema so the resulting map is directly usable by the
+   * read/merge path.
+   */
+  public static TreeMap<Long, HoodieSchema> parseHistorySchemas(String json) {

Review Comment:
   🤖 nit: `parseHistorySchemas` is asymmetric with `toJsonHistory`. Have you
considered naming it `fromJsonHistory` to match the `toJson`/`fromJson`
convention already established on this class?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala:
##########
@@ -209,7 +208,7 @@ class IncrementalRelationV1(val sqlContext: SQLContext,
       val validCommits = metaClient
         .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray()
         .map(e => fileNameGenerator.getFileName(e.asInstanceOf[HoodieInstant])).mkString(",")
-      sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
+      sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, HoodieSchemaSerDe.toJson(internalSchema))

Review Comment:
   🤖 Same empty-schema regression as in `embedEvolutionSchema`: when
`internalSchema` is `HoodieSchema.empty()` (line 133/143 above), legacy wrote
`""` here but `HoodieSchemaSerDe.toJson(empty)` now writes a non-empty JSON
because the bridge's `toInternalSchema` doesn't preserve `versionId=-1`.
Downstream readers will then enter the schema-on-read path on tables that
disable it. Should be fixed once the bridge / SerDe handles the empty case (see
comment on `HoodieBaseRelation.scala`).
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -267,7 +267,7 @@ public HoodieCleanMetadata execute() {
     if (pendingCleanInstants.size() > 0) {
       // try to clean old history schema.
       try {
-        FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
+        HoodieSchemaHistoryStorageManager fss = new HoodieSchemaHistoryStorageManager(table.getMetaClient());

Review Comment:
   🤖 nit: `fss` was shorthand for `FileBasedInternalSchemaStorageManager`. Now
that the type is `HoodieSchemaHistoryStorageManager`, the abbreviation is
completely disconnected. Could you rename it to something like
`historyStorageManager`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1333,127 @@ public Object getProp(String key) {
   public void addProp(String key, Object value) {
     ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
     avroSchema.addProp(key, value);
+    this.idIndex = null;
+  }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()},
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel, i.e. has no schema id
+   * assigned. Replaces {@code InternalSchema#isEmptySchema()}.
+   */
+  public boolean isEmptySchema() {
+    return schemaId < 0;
+  }
+
+  /**
+   * Returns the schema version id (replaces InternalSchema#schemaId).
+   * Returns -1 if no version has been assigned.
+   */
+  public long schemaId() {
+    return schemaId;
+  }
+
+  /**
+   * Sets the schema version id. Typically derived from a commit instant timestamp.
+   * Mutable: callers that need to re-stamp the version (e.g. after an evolution
+   * operation) can call this multiple times.
+   */
+  public HoodieSchema setSchemaId(long schemaId) {
+    this.schemaId = schemaId;
+    return this;
+  }
+
+  /**
+   * Returns the highest column id assigned to any sub-schema. If an explicit
+   * max-column-id has been recorded (e.g. preserved across a column deletion),
+   * that value is returned; otherwise the highest id currently present in the
+   * schema is returned. Replaces InternalSchema#maxColumnId.
+   *
+   * <p>Returns -1 if no ids have been assigned anywhere in the schema.</p>
+   */
+  public int maxColumnId() {
+    if (explicitMaxColumnId >= 0) {
+      return explicitMaxColumnId;
+    }
+    return index().maxColumnIdSeen();
+  }
+
+  /**
+   * Records the highest column id explicitly. Useful after column deletions, where
+   * the highest currently-present id is less than the highest ever assigned and
+   * subsequent column additions must avoid colliding with previously-used ids.
+   */
+  public HoodieSchema setMaxColumnId(int maxColumnId) {
+    this.explicitMaxColumnId = maxColumnId;
+    return this;
+  }
+
+  /**
+   * Returns the dot-joined full name of the field that owns column id {@code id},
+   * or empty string if none. Replaces InternalSchema#findFullName.
+   */
+  public String findFullName(int id) {
+    String result = index().idToName().get(id);
+    return result == null ? "" : result;
+  }
+
+  /**
+   * Returns the column id assigned to the field at {@code fullName}, or -1 if not found.
+   * Replaces InternalSchema#findIdByName.
+   */
+  public int findIdByName(String fullName) {
+    if (fullName == null || fullName.isEmpty()) {
+      return -1;
+    }
+    Integer id = index().nameToId().get(fullName);
+    return id == null ? -1 : id;
+  }
+
+  /**
+   * Returns all column ids in this schema. Replaces InternalSchema#getAllIds.
+   */
+  public java.util.Set<Integer> getAllIds() {
+    return index().idToName().keySet();
+  }
+
+  /**
+   * Returns a mapping from full field name to depth-first traversal position. Used during
+   * ingest reconciliation to preserve declared field order. Replaces
+   * InternalSchema#getNameToPosition.
+   */
+  public Map<String, Integer> getNameToPosition() {
+    return index().nameToPosition();
+  }
+
+  /**
+   * Returns the lazily-computed id ↔ name index. Recomputes after any mutation that
+   * goes through {@link #addProp}.
+   */
+  HoodieSchemaIndex index() {

Review Comment:
   🤖 Lazy init for `idIndex` is racy: it's a non-volatile field with no
synchronization. If thread A is mid-computation in `index()` and thread B 
mutates props (which sets `this.idIndex = null` in `addProp`), thread A's later 
assignment can clobber the null with a stale index. Given 
`HoodieSchemaCache.intern()` shares schemas JVM-wide and the new methods 
(`findFullName`, `findIdByName`, `getNameToPosition`) are reachable from 
Spark/Flink task code, do you have a sense of whether this is exercised in 
practice, or is the assumption that a HoodieSchema is effectively frozen after 
construction?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
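
One way to make such a lazily computed, invalidatable cache tolerate the interleaving described above is to tag each cached value with a version counter. The following is a minimal sketch, assuming mutators change state first and call `invalidate()` afterwards; `VersionedLazy` is a hypothetical helper, not a class from the PR or from Hudi.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical lazily computed cache whose entries carry the version they
// were computed for. Assumes callers mutate state BEFORE calling invalidate().
final class VersionedLazy<T> {

  private static final class Entry<T> {
    final long version;
    final T value;

    Entry(long version, T value) {
      this.version = version;
      this.value = value;
    }
  }

  private final Supplier<T> compute;
  private final AtomicLong version = new AtomicLong();
  private volatile Entry<T> cached;

  VersionedLazy(Supplier<T> compute) {
    this.compute = compute;
  }

  T get() {
    // Read the version before computing, so a concurrent invalidate()
    // leaves this result tagged with an outdated version.
    long seen = version.get();
    Entry<T> entry = cached;
    if (entry != null && entry.version == seen) {
      return entry.value;
    }
    T value = compute.get();
    // A late publish of a stale value is harmless: its version no longer
    // matches, so the next get() recomputes instead of trusting it.
    cached = new Entry<>(seen, value);
    return value;
  }

  void invalidate() {
    version.incrementAndGet();
  }
}
```

The trade-off is that concurrent readers may compute the value more than once. If schemas are effectively frozen after construction, a simpler immutable snapshot would also do, and this machinery would be unnecessary.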



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaHistoryStorageManager.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+
+import java.util.List;
+
+/**
+ * HoodieSchema-shaped façade around the {@code .hoodie/.schema/} schema-history
+ * storage layer. Delegates to {@link FileBasedInternalSchemaStorageManager} so the
+ * on-disk paths, file naming, and JSON layout are all preserved verbatim, a hard
+ * requirement for backward compatibility with tables created by prior Hudi versions.
+ *
+ * <p>String-based methods ({@link #persistHistorySchemaStr},
+ * {@link #getHistorySchemaStr}, {@link #cleanOldFiles}) pass through unchanged.
+ * Only {@link #getSchemaByKey(String)} crosses the schema-type boundary and uses
+ * {@link HoodieSchemaInternalSchemaBridge} to return a HoodieSchema with field ids
+ * preserved.</p>
+ */
+public class HoodieSchemaHistoryStorageManager {
+
+  /**
+   * Subdirectory name under {@code .hoodie/} that holds the schema-history files.
+   * Matches {@link FileBasedInternalSchemaStorageManager#SCHEMA_NAME} verbatim.
+   */
+  public static final String SCHEMA_NAME = FileBasedInternalSchemaStorageManager.SCHEMA_NAME;
+
+  private final FileBasedInternalSchemaStorageManager delegate;
+
+  public HoodieSchemaHistoryStorageManager(HoodieStorage storage, StoragePath baseTablePath) {
+    this.delegate = new FileBasedInternalSchemaStorageManager(storage, baseTablePath);
+  }
+
+  public HoodieSchemaHistoryStorageManager(HoodieTableMetaClient metaClient) {
+    this.delegate = new FileBasedInternalSchemaStorageManager(metaClient);
+  }
+
+  /**
+   * Persists a history-schema JSON blob keyed by the commit instant time. The
+   * input JSON is expected to be in the layout produced by
+   * {@link HoodieSchemaSerDe#toJsonHistory} or
+   * {@link HoodieSchemaSerDe#inheritHistory}.
+   */
+  public void persistHistorySchemaStr(String instantTime, String historySchemaStr) {
+    delegate.persistHistorySchemaStr(instantTime, historySchemaStr);
+  }
+
+  /**
+   * Removes schema-history files whose instant times are not in the valid-commits
+   * set. Safety mechanism for archival cleanup.
+   */
+  public void cleanOldFiles(List<String> validateCommits) {

Review Comment:
   🤖 nit: `validateCommits` reads as a verb phrase ("commits to validate")
rather than a noun phrase. Could you rename it to `validCommits` to match the
adjective form used in the sibling method
`getHistorySchemaStrByGivenValidCommits`?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP;
+
+/**
+ * Walks a {@link HoodieSchema} and assigns sequential integer ids to every addressable
+ * sub-schema (record fields, array elements, map keys and values), writing the result
+ * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON properties.
+ *
+ * <p>The traversal order mirrors {@link org.apache.hudi.internal.schema.InternalSchemaBuilder}
+ * (record fields in declared order; array element id assigned before recursing into the
+ * element type; map key id then value id assigned before recursing into the value type)
+ * so id assignments are stable across the InternalSchema → HoodieSchema migration and
+ * round-tripping a previously-IDed InternalSchema through HoodieSchema produces the same
+ * id mapping.</p>
+ *
+ * <p>Existing ids are preserved: if a node already has an id property, this assigner uses
+ * that id and bumps {@code nextId} past it, so reapplying the assigner to a partially
+ * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the new
+ * {@code maxColumnId} so callers can persist it on the schema root.</p>
+ */
+public final class HoodieSchemaIdAssigner {
+
+  private HoodieSchemaIdAssigner() {
+  }
+
+  /**
+   * Assigns ids onto {@code schema} starting from {@code startId} and writes the resulting
+   * {@code max-column-id} onto the schema root.
+   *
+   * @return the maximum column id present after assignment
+   */
+  public static int assign(HoodieSchema schema, int startId) {
+    int[] nextId = {startId};
+    visit(schema, nextId);
+    int maxId = nextId[0] - 1;
+    schema.setMaxColumnId(maxId);
+    return maxId;
+  }
+
+  /**
+   * Assigns ids onto {@code schema}, starting after the highest id already present

Review Comment:
   🤖 After `visit(schema, nextId)` mutates field-id / element-id / key-id /
value-id props directly on the underlying Avro Schema/Field (via
`avroField.addProp` and `container.addProp`, bypassing
`HoodieSchema.addProp`), the cached `schema.idIndex` is not invalidated. If
`schema.index()` (or any of the methods that go through it: `findFullName`,
`findIdByName`, `getNameToPosition`, `getAllIds`, `maxColumnId`) was called
before `assign(...)`, subsequent calls will return stale data. Could you call
`schema.invalidateIdIndex()` at the end of `assign` (the docstring on
`invalidateIdIndex` even references this assigner as the trigger)?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
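
The stale-index hazard and the suggested fix can be reproduced in miniature. In this sketch `IndexedSchema` and `SketchIdAssigner` are hypothetical stand-ins: `rawIds` plays the role of id properties written directly on the Avro nodes, and `index()` plays the role of the cached id index.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a schema with a lazily built id index.
final class IndexedSchema {
  // Stands in for id properties stored on the underlying Avro nodes.
  final Map<String, Integer> rawIds = new HashMap<>();
  private Map<Integer, String> idToName;

  // Builds the id -> name index on first use and caches it.
  Map<Integer, String> index() {
    if (idToName == null) {
      Map<Integer, String> built = new HashMap<>();
      rawIds.forEach((name, id) -> built.put(id, name));
      idToName = built;
    }
    return idToName;
  }

  void invalidateIdIndex() {
    idToName = null;
  }
}

// Hypothetical assigner that writes ids past the wrapper, as in the comment.
final class SketchIdAssigner {
  private SketchIdAssigner() {
  }

  // Assigns sequential ids to fields that have none and returns the last id
  // handed out by this call (startId - 1 if nothing was assigned).
  static int assign(IndexedSchema schema, String[] fieldNames, int startId) {
    int nextId = startId;
    for (String name : fieldNames) {
      if (!schema.rawIds.containsKey(name)) {
        schema.rawIds.put(name, nextId++);
      }
    }
    // Without this call, an index cached before assign() would stay stale.
    schema.invalidateIdIndex();
    return nextId - 1;
  }
}
```

Calling `index()` before `assign(...)` caches an empty map; the invalidation at the end of `assign` is what lets the next `index()` call see the new ids.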



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -684,6 +709,26 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     conf
   }
 
+  /**
+   * HoodieSchema-shaped twin of [[embedInternalSchema]]. Writes the schema-evolution
+   * config to the hadoop [[Configuration]] using the new [[HoodieSchemaSerDe]] for
+   * the JSON layer. Wire format is identical to the legacy method since
+   * [[HoodieSchemaSerDe]] delegates to the same on-disk SerDe.
+   */
+  protected def embedEvolutionSchema(conf: Configuration, evolutionSchemaOpt: Option[HoodieSchema]): Configuration = {
+    val schema = evolutionSchemaOpt.getOrElse(HoodieSchema.empty())
+    val querySchemaString = HoodieSchemaSerDe.toJson(schema)

Review Comment:
   🤖 I think this changes behavior vs. the legacy `embedInternalSchema` for the
empty case. `HoodieSchemaInternalSchemaBridge.toInternalSchema` only calls
`setSchemaId`/`setMaxColumnId` when the input id is `>= 0`, so an empty
`HoodieSchema` (schemaId=-1) becomes an `InternalSchema` with
`versionId=DEFAULT_VERSION_ID=0`, which is no longer `isEmptySchema()`. As a
result `SerDeHelper.toJson(...)` emits a non-empty JSON for
`HoodieSchema.empty()`, the `if (!isNullOrEmpty(...))` fires, and
`HOODIE_QUERY_SCHEMA` ends up set on every query, including those where
schema-on-read is disabled. Legacy returned `""` and skipped the conf. Could
you make the bridge propagate the `-1` id (or short-circuit empty in
`HoodieSchemaSerDe.toJson`)? @yihua
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
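
The short-circuit option mentioned above might be sketched as follows. Everything here is illustrative: `QuerySchemaEmbedder`, its config key, and the JSON string it builds are hypothetical and do not reproduce the layout written by `SerDeHelper`; the point is only that an unset schema id yields `""`, so the configuration entry is skipped.

```java
import java.util.Map;

// Hypothetical sketch of "return an empty string for the empty sentinel".
final class QuerySchemaEmbedder {
  // Illustrative key, not the real HOODIE_QUERY_SCHEMA constant.
  static final String QUERY_SCHEMA_KEY = "sketch.query.schema";

  private QuerySchemaEmbedder() {
  }

  // Serializes to "" when the schema id is unset (negative), otherwise to an
  // illustrative JSON string.
  static String toJson(long schemaId, String fieldsJson) {
    if (schemaId < 0) {
      return "";
    }
    return "{\"version_id\":" + schemaId + ",\"fields\":" + fieldsJson + "}";
  }

  // Sets the config entry only when there is something to embed.
  static void embed(Map<String, String> conf, long schemaId, String fieldsJson) {
    String json = toJson(schemaId, fieldsJson);
    if (!json.isEmpty()) {
      conf.put(QUERY_SCHEMA_KEY, json);
    }
  }
}
```

Handling the empty case at the serialization boundary keeps every caller's existing empty-string guard working. Propagating `-1` through the bridge would be the alternative, at the cost of touching the conversion itself.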



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java:
##########
@@ -125,9 +125,9 @@ protected Option<HoodieTableMetadataWriter> getMetadataWriter(
   }
 
   private static void setLatestInternalSchema(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
-    Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
-    if (internalSchema.isPresent()) {
-      config.setInternalSchemaString(SerDeHelper.toJson(internalSchema.get()));
+    Option<HoodieSchema> evolutionSchema = new TableSchemaResolver(metaClient).getTableEvolutionSchemaFromCommitMetadata();

Review Comment:
   🤖 nit: the enclosing method is still called `setLatestInternalSchema` but it
now resolves and stores a `HoodieSchema`. Could you rename it to
`setLatestEvolutionSchema` to stay consistent with the renamed variables inside?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
