voonhous commented on code in PR #18680:
URL: https://github.com/apache/hudi/pull/18680#discussion_r3182189099
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1333,127 @@ public Object getProp(String key) {
public void addProp(String key, Object value) {
ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
avroSchema.addProp(key, value);
+ this.idIndex = null;
+ }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()} —
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel — i.e. has no schema id
+   * assigned. Replaces {@code InternalSchema#isEmptySchema()}.
+   */
+  public boolean isEmptySchema() {
Review Comment:
isEmptySchema now uses the no-fields check (type != RECORD ||
fields.isEmpty()) instead of schemaId<0. That matches the bridge's
short-circuit and the legacy InternalSchema.EMPTY_SCHEMA singleton intent, so
freshly-built HoodieSchemas with real fields no longer report as empty even
without setSchemaId.
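
To make the distinction concrete, here is a minimal stand-alone sketch of the structural no-fields check this thread settles on. `SchemaSketch` and its members are hypothetical stand-ins, not Hudi API:

```java
import java.util.Collections;
import java.util.List;

public class EmptyCheckSketch {
  enum Type { RECORD, STRING }

  // Hypothetical stand-in for HoodieSchema: just enough state to show the check.
  static class SchemaSketch {
    final Type type;
    final List<String> fields;
    long schemaId = -1; // unset by default, as on a freshly-built schema

    SchemaSketch(Type type, List<String> fields) {
      this.type = type;
      this.fields = fields;
    }

    boolean isEmptySchema() {
      // Structural emptiness, as in the review: non-record, or a record with no fields.
      // Deliberately does NOT consult schemaId.
      return type != Type.RECORD || fields.isEmpty();
    }
  }

  public static void main(String[] args) {
    SchemaSketch sentinel = new SchemaSketch(Type.RECORD, Collections.emptyList());
    SchemaSketch real = new SchemaSketch(Type.RECORD, List.of("id", "name"));
    // "real" never had setSchemaId called, yet is not "empty" under the new check.
    if (!sentinel.isEmptySchema() || real.isEmptySchema()) {
      throw new AssertionError("emptiness should be structural, not id-based");
    }
    System.out.println("ok");
  }
}
```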
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema;
+
+import org.apache.avro.Schema;
+
+import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP;
+import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP;
+
+/**
+ * Walks a {@link HoodieSchema} and assigns sequential integer ids to every addressable
+ * sub-schema (record fields, array elements, map keys and values), writing the result
+ * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON properties.
+ *
+ * <p>The traversal order mirrors {@link org.apache.hudi.internal.schema.InternalSchemaBuilder}
+ * (record fields in declared order; array element id assigned before recursing into the
+ * element type; map key id then value id assigned before recursing into the value type)
+ * so id assignments are stable across the InternalSchema → HoodieSchema migration and
+ * round-tripping a previously-IDed InternalSchema through HoodieSchema produces the same
+ * id mapping.</p>
+ *
+ * <p>Existing ids are preserved: if a node already has an id property, this assigner uses
+ * that id and bumps {@code nextId} past it, so reapplying the assigner to a partially
+ * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the new
+ * {@code maxColumnId} so callers can persist it on the schema root.</p>
+ */
+public final class HoodieSchemaIdAssigner {
+
+ private HoodieSchemaIdAssigner() {
+ }
+
+  /**
+   * Assigns ids onto {@code schema} starting from {@code startId} and writes the resulting
+   * {@code max-column-id} onto the schema root.
+   *
+   * @return the maximum column id present after assignment
+   */
+ public static int assign(HoodieSchema schema, int startId) {
+ int[] nextId = {startId};
+ visit(schema, nextId);
+ int maxId = nextId[0] - 1;
+ schema.setMaxColumnId(maxId);
+ return maxId;
+ }
+
+  /**
+   * Assigns ids onto {@code schema}, starting after the highest id already present
Review Comment:
Fixed by moving invalidateIdIndex() into HoodieSchemaIdAssigner.assign
itself, so the cache invalidation always pairs with the direct Avro mutation.
Removed the now-redundant manual calls in BaseHoodieWriteClient.
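
A stand-alone sketch of the pairing this describes — cache invalidation living inside `assign` itself, next to the mutation. `SchemaStub` and its members are hypothetical stand-ins for the real `HoodieSchema`/traversal:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class AssignSketch {
  // Hypothetical stand-in: tracks stamped ids and a cached id index.
  static class SchemaStub {
    int stampedIds = 0;
    Object idIndex = new Object(); // pretend this is a stale cached index

    void stampId(int id) { stampedIds++; }
    void invalidateIdIndex() { idIndex = null; }
  }

  static int assign(SchemaStub schema, int startId) {
    AtomicInteger nextId = new AtomicInteger(startId);
    // Stand-in for the real schema walk: stamp three ids.
    for (int i = 0; i < 3; i++) {
      schema.stampId(nextId.getAndIncrement());
    }
    // Invalidation always pairs with the direct mutation, so callers
    // (e.g. BaseHoodieWriteClient) never need a manual invalidate call.
    schema.invalidateIdIndex();
    return nextId.get() - 1;
  }

  public static void main(String[] args) {
    SchemaStub s = new SchemaStub();
    int maxId = assign(s, 1);
    if (maxId != 3 || s.idIndex != null) {
      throw new AssertionError("cache must be invalidated by assign itself");
    }
    System.out.println("ok");
  }
}
```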
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java:
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.schema.evolution;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Type;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that preserves
+ * column ids by stamping them as Avro custom properties on the HoodieSchema's
+ * underlying schema tree.
+ *
+ * <p>This exists during the InternalSchema → HoodieSchema migration. The existing
+ * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a
+ * structurally-correct HoodieSchema but discards field ids. Downstream code in
+ * the new evolution layer relies on {@code field-id} / {@code element-id} /
+ * {@code key-id} / {@code value-id} properties being present, so we walk the
+ * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p>
+ *
+ * <p>The walk order matches {@code InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema}
+ * (record fields in declared order; array element after array; map key + value after map),
+ * so positional pairing is exact.</p>
+ *
+ * <p>Public for the migration period only — Phase 4 callsite migrations across
+ * different packages need access to the conversion. Once Phase 5 rewrites the
+ * action algebra on pure HoodieSchema, this bridge and its dependency on
+ * {@code InternalSchema} go away.</p>
+ */
+public final class HoodieSchemaInternalSchemaBridge {
+
+ private HoodieSchemaInternalSchemaBridge() {
+ }
+
+  /**
+   * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving column
+   * ids carried as {@code field-id} / {@code element-id} / {@code key-id} /
+   * {@code value-id} Avro custom properties. This is the inverse of
+   * {@link #toHoodieSchema(InternalSchema, String)} and exists so the façade can
+   * round-trip a HoodieSchema through the legacy applier without renumbering ids on
+   * every call.
+   *
+   * <p>For HoodieSchemas that have not yet had ids assigned (e.g. freshly parsed
+   * input), this falls back to the existing
+   * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh ids.</p>
+   */
+  public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) {
+    // Take the structurally-correct InternalSchema produced by the existing converter,
+    // then walk both schemas in parallel and overwrite the InternalSchema's freshly-minted
+    // ids with the ids carried as Avro properties on the HoodieSchema (where present).
+    InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, hoodieSchema.getNameToPosition());
+    Types.RecordType originalRecord = fresh.getRecord();
+    Types.RecordType reidentified = (Types.RecordType) reidentify(hoodieSchema, originalRecord);
+    InternalSchema result = (originalRecord == reidentified)
+        ? fresh
+        : new InternalSchema(reidentified);
+    long schemaId = hoodieSchema.schemaId();
+    if (schemaId >= 0) {
+      result.setSchemaId(schemaId);
+    }
+    int maxColumnId = hoodieSchema.maxColumnId();
+    if (maxColumnId >= 0) {
+      result.setMaxColumnId(maxColumnId);
+    }
+    return result;
+  }
+
+  /**
+   * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in parallel
+   * and produces a {@link Type} where each addressable id matches the HoodieSchema's
+   * Avro custom property (when present). Returns the original {@code internalType}
+   * unchanged when no overrides apply, so callers can short-circuit.
+   */
+  private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (internalType.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) internalType;
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return internalType;
+        }
+        List<Types.Field> originalFields = record.fields();
+        List<Types.Field> rebuilt = new ArrayList<>(originalFields.size());
+        boolean anyChange = false;
+        for (int i = 0; i < originalFields.size(); i++) {
+          Types.Field original = originalFields.get(i);
+          HoodieSchemaField hf = effective.getFields().get(i);
+          int overrideId = hf.fieldId();
+          Type childType = reidentify(hf.schema(), original.type());
+          int finalId = overrideId >= 0 ? overrideId : original.fieldId();
+          if (finalId == original.fieldId() && childType == original.type()) {
+            rebuilt.add(original);
+          } else {
+            rebuilt.add(Types.Field.get(finalId, original.isOptional(), original.name(), childType, original.doc()));
+            anyChange = true;
+          }
+        }
+        return anyChange ? Types.RecordType.get(rebuilt, record.name()) : record;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) internalType;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return internalType;
+        }
+        int overrideElementId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP), -1);
+        Type newElement = reidentify(effective.getElementType(), array.elementType());
+        int finalElementId = overrideElementId >= 0 ? overrideElementId : array.elementId();
+        if (finalElementId == array.elementId() && newElement == array.elementType()) {
+          return array;
+        }
+        return Types.ArrayType.get(finalElementId, array.isElementOptional(), newElement);
+      }
+      case MAP: {
+        Types.MapType map = (Types.MapType) internalType;
+        if (effective.getType() != HoodieSchemaType.MAP) {
+          return internalType;
+        }
+        int overrideKeyId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), -1);
+        int overrideValueId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP), -1);
+        Type newValue = reidentify(effective.getValueType(), map.valueType());
+        int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId();
+        int finalValueId = overrideValueId >= 0 ? overrideValueId : map.valueId();
+        if (finalKeyId == map.keyId() && finalValueId == map.valueId() && newValue == map.valueType()) {
+          return map;
+        }
+        return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), newValue, map.isValueOptional());
+      }
+      default:
+        return internalType;
+    }
+  }
+
+ private static int readIntProp(Object raw, int fallback) {
+ return raw instanceof Number ? ((Number) raw).intValue() : fallback;
+ }
+
+  /**
+   * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps every
+   * sub-schema with the corresponding field id from the source. The schema-level
+   * version id and max column id are also propagated.
+   */
+  public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, String recordName) {
+    HoodieSchema hoodieSchema = InternalSchemaConverter.convert(internalSchema, recordName);
+    stampIds(hoodieSchema, internalSchema.getRecord());
+    hoodieSchema.setSchemaId(internalSchema.schemaId());
+    hoodieSchema.setMaxColumnId(internalSchema.getMaxColumnId());
+    hoodieSchema.invalidateIdIndex();
+    return hoodieSchema;
+  }
+
+  private static void stampIds(HoodieSchema hoodieSchema, Type type) {
+    HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema;
+    switch (type.typeId()) {
+      case RECORD: {
+        Types.RecordType record = (Types.RecordType) type;
+        // The HoodieSchema produced by InternalSchemaConverter preserves the declared
+        // field order, so positional pairing with InternalSchema is exact.
+        if (effective.getType() != HoodieSchemaType.RECORD) {
+          return;
+        }
+        for (int i = 0; i < record.fields().size(); i++) {
+          Types.Field internalField = record.fields().get(i);
+          HoodieSchemaField hoodieField = effective.getFields().get(i);
+          hoodieField.getAvroField().addProp(HoodieSchema.FIELD_ID_PROP, internalField.fieldId());
+          stampIds(hoodieField.schema(), internalField.type());
+        }
+        return;
+      }
+      case ARRAY: {
+        Types.ArrayType array = (Types.ArrayType) type;
+        if (effective.getType() != HoodieSchemaType.ARRAY) {
+          return;
+        }
+        effective.getAvroSchema().addProp(HoodieSchema.ELEMENT_ID_PROP, array.elementId());
+        stampIds(effective.getElementType(), array.elementType());
Review Comment:
Good catch. stampIds is now idempotent via stampPropIfAbsent: no-ops on a
matching existing value, throws on a divergence (would indicate upstream
corruption since field-id is identity).
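
A stand-alone sketch of the idempotent stamping described here. The helper name `stampPropIfAbsent` comes from this thread; a plain `Map` stands in for Avro's JSON-property store:

```java
import java.util.HashMap;
import java.util.Map;

public class StampSketch {
  // No-op on a matching existing value, throw on divergence: a diverging id
  // would indicate upstream corruption, since field-id is identity.
  static void stampPropIfAbsent(Map<String, Object> props, String key, int id) {
    Object existing = props.get(key);
    if (existing == null) {
      props.put(key, id);
    } else if (!(existing instanceof Number) || ((Number) existing).intValue() != id) {
      throw new IllegalStateException("conflicting id for " + key + ": " + existing + " vs " + id);
    }
    // matching existing value: no-op, so re-stamping is idempotent
  }

  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    stampPropIfAbsent(props, "field-id", 7);
    stampPropIfAbsent(props, "field-id", 7); // idempotent re-stamp: no-op
    boolean threw = false;
    try {
      stampPropIfAbsent(props, "field-id", 8); // divergence: must fail fast
    } catch (IllegalStateException expected) {
      threw = true;
    }
    if (!threw || !props.get("field-id").equals(7)) {
      throw new AssertionError("divergent re-stamp must fail, original id must survive");
    }
    System.out.println("ok");
  }
}
```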
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -1315,6 +1333,127 @@ public Object getProp(String key) {
public void addProp(String key, Object value) {
ValidationUtils.checkArgument(key != null && !key.isEmpty(), "Property key cannot be null or empty");
avroSchema.addProp(key, value);
+ this.idIndex = null;
+ }
+
+  /**
+   * Factory for an "empty" schema sentinel: an unnamed record with no fields and an
+   * unset schemaId. Equivalent to {@code InternalSchema#getEmptyInternalSchema()} —
+   * used by callers that need a placeholder when no evolution schema is available
+   * (e.g. schema-on-read disabled or no schema in commit metadata). Pair with
+   * {@link #isEmptySchema()} to detect the sentinel.
+   */
+  public static HoodieSchema empty() {
+    return createRecord("EmptySchema", null, "hudi", false, Collections.emptyList());
+  }
+
+  /**
+   * Returns true if this schema is the "empty" sentinel — i.e. has no schema id
+   * assigned. Replaces {@code InternalSchema#isEmptySchema()}.
+   */
+  public boolean isEmptySchema() {
+    return schemaId < 0;
+  }
+
+ /**
+ * Returns the schema version id (replaces InternalSchema#schemaId).
+ * Returns -1 if no version has been assigned.
+ */
+ public long schemaId() {
+ return schemaId;
+ }
+
+  /**
+   * Sets the schema version id. Typically derived from a commit instant timestamp.
+   * Mutable: callers that need to re-stamp the version (e.g. after an evolution
+   * operation) can call this multiple times.
+   */
+  public HoodieSchema setSchemaId(long schemaId) {
+    this.schemaId = schemaId;
+    return this;
+  }
+
+  /**
+   * Returns the highest column id assigned to any sub-schema. If an explicit
+   * max-column-id has been recorded (e.g. preserved across a column deletion),
+   * that value is returned; otherwise the highest id currently present in the
+   * schema is returned. Replaces InternalSchema#maxColumnId.
+   *
+   * <p>Returns -1 if no ids have been assigned anywhere in the schema.</p>
+   */
+  public int maxColumnId() {
+    if (explicitMaxColumnId >= 0) {
+      return explicitMaxColumnId;
+    }
+    return index().maxColumnIdSeen();
+  }
+
+  /**
+   * Records the highest column id explicitly. Useful after column deletions, where
+   * the highest currently-present id is less than the highest ever assigned and
+   * subsequent column additions must avoid colliding with previously-used ids.
+   */
+  public HoodieSchema setMaxColumnId(int maxColumnId) {
+    this.explicitMaxColumnId = maxColumnId;
+    return this;
+  }
+
+  /**
+   * Returns the dot-joined full name of the field that owns column id {@code id},
+   * or empty string if none. Replaces InternalSchema#findFullName.
+   */
+  public String findFullName(int id) {
+    String result = index().idToName().get(id);
+    return result == null ? "" : result;
+  }
+
+  /**
+   * Returns the column id assigned to the field at {@code fullName}, or -1 if not found.
+   * Replaces InternalSchema#findIdByName.
+   */
+  public int findIdByName(String fullName) {
+    if (fullName == null || fullName.isEmpty()) {
+      return -1;
+    }
+    Integer id = index().nameToId().get(fullName);
+    return id == null ? -1 : id;
+  }
+
+ /**
+ * Returns all column ids in this schema. Replaces InternalSchema#getAllIds.
+ */
+ public java.util.Set<Integer> getAllIds() {
+ return index().idToName().keySet();
+ }
+
+  /**
+   * Returns a mapping from full field name to depth-first traversal position. Used during
+   * ingest reconciliation to preserve declared field order. Replaces
+   * InternalSchema#getNameToPosition.
+   */
+  public Map<String, Integer> getNameToPosition() {
+    return index().nameToPosition();
+  }
+
+  /**
+   * Returns the lazily-computed id ↔ name index. Recomputes after any mutation that
+   * goes through {@link #addProp}.
+   */
+  HoodieSchemaIndex index() {
Review Comment:
Made idIndex volatile so threads observing an interned HoodieSchema get a
fully-constructed index. The mutation-after-sharing race is unlikely in
practice (props are typically set during construction), so volatile alone is
sufficient here.
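
A stand-alone sketch of the safe-publication pattern described: construct the index fully, then publish it via a single volatile write, and invalidate by nulling. The types here are hypothetical stand-ins, not Hudi API:

```java
public class LazyIndexSketch {
  // Hypothetical stand-in for HoodieSchemaIndex: immutable once constructed.
  static class Index {
    final int maxColumnIdSeen;
    Index(int maxColumnIdSeen) { this.maxColumnIdSeen = maxColumnIdSeen; }
  }

  // volatile: a reader that sees a non-null reference is guaranteed to see
  // the fully-constructed Index (safe publication via the volatile write).
  private volatile Index idIndex;
  private int mutationCount = 0;

  Index index() {
    Index local = idIndex;              // single volatile read
    if (local == null) {
      local = new Index(mutationCount); // fully construct before publishing
      idIndex = local;                  // volatile write publishes it
    }
    return local;
  }

  void addProp() {
    // Mutation invalidates the cache; the next index() call recomputes.
    mutationCount++;
    idIndex = null;
  }

  public static void main(String[] args) {
    LazyIndexSketch s = new LazyIndexSketch();
    Index first = s.index();
    s.addProp();
    Index second = s.index();
    if (first == second || second.maxColumnIdSeen != 1) {
      throw new AssertionError("index must be recomputed after mutation");
    }
    System.out.println("ok");
  }
}
```

Note this tolerates duplicate recomputation under a race (two threads may each build an index), which is safe because the index is a pure function of the schema; it does not tolerate mutation concurrent with reads, matching the review's stated assumption that props are set during construction.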
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala:
##########
@@ -684,6 +709,26 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
conf
}
+  /**
+   * HoodieSchema-shaped twin of [[embedInternalSchema]]. Writes the schema-evolution
+   * config to the hadoop [[Configuration]] using the new [[HoodieSchemaSerDe]] for
+   * the JSON layer. Wire format is identical to the legacy method since
+   * [[HoodieSchemaSerDe]] delegates to the same on-disk SerDe.
+   */
+  protected def embedEvolutionSchema(conf: Configuration, evolutionSchemaOpt: Option[HoodieSchema]): Configuration = {
+    val schema = evolutionSchemaOpt.getOrElse(HoodieSchema.empty())
+    val querySchemaString = HoodieSchemaSerDe.toJson(schema)
Review Comment:
Verified this is a non-issue. bridge.toInternalSchema short-circuits
HoodieSchema.empty() based on getFields().isEmpty(), not schemaId, returning
InternalSchema.getEmptyInternalSchema() (versionId=-1). SerDeHelper.toJson then
returns "" as before, the if (!isNullOrEmpty(...)) does not fire, and
HOODIE_QUERY_SCHEMA stays unset.
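
The chain being verified can be sketched end to end in a few lines. Everything here is a hypothetical stand-in (the `toJson` stub collapses the bridge short-circuit plus SerDeHelper, and a `Map` stands in for the hadoop `Configuration`):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EmptySchemaPathSketch {
  // Stand-in: an empty (no-fields) schema serializes to "", as the thread verifies.
  static String toJson(List<String> fields) {
    return fields.isEmpty() ? "" : "{\"fieldCount\":" + fields.size() + "}";
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    String json = toJson(Collections.emptyList());
    // Mirrors the guarded write: if (!isNullOrEmpty(...)) conf.set(...)
    if (json != null && !json.isEmpty()) {
      conf.put("hoodie.query.schema", json); // hypothetical key name
    }
    if (conf.containsKey("hoodie.query.schema")) {
      throw new AssertionError("empty schema must leave the query-schema key unset");
    }
    System.out.println("ok");
  }
}
```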
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelationV1.scala:
##########
@@ -209,7 +208,7 @@ class IncrementalRelationV1(val sqlContext: SQLContext,
val validCommits = metaClient
.getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray()
.map(e =>
fileNameGenerator.getFileName(e.asInstanceOf[HoodieInstant])).mkString(",")
-    sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, SerDeHelper.toJson(internalSchema))
+    sqlContext.sparkContext.hadoopConfiguration.set(SparkInternalSchemaConverter.HOODIE_QUERY_SCHEMA, HoodieSchemaSerDe.toJson(internalSchema))
Review Comment:
Same conclusion as the HoodieBaseRelation thread: the bridge short-circuits
HoodieSchema.empty() on no-fields, so HoodieSchemaSerDe.toJson(empty) emits ""
and downstream takes the schema-on-read-disabled path. No regression.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]