voonhous commented on code in PR #18680: URL: https://github.com/apache/hudi/pull/18680#discussion_r3182191841
########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaInternalSchemaBridge.java: ########## @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.common.schema.types.Type; +import org.apache.hudi.common.schema.types.Types; +import org.apache.hudi.internal.schema.convert.InternalSchemaConverter; + +import java.util.ArrayList; +import java.util.List; + +/** + * One-way bridge from {@link InternalSchema} to {@link HoodieSchema} that preserves + * column ids by stamping them as Avro custom properties on the HoodieSchema's + * underlying schema tree. + * + * <p>This exists during the InternalSchema → HoodieSchema migration. The existing + * {@link InternalSchemaConverter#convert(InternalSchema, String)} produces a + * structurally-correct HoodieSchema but discards field ids. Downstream code in + * the new evolution layer relies on {@code field-id} / {@code element-id} / + * {@code key-id} / {@code value-id} properties being present, so we walk the + * InternalSchema and stamped HoodieSchema in lock-step and copy ids over.</p> + * + * <p>The walk order matches {@code InternalSchemaConverter.visitInternalSchemaToBuildHoodieSchema} + * (record fields in declared order; array element after array; map key + value after map), + * so positional pairing is exact.</p> + * + * <p>Public for the migration period only — Phase 4 callsite migrations across + * different packages need access to the conversion. Once Phase 5 rewrites the + * action algebra on pure HoodieSchema, this bridge and its dependency on + * {@code InternalSchema} go away.</p> + */ +public final class HoodieSchemaInternalSchemaBridge { + + private HoodieSchemaInternalSchemaBridge() { + } + + /** + * Converts a {@link HoodieSchema} to an {@link InternalSchema}, preserving column + * ids carried as {@code field-id} / {@code element-id} / {@code key-id} / + * {@code value-id} Avro custom properties. This is the inverse of + * {@link #toHoodieSchema(InternalSchema, String)} and exists so the façade can + * round-trip a HoodieSchema through the legacy applier without renumbering ids on + * every call. + * + * <p>For HoodieSchemas that have not yet had ids assigned (e.g. 
freshly parsed + * input), this falls back to the existing + * {@link InternalSchemaConverter#convert(HoodieSchema)} which mints fresh ids.</p> + */ + public static InternalSchema toInternalSchema(HoodieSchema hoodieSchema) { + // Preserve the empty-schema marker end-to-end: callers (e.g. FileGroupReader's + // schema handler) short-circuit on isEmptySchema() so the round-trip must not + // resurrect an "empty" HoodieSchema as a non-empty InternalSchema with the + // default versionId of 0. + if (hoodieSchema == null || hoodieSchema.isEmptySchema()) { + return InternalSchema.getEmptyInternalSchema(); + } + // Take the structurally-correct InternalSchema produced by the existing converter, + // then walk both schemas in parallel and overwrite the InternalSchema's freshly-minted + // ids with the ids carried as Avro properties on the HoodieSchema (where present). + InternalSchema fresh = InternalSchemaConverter.convert(hoodieSchema, hoodieSchema.getNameToPosition()); + Types.RecordType originalRecord = fresh.getRecord(); + Types.RecordType reidentified = (Types.RecordType) reidentify(hoodieSchema, originalRecord); + InternalSchema result = (originalRecord == reidentified) + ? fresh + : new InternalSchema(reidentified); + long schemaId = hoodieSchema.schemaId(); + if (schemaId >= 0) { + result.setSchemaId(schemaId); + } + int maxColumnId = hoodieSchema.maxColumnId(); + if (maxColumnId >= 0) { + result.setMaxColumnId(maxColumnId); + } + return result; + } + + /** + * Walks a HoodieSchema and the corresponding InternalSchema {@link Type} in parallel + * and produces a {@link Type} where each addressable id matches the HoodieSchema's + * Avro custom property (when present). Returns the original {@code internalType} + * unchanged when no overrides apply, so callers can short-circuit. + */ + private static Type reidentify(HoodieSchema hoodieSchema, Type internalType) { + HoodieSchema effective = hoodieSchema.isNullable() ? hoodieSchema.getNonNullType() : hoodieSchema; + switch (internalType.typeId()) { + case RECORD: { + Types.RecordType record = (Types.RecordType) internalType; + if (effective.getType() != HoodieSchemaType.RECORD) { + return internalType; + } + List<Types.Field> originalFields = record.fields(); + List<Types.Field> rebuilt = new ArrayList<>(originalFields.size()); + boolean anyChange = false; + for (int i = 0; i < originalFields.size(); i++) { + Types.Field original = originalFields.get(i); + HoodieSchemaField hf = effective.getFields().get(i); + int overrideId = hf.fieldId(); + Type childType = reidentify(hf.schema(), original.type()); + int finalId = overrideId >= 0 ? overrideId : original.fieldId(); + if (finalId == original.fieldId() && childType == original.type()) { + rebuilt.add(original); + } else { + rebuilt.add(Types.Field.get(finalId, original.isOptional(), original.name(), childType, original.doc())); + anyChange = true; + } + } + return anyChange ? Types.RecordType.get(rebuilt, record.name()) : record; + } + case ARRAY: { + Types.ArrayType array = (Types.ArrayType) internalType; + if (effective.getType() != HoodieSchemaType.ARRAY) { + return internalType; + } + int overrideElementId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.ELEMENT_ID_PROP), -1); + Type newElement = reidentify(effective.getElementType(), array.elementType()); + int finalElementId = overrideElementId >= 0 ? 
overrideElementId : array.elementId(); + if (finalElementId == array.elementId() && newElement == array.elementType()) { + return array; + } + return Types.ArrayType.get(finalElementId, array.isElementOptional(), newElement); + } + case MAP: { + Types.MapType map = (Types.MapType) internalType; + if (effective.getType() != HoodieSchemaType.MAP) { + return internalType; + } + int overrideKeyId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.KEY_ID_PROP), -1); + int overrideValueId = readIntProp(effective.getAvroSchema().getObjectProp(HoodieSchema.VALUE_ID_PROP), -1); + Type newValue = reidentify(effective.getValueType(), map.valueType()); + int finalKeyId = overrideKeyId >= 0 ? overrideKeyId : map.keyId(); + int finalValueId = overrideValueId >= 0 ? overrideValueId : map.valueId(); + if (finalKeyId == map.keyId() && finalValueId == map.valueId() && newValue == map.valueType()) { + return map; + } + return Types.MapType.get(finalKeyId, finalValueId, map.keyType(), newValue, map.isValueOptional()); + } + default: + return internalType; + } + } + + private static int readIntProp(Object raw, int fallback) { + return raw instanceof Number ? ((Number) raw).intValue() : fallback; + } + + /** + * Converts an {@link InternalSchema} to a {@link HoodieSchema} and stamps every + * sub-schema with the corresponding field id from the source. The schema-level + * version id and max column id are also propagated. + */ + public static HoodieSchema toHoodieSchema(InternalSchema internalSchema, String recordName) { + HoodieSchema hoodieSchema = InternalSchemaConverter.convert(internalSchema, recordName); Review Comment: Added the early return: returns HoodieSchema.empty() when input is null or isEmptySchema(), matching the symmetry of the toInternalSchema side. ########## hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/avro/HoodieAvroParquetReader.java: ########## @@ -64,14 +62,9 @@ public HoodieAvroParquetReader(InputSplit inputSplit, Configuration conf, Option MessageType messageType = fileFooter.getFileMetaData().getSchema(); baseSchema = getAvroSchemaConverter(conf).convert(messageType); - if (internalSchemaOption.isPresent()) { + if (evolutionSchemaOption.isPresent()) { // do schema reconciliation in case there exists read column which is not in the file schema. - InternalSchema mergedInternalSchema = new InternalSchemaMerger( - InternalSchemaConverter.convert(baseSchema), - internalSchemaOption.get(), - true, - true).mergeSchema(); - baseSchema = InternalSchemaConverter.convert(mergedInternalSchema, baseSchema.getFullName()); + baseSchema = new HoodieSchemaMerger(baseSchema, evolutionSchemaOption.get(), true, true).mergeSchema(); Review Comment: Fixed via the new mergeSchema(recordName) overload on HoodieSchemaMerger, passing baseSchema.getFullName() so the file schema's record name carries through instead of the hardcoded "schema" from the InputFormat. 
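For readers tracking the bridge's contract, a minimal round-trip sketch follows. It only relies on members visible in the diff and review comments above (toInternalSchema, toHoodieSchema, HoodieSchema.empty(), isEmptySchema(), getFullName()); the surrounding class and method are illustrative, not the PR's actual call sites.

```java
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.evolution.HoodieSchemaInternalSchemaBridge;
import org.apache.hudi.internal.schema.InternalSchema;

final class BridgeRoundTripSketch {

  // Round-trips a table schema through the bridge. When the input already carries
  // field-id / element-id / key-id / value-id properties, toInternalSchema reuses
  // those ids instead of the freshly minted ones, so a second pass does not
  // renumber columns. Empty schemas short-circuit on both sides, preserving the marker.
  static HoodieSchema roundTrip(HoodieSchema tableSchema) {
    InternalSchema internal = HoodieSchemaInternalSchemaBridge.toInternalSchema(tableSchema);
    if (internal.isEmptySchema()) {
      return HoodieSchema.empty();
    }
    return HoodieSchemaInternalSchemaBridge.toHoodieSchema(internal, tableSchema.getFullName());
  }
}
```

In particular, the id-stamping means callers such as the FileGroupReader schema handler can round-trip through the legacy applier without the column ids drifting between calls.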
########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala: ########## @@ -72,113 +65,109 @@ case class AlterTableCommand(table: CatalogTable, changes: Seq[TableChange], cha // convert to delete first then add again val deleteChanges = changes.filter(p => p.isInstanceOf[DeleteColumn]).map(_.asInstanceOf[DeleteColumn]) val addChanges = changes.filter(p => p.isInstanceOf[AddColumn]).map(_.asInstanceOf[AddColumn]) - val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession) + val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession) val newSchema = applyAddAction2Schema(sparkSession, applyDeleteAction2Schema(sparkSession, oldSchema, deleteChanges), addChanges) - val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) { - SerDeHelper.inheritSchemas(oldSchema, "") - } else { - historySchema - } + val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema) AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession) logInfo("column replace finished") } - def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: InternalSchema, addChanges: Seq[AddColumn]): InternalSchema = { - val addChange = TableChanges.ColumnAddChange.get(oldSchema) + def applyAddAction2Schema(sparkSession: SparkSession, oldSchema: HoodieSchema, addChanges: Seq[AddColumn]): HoodieSchema = { + var cur = oldSchema addChanges.foreach { addColumn => val names = addColumn.fieldNames() val parentName = AlterTableCommand.getParentName(names) - // add col change - val colType = SparkInternalSchemaConverter.buildTypeFromStructType(addColumn.dataType(), true, new AtomicInteger(0)) - addChange.addColumns(parentName, names.last, colType, addColumn.comment()) - // add position change - addColumn.position() match { + val fullName = names.mkString(".") + val colType = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(addColumn.dataType(), names.last) + val (positionType, positionRef) = addColumn.position() match { case after: TableChange.After => - addChange.addPositionChange(names.mkString("."), - if (parentName.isEmpty) after.column() else parentName + "." + after.column(), "after") + (ColumnPositionType.AFTER, if (parentName.isEmpty) after.column() else parentName + "." 
+ after.column()) case _: TableChange.First => - addChange.addPositionChange(names.mkString("."), "", "first") + (ColumnPositionType.FIRST, "") case _ => + (ColumnPositionType.NO_OPERATION, "") } + cur = new HoodieSchemaChangeApplier(cur).applyAddChange(fullName, colType, addColumn.comment(), positionRef, positionType) } - SchemaChangeUtils.applyTableChanges2Schema(oldSchema, addChange) + cur } - private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: InternalSchema, deleteChanges: Seq[DeleteColumn]): InternalSchema = { - val deleteChange = TableChanges.ColumnDeleteChange.get(oldSchema) - deleteChanges.foreach { c => - val originalColName = c.fieldNames().mkString(".") - checkSchemaChange(Seq(originalColName), table) - deleteChange.deleteColumn(originalColName) + private def applyDeleteAction2Schema(sparkSession: SparkSession, oldSchema: HoodieSchema, deleteChanges: Seq[DeleteColumn]): HoodieSchema = { + val colNames = deleteChanges.map { c => + val name = c.fieldNames().mkString(".") + checkSchemaChange(Seq(name), table) + name + }.toArray + if (colNames.isEmpty) { + oldSchema + } else { + val newSchema = new HoodieSchemaChangeApplier(oldSchema).applyDeleteChange(colNames: _*) + // delete action should not change the getMaxColumnId field + newSchema.setMaxColumnId(oldSchema.maxColumnId()) + newSchema } - val newSchema = SchemaChangeUtils.applyTableChanges2Schema(oldSchema, deleteChange) - // delete action should not change the getMaxColumnId field - newSchema.setMaxColumnId(oldSchema.getMaxColumnId) - newSchema } def applyAddAction(sparkSession: SparkSession): Unit = { - val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession) + val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession) val newSchema = applyAddAction2Schema(sparkSession, oldSchema, changes.map(_.asInstanceOf[AddColumn])) - val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) { - SerDeHelper.inheritSchemas(oldSchema, "") - } else { - historySchema - } + val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema) AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession) logInfo("column add finished") } def applyDeleteAction(sparkSession: SparkSession): Unit = { - val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession) + val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession) val newSchema = applyDeleteAction2Schema(sparkSession, oldSchema, changes.map(_.asInstanceOf[DeleteColumn])) - val verifiedHistorySchema = if (historySchema == null || historySchema.isEmpty) { - SerDeHelper.inheritSchemas(oldSchema, "") - } else { - historySchema - } + val verifiedHistorySchema = inheritedHistory(oldSchema, historySchema) AlterTableCommand.commitWithSchema(newSchema, verifiedHistorySchema, table, sparkSession) logInfo("column delete finished") } def applyUpdateAction(sparkSession: SparkSession): Unit = { - val (oldSchema, historySchema) = getInternalSchemaAndHistorySchemaStr(sparkSession) - val updateChange = TableChanges.ColumnUpdateChange.get(oldSchema) + val (oldSchema, historySchema) = getEvolutionSchemaAndHistorySchemaStr(sparkSession) + var cur = oldSchema changes.foreach { change => - change match { + val applier = new HoodieSchemaChangeApplier(cur) Review Comment: Fixed. 
applyUpdateAction now accumulates all changes into a single ColumnUpdateChange against the original schema and applies them in one shot via SchemaChangeUtils.applyTableChanges2Schema, restoring the legacy batch semantics where multi-change alters targeting the same field id resolve correctly. ########## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala: ########## @@ -198,15 +184,14 @@ object HoodieSchemaUtils { private def deduceWriterSchemaWithReconcile(sourceSchema: HoodieSchema, canonicalizedSourceSchema: HoodieSchema, latestTableSchema: HoodieSchema, - internalSchemaOpt: Option[InternalSchema], + tableEvolutionSchemaOpt: Option[HoodieSchema], opts: Map[String, String]): HoodieSchema = { - internalSchemaOpt match { - case Some(internalSchema) => + tableEvolutionSchemaOpt match { + case Some(tableEvolutionSchema) => // Apply schema evolution, by auto-merging write schema and read schema val setNullForMissingColumns = opts.getOrElse(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key(), HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.defaultValue()).toBoolean - val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(canonicalizedSourceSchema.toAvroSchema(), internalSchema, setNullForMissingColumns) - val evolvedSchema = InternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getFullName) + val evolvedSchema = HoodieSchemaEvolutionUtils.reconcileSchemaStructural(canonicalizedSourceSchema, tableEvolutionSchema, setNullForMissingColumns) Review Comment: Fixed by renaming the reconciled schema to latestTableSchema.getFullName via HoodieSchemaInternalSchemaBridge.withRecordName when the names differ. The evolution schema's record name no longer leaks into the writer schema. ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java: ########## @@ -801,16 +800,16 @@ Pair<InputBatch, Boolean> fetchNextBatchFromSource(Option<Checkpoint> resumeChec @VisibleForTesting SchemaProvider getDeducedSchemaProvider(HoodieSchema incomingSchema, SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) { Option<HoodieSchema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage, cfg.targetBasePath, metaClient); - Option<InternalSchema> internalSchemaOpt = HoodieConversionUtils.toJavaOption( - HoodieSchemaUtils.getLatestTableInternalSchema( + Option<HoodieSchema> evolutionSchemaOpt = HoodieConversionUtils.toJavaOption( + HoodieSchemaUtils.getLatestTableEvolutionSchema( HoodieStreamer.Config.getProps(conf, cfg), metaClient)); // Deduce proper target (writer's) schema for the input dataset, reconciling its // schema w/ the table's one - HoodieSchema targetSchema = HoodieSchemaUtils.deduceWriterSchema( + HoodieSchema targetSchema = HoodieSchemaUtils.deduceWriterSchemaWithEvolution( Review Comment: Addressed (renamed to deduceWriterSchema). The legacy Option[InternalSchema] overload was already gone, so no collision. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java: ########## @@ -139,7 +139,9 @@ public static HoodieFileGroupReader<RowData> createFileGroupReader( .withFileSlice(fileSlice) .withDataSchema(tableSchema) .withRequestedSchema(requiredSchema) - .withInternalSchema(Option.ofNullable(internalSchemaManager.getQuerySchema())) + .withEvolutionSchema(internalSchemaManager.getQuerySchema() == null Review Comment: Addressed (collapsed to Option.ofNullable(internalSchemaManager.getQuerySchema())). 
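As a pointer for other call sites migrating off TableChanges, here is a short sketch of the new applier API as it appears in the AlterTableCommand diff above. Only the call shapes, applyAddChange(fullName, colType, comment, positionRef, positionType) and applyDeleteChange(names...), plus setMaxColumnId/maxColumnId, are taken from the diff; the import paths and the wrapper class are assumptions.

```java
import org.apache.hudi.common.schema.HoodieSchema;
// Assumed package for the applier introduced by this PR; adjust if it lives elsewhere.
import org.apache.hudi.common.schema.evolution.HoodieSchemaChangeApplier;
// Pre-existing Hudi enum; the PR may relocate or replace it.
import org.apache.hudi.internal.schema.action.TableChange.ColumnPositionChange.ColumnPositionType;

final class AlterTableSketch {

  // Adds a column named colName of type colType directly after afterCol, then drops
  // obsoleteCol, mirroring the applyAddAction2Schema / applyDeleteAction2Schema flow
  // in the Scala command above.
  static HoodieSchema addThenDelete(HoodieSchema tableSchema,
                                    String colName,
                                    HoodieSchema colType,
                                    String afterCol,
                                    String obsoleteCol) {
    HoodieSchema withNewCol = new HoodieSchemaChangeApplier(tableSchema)
        .applyAddChange(colName, colType, "added by ALTER TABLE", afterCol, ColumnPositionType.AFTER);
    HoodieSchema withoutObsolete = new HoodieSchemaChangeApplier(withNewCol)
        .applyDeleteChange(obsoleteCol);
    // a delete must not advance the max column id, per the command's post-condition
    withoutObsolete.setMaxColumnId(withNewCol.maxColumnId());
    return withoutObsolete;
  }
}
```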
########## hudi-common/src/main/java/org/apache/hudi/common/schema/evolution/HoodieSchemaEvolutionUtils.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema.evolution; + +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.types.Type; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.internal.schema.InternalSchema; +import org.apache.hudi.internal.schema.convert.InternalSchemaConverter; +import org.apache.hudi.internal.schema.utils.AvroSchemaEvolutionUtils; +import org.apache.hudi.internal.schema.utils.InternalSchemaUtils; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * HoodieSchema-shaped façade for write-path schema evolution: reconciling an + * incoming write schema against the table's current schema, including missing + * columns, added columns, type promotions, and nullability adjustments. + * + * <p>Mirrors the two entry points of {@link AvroSchemaEvolutionUtils} but consumes + * and produces {@link HoodieSchema} so callers can stay off direct + * {@code InternalSchema} usage. During the InternalSchema → HoodieSchema migration + * this delegates to the legacy implementation via + * {@link HoodieSchemaInternalSchemaBridge}, which preserves field ids end-to-end. + * Phase 5 rewrites the implementation in pure HoodieSchema terms behind this stable + * interface.</p> + */ +public final class HoodieSchemaEvolutionUtils { + + private HoodieSchemaEvolutionUtils() { + } + + /** + * Reconciles an incoming write schema against the existing table schema, adding + * any new columns, promoting types where allowed, and (optionally) marking + * missing columns as nullable. + * + * <p>Semantics match {@link AvroSchemaEvolutionUtils#reconcileSchema(org.apache.avro.Schema, InternalSchema, boolean)}: + * the incoming schema is assumed to have <i>missing</i> columns rather than + * <i>deleted</i> columns. 
Renames and explicit deletes are not inferred here — + * those are handled by the explicit DDL path through + * {@link HoodieSchemaChangeApplier}.</p> + * + * @param incomingSchema incoming write schema + * @param oldTableSchema current table schema (with field ids) + * @param makeMissingFieldsNullable when true, table fields absent from the + * incoming schema are marked nullable in the + * reconciled result + * @return reconciled HoodieSchema with field ids preserved on unchanged columns + */ + public static HoodieSchema reconcileSchema(HoodieSchema incomingSchema, + HoodieSchema oldTableSchema, + boolean makeMissingFieldsNullable) { + InternalSchema oldInternal = HoodieSchemaInternalSchemaBridge.toInternalSchema(oldTableSchema); + InternalSchema reconciled = AvroSchemaEvolutionUtils.reconcileSchema( + incomingSchema.getAvroSchema(), oldInternal, makeMissingFieldsNullable); + return HoodieSchemaInternalSchemaBridge.toHoodieSchema(reconciled, oldTableSchema.getFullName()); + } + + /** + * Avro-only sibling of {@link #reconcileSchema(HoodieSchema, HoodieSchema, boolean)} + * that does <em>not</em> route through the InternalSchema bridge — field ids are + * neither read from the inputs nor stamped on the output. Use this from the + * write-path's structural reconciliation (e.g. {@code deduceWriterSchema}) where + * carrying ids over from the table's evolution-schema would leak them into + * commit metadata and Parquet writes that historically didn't include them. + */ + public static HoodieSchema reconcileSchemaStructural(HoodieSchema incomingSchema, + HoodieSchema oldTableSchema, + boolean makeMissingFieldsNullable) { + org.apache.avro.Schema reconciled = AvroSchemaEvolutionUtils.reconcileSchema( + incomingSchema.getAvroSchema(), oldTableSchema.getAvroSchema(), makeMissingFieldsNullable); + return HoodieSchema.fromAvroSchema(reconciled); + } + + /** + * Reconciles nullability and type-promotion requirements between a source + * (incoming) schema and a target (existing) schema, adjusting the source to be + * in line with the target's nullability and promotable types. + * + * <p>Semantics match + * {@link AvroSchemaEvolutionUtils#reconcileSchemaRequirements(org.apache.avro.Schema, org.apache.avro.Schema, boolean)}. + * If {@code shouldReorderColumns} is true, the source's fields are ordered to match + * the target's positional layout, preserving inter-commit field ordering.</p> + * + * @param sourceSchema incoming source schema to be reconciled + * @param targetSchema target schema to reconcile against + * @param shouldReorderColumns if true, fields in the result follow the target's order + * @return source-shaped HoodieSchema with nullability and types reconciled + */ + public static HoodieSchema reconcileSchemaRequirements(HoodieSchema sourceSchema, + HoodieSchema targetSchema, + boolean shouldReorderColumns) { + org.apache.avro.Schema reconciled = AvroSchemaEvolutionUtils.reconcileSchemaRequirements( + sourceSchema == null ? null : sourceSchema.getAvroSchema(), + targetSchema == null ? null : targetSchema.getAvroSchema(), + shouldReorderColumns); + return HoodieSchema.fromAvroSchema(reconciled); + } + + /** Review Comment: Addressed. 
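To make the split between the two reconcile entry points concrete, a short sketch against the signatures shown in the diff above: the id-preserving reconcileSchema for the evolution/DDL path versus the Avro-only reconcileSchemaStructural used by deduceWriterSchema. Class and method signatures come from the diff; the wrapper class is illustrative.

```java
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.evolution.HoodieSchemaEvolutionUtils;

final class ReconcileSketch {

  // Evolution/DDL path: ids are read from the table's evolution schema and stamped
  // on the result via the bridge, so downstream id-based lookups keep working.
  static HoodieSchema reconcileForEvolution(HoodieSchema incoming, HoodieSchema tableEvolutionSchema) {
    return HoodieSchemaEvolutionUtils.reconcileSchema(incoming, tableEvolutionSchema, /* makeMissingFieldsNullable */ true);
  }

  // Write path (deduceWriterSchema): stays Avro-only so field ids never leak into
  // commit metadata or Parquet writes that historically did not carry them.
  static HoodieSchema reconcileForWriter(HoodieSchema canonicalizedSource, HoodieSchema tableEvolutionSchema) {
    return HoodieSchemaEvolutionUtils.reconcileSchemaStructural(canonicalizedSource, tableEvolutionSchema, true);
  }
}
```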
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java: ########## @@ -166,23 +163,23 @@ private Option<Function<HoodieRecord, HoodieRecord>> composeSchemaEvolutionTrans HoodieBaseFile baseFile, HoodieWriteConfig writeConfig, HoodieTableMetaClient metaClient) { - Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(writeConfig.getInternalSchema()); + Option<HoodieSchema> querySchemaOpt = HoodieSchemaSerDe.fromJson(writeConfig.getInternalSchema()); // TODO support bootstrap if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) { // check implicitly add columns, and position reorder(spark sql may change cols order) - InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(writerSchema.toAvroSchema(), + HoodieSchema querySchema = HoodieSchemaEvolutionUtils.reconcileSchema(writerSchema, querySchemaOpt.get(), writeConfig.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)); long commitInstantTime = Long.parseLong(baseFile.getCommitTime()); - InternalSchema fileSchema = InternalSchemaCache.getInternalSchemaByVersionId(commitInstantTime, metaClient); - if (fileSchema.isEmptySchema() && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) { + HoodieSchema fileSchema = HoodieSchemaHistoryCache.getSchemaByVersionId(commitInstantTime, metaClient); + if ((fileSchema == null || fileSchema.isEmptySchema()) && writeConfig.getBoolean(HoodieCommonConfig.RECONCILE_SCHEMA)) { TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient); try { - fileSchema = InternalSchemaConverter.convert(tableSchemaResolver.getTableSchema(true)); + fileSchema = tableSchemaResolver.getTableSchema(true); } catch (Exception e) { throw new HoodieException(String.format("Failed to get InternalSchema for given versionId: %s", commitInstantTime), e); Review Comment: Addressed (renamed to fileWriteSchema). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
