hudi-agent commented on code in PR #18680: URL: https://github.com/apache/hudi/pull/18680#discussion_r3183787211
########## hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaIdAssigner.java: ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import org.apache.avro.Schema; + +import static org.apache.hudi.common.schema.HoodieSchema.ELEMENT_ID_PROP; +import static org.apache.hudi.common.schema.HoodieSchema.FIELD_ID_PROP; +import static org.apache.hudi.common.schema.HoodieSchema.KEY_ID_PROP; +import static org.apache.hudi.common.schema.HoodieSchema.VALUE_ID_PROP; + +/** + * Walks a {@link HoodieSchema} and assigns sequential integer ids to every addressable + * sub-schema (record fields, array elements, map keys and values), writing the result + * onto the underlying Avro {@link Schema}/{@link Schema.Field} as custom JSON properties. + * + * <p>The traversal order mirrors {@link org.apache.hudi.internal.schema.InternalSchemaBuilder} + * (record fields in declared order; array element id assigned before recursing into the + * element type; map key id then value id assigned before recursing into the value type) + * so id assignments are stable across the InternalSchema → HoodieSchema migration and + * round-tripping a previously-IDed InternalSchema through HoodieSchema produces the same + * id mapping.</p> + * + * <p>Existing ids are preserved: if a node already has an id property, this assigner uses + * that id and bumps {@code nextId} past it, so reapplying the assigner to a partially + * IDed schema is idempotent. {@link #assign(HoodieSchema, int)} returns the new + * {@code maxColumnId} so callers can persist it on the schema root.</p> + */ +public final class HoodieSchemaIdAssigner { + + private HoodieSchemaIdAssigner() { + } + + /** + * Assigns ids onto {@code schema} starting from {@code startId} and writes the resulting + * {@code max-column-id} onto the schema root. + * + * @return the maximum column id present after assignment + */ + public static int assign(HoodieSchema schema, int startId) { + int[] nextId = {startId}; + visit(schema, nextId); + int maxId = nextId[0] - 1; + schema.setMaxColumnId(maxId); + // Field/element/key/value ids were stamped onto the underlying Avro + // Schema/Field directly (bypassing HoodieSchema.addProp's invalidation), so + // any cached HoodieSchemaIndex on this schema is now stale. + schema.invalidateIdIndex(); + return maxId; + } + + /** + * Assigns ids onto {@code schema}, starting after the highest id already present + * (or 0 if none). This is the entry point for first-time id assignment on a schema + * that has no prior IDs.
+ */ + public static int assignFresh(HoodieSchema schema) { + HoodieSchemaIndex index = HoodieSchemaIndex.of(schema); + int start = Math.max(0, index.maxColumnIdSeen() + 1); + return assign(schema, start); + } + + private static void visit(HoodieSchema schema, int[] nextId) { + HoodieSchema effective = schema.isNullable() ? schema.getNonNullType() : schema; + switch (effective.getType()) { + case RECORD: + // Mirror InternalSchemaConverter ordering: assign ids to all fields at this level + // before recursing into any of them. + for (HoodieSchemaField field : effective.getFields()) { + assignFieldId(field, nextId); + } + for (HoodieSchemaField field : effective.getFields()) { + visit(field.schema(), nextId); + } + return; + case ARRAY: { + Schema arr = effective.getAvroSchema(); + assignContainerId(arr, ELEMENT_ID_PROP, nextId); + visit(effective.getElementType(), nextId); + return; + } + case MAP: { + Schema map = effective.getAvroSchema(); + assignContainerId(map, KEY_ID_PROP, nextId); + assignContainerId(map, VALUE_ID_PROP, nextId); + visit(effective.getValueType(), nextId); + return; + } + default: + // primitive: nothing to assign + } + } + + private static void assignFieldId(HoodieSchemaField field, int[] nextId) { + Schema.Field avroField = field.getAvroField(); + Object existing = avroField.getObjectProp(FIELD_ID_PROP); + if (existing instanceof Number) { Review Comment: 🤖 Could `assign(schema, startId)` ever produce duplicate ids when called with a startId that's not strictly greater than all pre-existing ids? E.g. fields `[A(no id), B(id=0), C(id=1)]` with `assign(schema, 0)`: A is stamped with 0 (nextId=1), then B's existingId=0 < 1 is skipped, leaving A and B both holding id 0. `assignFresh` avoids this by precomputing a safe startId, but since `assign` is public, is it worth either making it `private` / package-private, or doing a discovery pass first to bump `nextId` past all existing ids before stamping any blanks? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
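If `assign` stays public, one minimal guard would be to reuse the index `assignFresh` already builds. Untested sketch, assuming `maxColumnIdSeen()` returns a value below 0 on a schema with no ids (as the `Math.max(0, ... + 1)` in `assignFresh` suggests):

```java
// Sketch: start nextId past every id already on the schema, so a
// caller-supplied startId can never collide with a pre-existing id.
public static int assign(HoodieSchema schema, int startId) {
  int safeStart = Math.max(startId, HoodieSchemaIndex.of(schema).maxColumnIdSeen() + 1);
  int[] nextId = {safeStart};
  visit(schema, nextId);
  int maxId = nextId[0] - 1;
  schema.setMaxColumnId(maxId);
  // Ids were stamped onto the underlying Avro schema directly, so any cached
  // index (including the one built above) is stale by this point.
  schema.invalidateIdIndex();
  return maxId;
}
```

With that guard, `assignFresh` reduces to `assign(schema, 0)`, and the `[A(no id), B(id=0), C(id=1)]` example above stamps A with 2 instead of duplicating 0.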
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala: ########## @@ -72,116 +68,120 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha // convert to delete first then add again val deleteChanges = changes.filter(p => p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn]) val addChanges = changes.filter(p => p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn]) - val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession) + val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession) val newSchema = applyAddAction2Schema(sparkSession, applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges) - val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) { - SerDeHelper.inheritSchemas(oldSchema, "") - } else { - historySchema - } + val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema) AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession) logInfo("column replace finished") } - def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = { - val addChange = TableChanges.ColumnAddChange.get(oldSchema) + def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = { + var cur = oldSchema addChanges.foreach { addColumn => val names = addColumn.fieldNames() val parentName = AlterTableCommand.getParentName(names) - // add col change - val colType = SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), true, new AtomicInteger(0)) - addChange.addColumns(parentName, names.last, colType, addColumn.comment()) - // add position change - addColumn.position() match { + val fullName = names.mkString(".") + val colType = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(), names.last) + val (positionType, positionRef) = addColumn.position() match { case after: TableChange.After => - addChange.addPositionChange(names.mkString("."), - if (parentName.isEmpty) after.column() else parentName + "." + after.column(), "after") + (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() else parentName + "."
+ after.column()) case _: TableChange.First => - addChange.addPositionChange(names.mkString("."), "", "first") + (ColumnPositionType.FIRST, "") case _ => + (ColumnPositionType.NO_OPERATION, "") } + cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, colType, addColumn.comment(), positionRef, positionType) } - SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange) + cur } - private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = { - val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema) - deleteChanges.foreach { c => - val originalColName = c.fieldNames().mkString(".") - checkSchemaChange(Seq(originalColName), table) - deleteChange.deleteColumn(originalColName) + private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = { + val colNames = deleteChanges.map { c => Review Comment: 🤖 This loop now applies each add to the cumulative `cur` schema, whereas the legacy code accumulated all add+position changes into a single `ColumnAddChange` against `oldSchema` and applied them once. For multi-column adds where one references another (e.g. `ADD COLUMNS (x AFTER z, z ...)`), the legacy batch handled it because both columns existed by the time positions were resolved; here the first iteration would fail to resolve `AFTER z`. Could you confirm this isn't reachable from Spark DDL, or apply the same batch fix you did in `applyUpdateAction` (accumulate changes, apply once)? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java: ########## @@ -166,44 +163,43 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans HoodieBaseFile baseFile, HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) { - Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema()); + Option<HoodieSchema> querySchemaOpt = HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema()); // TODO support bootstrap if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { // check implicitly add columns, and position reorder(spark sql may change cols order) - InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(), + HoodieSchema querySchema = HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)); long commitInstantTime = Long.parseLong(baseFile.getCommitTime()); - InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient); - if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) { + HoodieSchema fileSchema = HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient); + if ((fileSchema == null || fileSchema.isEmptySchema()) && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); try { - fileSchema = InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true)); + fileSchema = tableSchemaResolver.getTableSchema(true); } catch (Exception e) { throw new HoodieException(String.format("Failed to get InternalSchema for given versionId: %s",
commitInstantTime), e); } } - final InternalSchema writeInternalSchema = fileSchema; + final HoodieSchema fileWriteSchema = fileSchema; List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName(); - List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName(); + List<String> colNamesFromWriteSchema = fileWriteSchema.getAllColsFullName(); List<String> sameCols = colNamesFromWriteSchema.stream() .filter(f -> { - int writerSchemaFieldId = writeInternalSchema.findIdByName(f); + int writerSchemaFieldId = fileWriteSchema.findIdByName(f); int querySchemaFieldId = querySchema.findIdByName(f); return colNamesFromQuerySchema.contains(f) && writerSchemaFieldId == querySchemaFieldId && writerSchemaFieldId != -1 - && Objects.equals(writeInternalSchema.findType(writerSchemaFieldId), querySchema.findType(querySchemaFieldId)); + && Objects.equals(fileWriteSchema.findType(writerSchemaFieldId), querySchema.findType(querySchemaFieldId)); }) .collect(Collectors.toList()); - InternalSchema mergedSchema = new InternalSchemaMerger(writeInternalSchema, querySchema, + HoodieSchema newWriterSchema = new HoodieSchemaMerger(fileWriteSchema, querySchema, true, false, false).mergeSchema(); Review Comment: 🤖 Could you switch this to `mergeSchema(writerSchema.getFullName())`? Legacy `InternalSchemaConverter.convert(mergedSchema, writerSchema.getFullName())` aligned both `newWriterSchema` and `writeSchemaFromFile` to the writer's name; with the new no-arg `mergeSchema()`, `newWriterSchema` ends up named after `querySchema` (the persisted evolution schema) while `writeSchemaFromFile = fileWriteSchema` keeps the file's name, the same drift you fixed for `AbstractHoodieLogRecordScanner` (line 685) and `HoodieAvroParquetReader` via the `mergeSchema(recordName)` overload. <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
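Concretely, the suggestion is a one-line change (sketch, using the `mergeSchema(recordName)` overload this PR already added for the other call sites):

```java
// Keep the merged schema's record name aligned with writeSchemaFromFile,
// matching the legacy InternalSchemaConverter.convert(..., fullName) behavior.
HoodieSchema newWriterSchema = new HoodieSchemaMerger(fileWriteSchema, querySchema,
    true, false, false).mergeSchema(writerSchema.getFullName());
```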
########## hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java: ########## @@ -210,29 +208,29 @@ protected Pair<ClosableIterator<T>, HoodieSchema> getRecordsIterator(HoodieDataB /** * Get final Read Schema for support evolution. * step1: find the fileSchema for current dataBlock. - * step2: determine whether fileSchema is compatible with the final read internalSchema. - * step3: merge fileSchema and read internalSchema to produce final read schema. + * step2: determine whether fileSchema is compatible with the read evolution schema. + * step3: merge fileSchema and read evolution schema to produce final read schema. * * @param dataBlock current processed block * @return final read schema. */ protected Option<Pair<Function<T, T>, HoodieSchema>> composeEvolvedSchemaTransformer( HoodieDataBlock dataBlock) { - if (internalSchema.isEmptySchema()) { + if (!evolutionSchemaOpt.isPresent()) { return Option.empty(); } long currentInstantTime = Long.parseLong(dataBlock.getLogBlockHeader().get(INSTANT_TIME)); - InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient); - Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema, + HoodieSchema fileSchema = HoodieSchemaHistoryCache.searchSchemaAndCache(currentInstantTime, hoodieTableMetaClient); + Pair<HoodieSchema, Map<String, String>> mergedEvolutionSchema = new HoodieSchemaMerger(fileSchema, evolutionSchemaOpt.get(), true, false, false).mergeSchemaGetRenamed(); - HoodieSchema mergedAvroSchema = HoodieSchema.fromAvroSchema(InternalSchemaConverter.convert(mergedInternalSchema.getLeft(), readerSchema.getFullName()).getAvroSchema()); + HoodieSchema mergedAvroSchema = mergedEvolutionSchema.getLeft(); Review Comment: 🤖 nit: this variable is a `HoodieSchema` now, not an Avro schema; the name `mergedAvroSchema` carried over from the legacy code where it really was an Avro `Schema`. Could you rename it to `mergedSchema` here (and at the matching spots in `AbstractHoodieLogRecordScanner.composeEvolvedSchemaTransformer` and `FileGroupReaderSchemaHandler.getRequiredSchemaForFileAndRenamedColumns`)? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala: ########## @@ -73,59 +70,48 @@ } /** - * get latest internalSchema from table - * - * @param config instance of {@link HoodieConfig} - * @param tableMetaClient instance of HoodieTableMetaClient - * @return Option of InternalSchema. Will always be empty if schema on read is disabled + * Returns the schema-on-read evolution [[HoodieSchema]] for the table, carrying + * field ids and version metadata, or [[None]] when schema-on-read is disabled, + * no schema is in commit metadata, or any read error occurs (read failures are + * swallowed rather than propagated). */ - def getLatestTableInternalSchema(config: HoodieConfig, - tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { - getLatestTableInternalSchema(config.getProps, tableMetaClient) + def getLatestTableEvolutionSchema(config: HoodieConfig, + tableMetaClient: HoodieTableMetaClient): Option[HoodieSchema] = { + getLatestTableEvolutionSchema(config.getProps, tableMetaClient) } - /** - * get latest internalSchema from table - * - * @param props instance of {@link Properties} - * @param tableMetaClient instance of HoodieTableMetaClient - * @return Option of InternalSchema.
Will always be empty if schema on read is disabled - */ - def getLatestTableInternalSchema(props: Properties, - tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { + def getLatestTableEvolutionSchema(props: Properties, + tableMetaClient: HoodieTableMetaClient): Option[HoodieSchema] = { if (!ConfigUtils.getBooleanWithAltKeys(props, DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { None } else { try { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata - if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None + val schemaOpt = tableSchemaResolver.getTableEvolutionSchemaFromCommitMetadata + if (schemaOpt.isPresent) Some(schemaOpt.get()) else None } catch { case _: Exception => None } } } /** - * Deduces writer's schema based on - * <ul> - * <li>Source's schema</li> - * <li>Target table's schema (including Hudi's [[InternalSchema]] representation)</li> - * </ul> + * Deduces writer's schema from source schema, the table's latest Avro schema, and + * the table's schema-on-read evolution schema (if any). */ def deduceWriterSchema(sourceSchema: HoodieSchema, - latestTableSchemaOpt: Option[HoodieSchema], - internalSchemaOpt: Option[InternalSchema], - opts: Map[String, String]): HoodieSchema = { + latestTableSchemaOpt: Option[HoodieSchema], Review Comment: 🤖 nit: the continuation parameters are over-indented and no longer line up with `sourceSchema` on the previous line; it looks like the rename pushed the column out. Could you re-align these (and the matching Java-friendly overload further down) to start under `sourceSchema`? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala: ########## @@ -244,13 +241,13 @@ /** * Generate an commit with new schema to change the table's schema. * - * @param internalSchema new schema after change + * @param evolutionSchema new schema after change * @param historySchemaStr history schemas * @param table The hoodie table. * @param sparkSession The spark session. */ - def commitWithSchema(internalSchema: InternalSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = { - val schema = InternalSchemaConverter.convert(internalSchema, HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table)) + def commitWithSchema(evolutionSchema: HoodieSchema, historySchemaStr: String, table: CatalogTable, sparkSession: SparkSession): Unit = { + val schema = evolutionSchema Review Comment: 🤖 Legacy `commitWithSchema` forcibly renamed the schema to `HoodieSchemaUtils.getRecordQualifiedName(table.identifier.table)`. The new code uses `evolutionSchema` as-is, so its existing `getFullName` is what gets persisted in `LATEST_SCHEMA` and used by the write client. If `evolutionSchema.getFullName` ever diverges from the table-identifier-based name (e.g. on first ALTER after migrating a table whose original schema name was set differently), the persisted record name will differ from legacy. Was this intentional, or should we still apply the canonical record name here? <sub><i>- AI-generated; verify before applying.
React 👍/👎 to flag quality.</i></sub> ########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaSerDe.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.utils.SerDeHelper; + +import java.util.ArrayList; +import java.util.List; +import java.util.TreeMap; + +/** + * HoodieSchema-shaped façade for evolution-schema JSON serialization. + * + * <p>The on-disk format is fixed: the {@code latest_schema} blob in commit metadata + * and the {@code .hoodie/.schema/} history files use the same JSON layout that + * {@link SerDeHelper} has always produced ({@code schemas} array containing + * objects with {@code version_id}, {@code max_column_id}, {@code type}, {@code fields}, etc.). + * Old tables must remain readable, so this façade delegates to {@link SerDeHelper} + * verbatim and converts at the HoodieSchema/InternalSchema boundary via + * {@link HoodieSchemaInternalSchemaBridge}, preserving field ids on the way out.</p> + * + * <p>Phase 5 of the InternalSchema removal will rewrite the JSON serializer in + * pure HoodieSchema terms behind this stable interface, but the byte-for-byte + * compatibility constraint stays in force.</p> + */ +public final class HoodieSchemaSerDe { + + /** + * Commit-metadata key under which the latest schema's JSON is stored. Carried + * over verbatim from {@link SerDeHelper#LATEST_SCHEMA} so callers reading old + * commit metadata pick up the same blob. + */ + public static final String LATEST_SCHEMA = SerDeHelper.LATEST_SCHEMA; + + /** + * JSON object key that wraps the array of historical schemas. Same as + * {@link SerDeHelper#SCHEMAS}. + */ + public static final String SCHEMAS = SerDeHelper.SCHEMAS; + + private HoodieSchemaSerDe() { + } + + /** + * Serializes a single schema to JSON. Output format matches the legacy + * {@link SerDeHelper#toJson(InternalSchema)} byte for byte. + */ + public static String toJson(HoodieSchema schema) { + return SerDeHelper.toJson(HoodieSchemaInternalSchemaBridge.toInternalSchema(schema)); + } + + /** + * Serializes a history of schemas to JSON. Output format matches the legacy + * {@link SerDeHelper#toJson(List)} byte for byte.
+ */ + public static String toJsonHistory(List<HoodieSchema> schemas) { + List<InternalSchema> converted = new ArrayList<>(schemas.size()); + for (HoodieSchema s : schemas) { + converted.add(HoodieSchemaInternalSchemaBridge.toInternalSchema(s)); + } + return SerDeHelper.toJson(converted); + } + + /** + * Parses a single-schema JSON blob (typically the {@code latest_schema} value + * from commit metadata). Returns empty if the input is null/empty so callers + * can pass through optional commit metadata fields. + */ + public static Option<HoodieSchema> fromJson(String json) { + Option<InternalSchema> internal = SerDeHelper.fromJson(json); + if (!internal.isPresent()) { + return Option.empty(); + } + return Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), defaultRecordName(internal.get()))); + } + + /** + * Variant of {@link #fromJson(String)} that lets the caller fix the record name + * on the resulting HoodieSchema. Equivalent to the legacy + * {@code SerDeHelper.fromJson(...).map(is -> InternalSchemaConverter.convert(is, recordName))} + * pattern, collapsed into a single call. + */ + public static Option<HoodieSchema> fromJson(String json, String recordName) { + Option<InternalSchema> internal = SerDeHelper.fromJson(json); + if (!internal.isPresent()) { + return Option.empty(); + } + return Option.of(HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal.get(), recordName)); + } + + /** + * Parses the history-schemas JSON layout (array of versioned schemas) and + * returns them keyed by {@code version_id}. Field ids are preserved on each + * returned HoodieSchema so the resulting map is directly usable by the + * read/merge path. + */ + public static TreeMap<Long, HoodieSchema> fromJsonHistory(String json) { + TreeMap<Long, InternalSchema> internals = SerDeHelper.parseSchemas(json); + TreeMap<Long, HoodieSchema> out = new TreeMap<>(); + for (Long versionId : internals.keySet()) { + InternalSchema is = internals.get(versionId); + HoodieSchema hs = HoodieSchemaInternalSchemaBridge.toHoodieSchema(is, defaultRecordName(is)); + out.put(versionId, hs); + } + return out; + } + + /** + * Appends a freshly-evolved schema to an existing serialized history blob and + * returns the new blob. Mirrors {@link SerDeHelper#inheritSchemas(InternalSchema, String)}: + * the {@code oldHistoryJson} is the prior {@code .hoodie/.schema/} contents + * (or empty for the first commit). + */ + public static String inheritHistory(HoodieSchema newSchema, String oldHistoryJson) { + return SerDeHelper.inheritSchemas( + HoodieSchemaInternalSchemaBridge.toInternalSchema(newSchema), oldHistoryJson); + } + + /** + * Resolves the schema-history entry that applies to a given version id: exact + * match if present, else the largest entry strictly less than {@code versionId}, + * else {@code null}. HoodieSchema-shaped replacement for + * {@link org.apache.hudi.internal.schema.utils.InternalSchemaUtils#searchSchema}. + * + * <p>Note: legacy returned {@code InternalSchema.getEmptyInternalSchema()} on + * miss; this returns {@code null} so callers can choose their own empty + * sentinel via {@link HoodieSchema#empty()}. Most callers null-check + fall + * back, so the change is benign.</p> + */ + public static HoodieSchema searchSchema(long versionId, java.util.TreeMap<Long, HoodieSchema> history) { + if (history.containsKey(versionId)) { + return history.get(versionId); + } + java.util.SortedMap<Long, HoodieSchema> headMap = history.headMap(versionId); + return headMap.isEmpty() ?
null : headMap.get(headMap.lastKey()); Review Comment: 🤖 nit: returning bare `null` for the miss case (and noting it in the javadoc) breaks the codebase convention of using `Option<HoodieSchema>` for absent schemas; every caller has to remember to null-check. Could you return `Option<HoodieSchema>` here instead? <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub>
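For example (sketch; callers then pick their own fallback, such as `HoodieSchema.empty()`, explicitly at the call site):

```java
// Option-returning variant of the lookup above, keeping the same resolution
// order: exact version match first, else the largest strictly-smaller entry.
public static Option<HoodieSchema> searchSchema(long versionId, TreeMap<Long, HoodieSchema> history) {
  if (history.containsKey(versionId)) {
    return Option.of(history.get(versionId));
  }
  java.util.SortedMap<Long, HoodieSchema> headMap = history.headMap(versionId);
  return headMap.isEmpty() ? Option.empty() : Option.of(headMap.get(headMap.lastKey()));
}
```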
########## hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java: ########## @@ -99,72 +99,23 @@ public static HoodieSchema convert(InternalSchema internalSchema, String name) { } public static InternalSchema pruneHoodieSchemaToInternalSchema(HoodieSchema schema, InternalSchema originSchema) { Review Comment: 🤖 nit: `org.apache.hudi.common.schema.HoodieSchemaUtils.collectLeafNames(schema)` is fully qualified at both call sites; could you add an import so the body reads `HoodieSchemaUtils.collectLeafNames(schema)`? (If another `HoodieSchemaUtils` is already imported, a small static wrapper may be needed instead, since Java has no import aliasing.) <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag quality.</i></sub> -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]