voonhous commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3182189099


##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1333,127 @@ public Object getProp(String key) {
   public void addProp(String key, Object value) {
     ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
     avroSchema.addProp(key, value);
+    this.idIndex = null;
+  }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()} —
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel — i.e. has no schema id
+   * assigned. Replaces {@code InternalSchema#isEmptySchema()}.
+   */
+  public boolean isEmptySchema() {

Review Comment:
   isEmptySchema now uses the no-fields check (type != RECORD || fields.isEmpty())
instead of schemaId < 0. That matches the bridge's short-circuit and the intent of
the legacy InternalSchema.EMPTY_SCHEMA singleton, so freshly built HoodieSchemas
with real fields no longer report as empty even before setSchemaId is called.
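
   A minimal sketch of that check, assembled from the hunk above (the accessor names
   are the ones HoodieSchema already exposes; treat the body as illustrative, not the
   exact committed code):

   ```java
   public boolean isEmptySchema() {
     // The sentinel from HoodieSchema.empty() is a record with zero fields, so only it
     // reports as empty; schemas carrying real fields return false whether or not
     // setSchemaId has been called yet.
     return getType() != HoodieSchemaType.RECORD || getFields().isEmpty();
   }
   ```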



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP;
+
+/**
+ * Walks a {@link HoodieSchema} and assigns sequential integer ids to every addressable
+ * sub-schema (record fields, array elements, map keys and values), writing the result
+ * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON properties.
+ *
+ * <p>The traversal order mirrors {@link org.apache.hudi.internal.schema.InternalSchemaBuilder}
+ * (record fields in declared order; array element id assigned before recursing into the
+ * element type; map key id then value id assigned before recursing into the value type)
+ * so id assignments are stable across the InternalSchema → HoodieSchema migration and
+ * round-tripping a previously-IDed InternalSchema through HoodieSchema produces the same
+ * id mapping.</p>
+ *
+ * <p>Existing ids are preserved: if a node already has an id property, this assigner uses
+ * that id and bumps {@code nextId} past it, so reapplying the assigner to a partially
+ * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the new
+ * {@code maxColumnId} so callers can persist it on the schema root.</p>
+ */
+public final class HoodieSchemaIdAssigner {
+
+  private HoodieSchemaIdAssigner() {
+  }
+
+  /**
+   * Assigns ids onto {@code schema} starting from {@code startId} and writes the resulting
+   * {@code max-column-id} onto the schema root.
+   *
+   * @return the maximum column id present after assignment
+   */
+  public static int assign(HoodieSchema schema, int startId) {
+    int[] nextId = {startId};
+    visit(schema, nextId);
+    int maxId = nextId[0] - 1;
+    schema.setMaxColumnId(maxId);
+    return maxId;
+  }
+
+  /**
+   * Assigns ids onto {@code schema}, starting after the highest id already present

Review Comment:
   Fixed by moving invalidateIdIndex() into HoodieSchemaIdAssigner.assign 
itself, so the cache invalidation always pairs with the direct Avro mutation. 
Removed the now-redundant manual calls in BaseHoodieWriteClient.
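
   Roughly what the pairing looks like after the move (the body is the assign method
   quoted above plus the invalidation call; a sketch, not the verbatim diff):

   ```java
   public static int assign(HoodieSchema schema, int startId) {
     int[] nextId = {startId};
     visit(schema, nextId);              // writes id props directly onto the Avro schema tree
     int maxId = nextId[0] - 1;
     schema.setMaxColumnId(maxId);
     schema.invalidateIdIndex();         // invalidation now sits next to the mutation,
                                         // so callers can no longer forget it
     return maxId;
   }
   ```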



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that preserves
+ * column ids by stamping them as Avro custom properties on the HoodieSchema's
+ * underlying schema tree.
+ *
+ * <p>This exists during the InternalSchema → HoodieSchema migration. The existing
+ * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a
+ * structurally-correct HoodieSchema but discards field ids. Downstream code in
+ * the new evolution layer relies on {@code field-id} / {@code element-id} /
+ * {@code key-id} / {@code value-id} properties being present, so we walk the
+ * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p>
+ *
+ * <p>The walk order matches {@code InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema}
+ * (record fields in declared order; array element after array; map key + value after map),
+ * so positional pairing is exact.</p>
+ *
+ * <p>Public for the migration period only — Phase 4 callsite migrations across
+ * different packages need access to the conversion. Once Phase 5 rewrites the
+ * action algebra on pure HoodieSchema, this bridge and its dependency on
+ * {@code InternalSchema} go away.</p>
+ */
+public final class HoodieSchemaInternalSchemaBridge {
+
+  private HoodieSchemaInternalSchemaBridge() {
+  }
+
+  /**
+   * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving column
+   * ids carried as {@code field-id} / {@code element-id} / {@code key-id} /
+   * {@code value-id} Avro custom properties. This is the inverse of
+   * {@link #toHoodieSchema(InternalSchema, String)} and exists so the façade can
+   * round-trip a HoodieSchema through the legacy applier without renumbering ids on
+   * every call.
+   *
+   * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly parsed
+   * input), this falls back to the existing
+   * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh ids.</p>
+   */
+  public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
+    // Take the structurally-correct InternalSchema produced by the existing converter,
+    // then walk both schemas in parallel and overwrite the InternalSchema's freshly-minted
+    // ids with the ids carried as Avro properties on the HoodieSchema (where present).
+    InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, hoodieSchema.getNameToPosition());
+    Types.RecordType originalRecord = fresh.getRecord();
+    Types.RecordType reidentified = (Types.RecordType) reidentify(hoodieSchema, originalRecord);
+    InternalSchema result = (originalRecord == reidentified)
+        ? fresh
+        : new InternalSchema(reidentified);
+    long schemaId = hoodieSchema.schemaId();
+    if (schemaId >= 0) {
+      result.setSchemaId(schemaId);
+    }
+    int maxColumnId = hoodieSchema.maxColumnId();
+    if (maxColumnId >= 0) {
+      result.setMaxColumnId(maxColumnId);
+    }
+    return result;
+  }
+
+  /**
+   * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in parallel
+   * and produces a {@link Type} where each addressable id matches the HoodieSchema's
+   * Avro custom property (when present). Returns the original {@code internalType}
+   * unchanged when no overrides apply, so callers can short-circuit.
+   */
+  private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (internalType.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) internalType;
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return internalType;
+        }
+        List<Types.Field> originalFields = record.fields();
+        List<Types.Field> rebuilt = new ArrayList<>(originalFields.size());
+        boolean anyChange = false;
+        for (int i = 0; i < originalFields.size(); i++) {
+          Types.Field original = originalFields.get(i);
+          HoodieSchemaField hf = effective.getFields().get(i);
+          int overrideId = hf.fieldId();
+          Type childType = reidentify(hf.schema(), original.type());
+          int finalId = overrideId >= 0 ? overrideId : original.fieldId();
+          if (finalId == original.fieldId() && childType == original.type()) {
+            rebuilt.add(original);
+          } else {
+            rebuilt.add(Types.Field.get(finalId, original.isOptional(), original.name(), childType, original.doc()));
+            anyChange = true;
+          }
+        }
+        return anyChange ? Types.RecordType.get(rebuilt, record.name()) : record;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) internalType;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return internalType;
+        }
+        int overrideElementId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP), -1);
+        Type newElement = reidentify(effective.getElementType(), array.elementType());
+        int finalElementId = overrideElementId >= 0 ? overrideElementId : array.elementId();
+        if (finalElementId == array.elementId() && newElement == array.elementType()) {
+          return array;
+        }
+        return Types.ArrayType.get(finalElementId, array.isElementOptional(), newElement);
+      }
+      case MAP: {
+        Types.MapType map = (Types.MapType) internalType;
+        if (effective.getType() != HoodieSchemaType.MAP) {
+          return internalType;
+        }
+        int overrideKeyId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), -1);
+        int overrideValueId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP), -1);
+        Type newValue = reidentify(effective.getValueType(), map.valueType());
+        int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId();
+        int finalValueId = overrideValueId >= 0 ? overrideValueId : map.valueId();
+        if (finalKeyId == map.keyId() && finalValueId == map.valueId() && newValue == map.valueType()) {
+          return map;
+        }
+        return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), newValue, map.isValueOptional());
+      }
+      default:
+        return internalType;
+    }
+  }
+
+  private static int readIntProp(Object raw, int fallback) {
+    return raw instanceof Number ? ((Number) raw).intValue() : fallback;
+  }
+
+  /**
+   * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps every
+   * sub-schema with the corresponding field id from the source. The schema-level
+   * version id and max column id are also propagated.
+   */
+  public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, String recordName) {
+    HoodieSchema hoodieSchema = InternalSchemaConverter.convert(internalSchema, recordName);
+    stampIds(hoodieSchema, internalSchema.getRecord());
+    hoodieSchema.setSchemaId(internalSchema.schemaId());
+    hoodieSchema.setMaxColumnId(internalSchema.getMaxColumnId());
+    hoodieSchema.invalidateIdIndex();
+    return hoodieSchema;
+  }
+
+  private static void stampIds(HoodieSchema hoodieSchema, Type type) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (type.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) type;
+        // The HoodieSchema produced by InternalSchemaConverter preserves the declared
+        // field order, so positional pairing with InternalSchema is exact.
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return;
+        }
+        for (int i = 0; i < record.fields().size(); i++) {
+          Types.Field internalField = record.fields().get(i);
+          HoodieSchemaField hoodieField = effective.getFields().get(i);
+          hoodieField.getAvroField().addProp(HoodieSchema.FIELD_ID_PROP, internalField.fieldId());
+          stampIds(hoodieField.schema(), internalField.type());
+        }
+        return;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) type;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return;
+        }
+        effective.getAvroSchema().addProp(HoodieSchema.ELEMENT_ID_PROP, array.elementId());
+        stampIds(effective.getElementType(), array.elementType());

Review Comment:
   Good catch. stampIds is now idempotent via stampPropIfAbsent: no-ops on a 
matching existing value, throws on a divergence (would indicate upstream 
corruption since field-id is identity).
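
   Roughly the shape of that helper (sketched over Avro's JsonProperties so one method
   covers both Schema and Schema.Field; details may differ from the actual
   stampPropIfAbsent in the PR):

   ```java
   private static void stampPropIfAbsent(org.apache.avro.JsonProperties node, String prop, int id) {
     Object existing = node.getObjectProp(prop);
     if (existing == null) {
       node.addProp(prop, id);   // first stamp: write the id
     } else if (!(existing instanceof Number) || ((Number) existing).intValue() != id) {
       // A different id on the same node indicates upstream corruption, since the
       // id properties are identity and must never change once assigned.
       throw new IllegalStateException("Conflicting " + prop + ": existing=" + existing + ", new=" + id);
     }
     // Matching existing value: no-op, so re-running stampIds is idempotent.
   }
   ```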



##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1333,127 @@ public Object getProp(String key) {
   public void addProp(String key, Object value) {
     ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
     avroSchema.addProp(key, value);
+    this.idIndex = null;
+  }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()} —
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel — i.e. has no schema id
+   * assigned. Replaces {@code InternalSchema#isEmptySchema()}.
+   */
+  public boolean isEmptySchema() {
+    return schemaId < 0;
+  }
+
+  /**
+   * Returns the schema version id (replaces InternalSchema#schemaId).
+   * Returns -1 if no version has been assigned.
+   */
+  public long schemaId() {
+    return schemaId;
+  }
+
+  /**
+   * Sets the schema version id. Typically derived from a commit instant timestamp.
+   * Mutable: callers that need to re-stamp the version (e.g. after an evolution
+   * operation) can call this multiple times.
+   */
+  public HoodieSchema setSchemaId(long schemaId) {
+    this.schemaId = schemaId;
+    return this;
+  }
+
+  /**
+   * Returns the highest column id assigned to any sub-schema. If an explicit
+   * max-column-id has been recorded (e.g. preserved across a column deletion),
+   * that value is returned; otherwise the highest id currently present in the
+   * schema is returned. Replaces InternalSchema#maxColumnId.
+   *
+   * <p>Returns -1 if no ids have been assigned anywhere in the schema.</p>
+   */
+  public int maxColumnId() {
+    if (explicitMaxColumnId >= 0) {
+      return explicitMaxColumnId;
+    }
+    return index().maxColumnIdSeen();
+  }
+
+  /**
+   * Records the highest column id explicitly. Useful after column deletions, where
+   * the highest currently-present id is less than the highest ever assigned and
+   * subsequent column additions must avoid colliding with previously-used ids.
+   */
+  public HoodieSchema setMaxColumnId(int maxColumnId) {
+    this.explicitMaxColumnId = maxColumnId;
+    return this;
+  }
+
+  /**
+   * Returns the dot-joined full name of the field that owns column id {@code id},
+   * or empty string if none. Replaces InternalSchema#findFullName.
+   */
+  public String findFullName(int id) {
+    String result = index().idToName().get(id);
+    return result == null ? "" : result;
+  }
+
+  /**
+   * Returns the column id assigned to the field at {@code fullName}, or -1 if not found.
+   * Replaces InternalSchema#findIdByName.
+   */
+  public int findIdByName(String fullName) {
+    if (fullName == null || fullName.isEmpty()) {
+      return -1;
+    }
+    Integer id = index().nameToId().get(fullName);
+    return id == null ? -1 : id;
+  }
+
+  /**
+   * Returns all column ids in this schema. Replaces InternalSchema#getAllIds.
+   */
+  public java.util.Set<Integer> getAllIds() {
+    return index().idToName().keySet();
+  }
+
+  /**
+   * Returns a mapping from full field name to depth-first traversal position. Used during
+   * ingest reconciliation to preserve declared field order. Replaces
+   * InternalSchema#getNameToPosition.
+   */
+  public Map<String, Integer> getNameToPosition() {
+    return index().nameToPosition();
+  }
+
+  /**
+   * Returns the lazily-computed id ↔ name index. Recomputes after any mutation that
+   * goes through {@link #addProp}.
+   */
+  HoodieSchemaIndex index() {

Review Comment:
   Made idIndex volatile so threads observing an interned HoodieSchema get a 
fully-constructed index. The mutation-after-sharing race is unlikely in 
practice (props are typically set during construction), so volatile alone is 
sufficient here.
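
   For reference, the shape being described (field and method names from the hunk; the
   build call is a hypothetical stand-in for however the index is actually constructed):

   ```java
   private volatile HoodieSchemaIndex idIndex;   // cleared by addProp / invalidateIdIndex

   HoodieSchemaIndex index() {
     HoodieSchemaIndex local = idIndex;          // single volatile read
     if (local == null) {
       local = HoodieSchemaIndex.build(this);    // hypothetical factory for the id <-> name index
       idIndex = local;                          // publish the fully-constructed index
     }
     return local;
   }
   ```

   Two threads can race and build the index twice, but that is benign duplicated work:
   the index is derived purely from the schema, so both arrive at equivalent results.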



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -684,6 +709,26 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
     conf
   }
 
+  /**
+   * HoodieSchema-shaped twin of [[embedInternalSchema]]. Writes the schema-evolution
+   * config to the hadoop [[Configuration]] using the new [[HoodieSchemaSerDe]] for
+   * the JSON layer. Wire format is identical to the legacy method since
+   * [[HoodieSchemaSerDe]] delegates to the same on-disk SerDe.
+   */
+  protected def embedEvolutionSchema(conf: Configuration, evolutionSchemaOpt: Option[HoodieSchema]): Configuration = {
+    val schema = evolutionSchemaOpt.getOrElse(HoodieSchema.empty())
+    val querySchemaString = HoodieSchemaSerDe.toJson(schema)

Review Comment:
   Verified this is a non-issue. bridge.toInternalSchema short-circuits
HoodieSchema.empty() based on getFields().isEmpty(), not schemaId, and returns
InternalSchema.getEmptyInternalSchema() (versionId = -1). SerDeHelper.toJson then
returns "" as before, the if (!isNullOrEmpty(...)) branch is not taken, and
HOODIE_QUERY_SCHEMA stays unset.
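
   For anyone tracing this later, the short-circuit relied on here is effectively the
   following (convertPreservingIds is a hypothetical stand-in for the id-preserving
   path quoted in the earlier hunk; illustrative only):

   ```java
   public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
     // HoodieSchema.empty() is an unnamed record with no fields, so it lands here
     // regardless of schemaId; SerDeHelper.toJson on the empty InternalSchema is "".
     if (hoodieSchema.getType() != HoodieSchemaType.RECORD || hoodieSchema.getFields().isEmpty()) {
       return InternalSchema.getEmptyInternalSchema();
     }
     return convertPreservingIds(hoodieSchema);  // id-preserving path from the hunk above
   }
   ```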



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala:
##########
@@ -209,7 +208,7 @@ class IncrementalRelationV1(val sqlContext: SQLContext,
       val validCommits = metaClient
         .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray()
         .map(e => fileNameGenerator.getFileName(e.asInstanceOf[HoodieInstant])).mkString(",")
-      sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
+      sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, HoodieSchemaSerDe.toJson(internalSchema))

Review Comment:
   Same conclusion as the HoodieBaseRelation thread: the bridge short-circuits 
HoodieSchema.empty() on no-fields, so HoodieSchemaSerDe.toJson(empty) emits "" 
and downstream takes the schema-on-read-disabled path. No regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to