nsivabalan commented on code in PR #9743:
URL: https://github.com/apache/hudi/pull/9743#discussion_r1372446787
##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,24 @@ public static String getAvroRecordQualifiedName(String tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + "_record";
   }
+  /**
+   * Validate whether the {@code targetSchema} is a valid evolution of {@code sourceSchema}.
+   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type promotion in the
+   * opposite direction
+   */
+  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema targetSchema) {
+    return (sourceSchema.getType() == Schema.Type.NULL) || isProjectionOfInternal(sourceSchema, targetSchema,
+        AvroSchemaUtils::isAtomicSchemasCompatibleEvolution);
+  }
+
+  private static boolean isAtomicSchemasCompatibleEvolution(Schema oneAtomicType, Schema anotherAtomicType) {

Review Comment:
   Can we write extensive docs on these methods? In general, we have not been very comfortable touching this part of the code. Maybe Meng Tao and a few others are, but the rest of the PMC members have generally been very cautious. Can you add more docs around these methods so that they are easier to maintain going forward?

##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java:
##########
@@ -79,6 +81,14 @@ public class HoodieCommonConfig extends HoodieConfig {
           + " operation will fail schema compatibility check. Set this option to true will make the newly added "
           + " column nullable to successfully complete the write operation.");
+  public static final ConfigProperty<String> ADD_NULL_FOR_DELETED_COLUMNS = ConfigProperty
+      .key("hoodie.datasource.add.null.for.deleted.columns")

Review Comment:
   Suggest renaming the key to "hoodie.datasource.set.null.for.missing.columns".

##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java:
##########
@@ -173,7 +175,12 @@ private RecordIterator(Schema readerSchema, Schema writerSchema, byte[] content)
         this.totalRecords = this.dis.readInt();
       }
-      this.reader = new GenericDatumReader<>(writerSchema, readerSchema);
+      if (recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema)) {

Review Comment:
   We should try to unify our conventions across the code base. We use reader and writer schema here, table schema and source schema in the outer layers, prevSchema in some cases, and sourceSchema and targetSchema in a few other places. We should align all of these and use standard terminology throughout: maybe reader and writer schema in the write handle classes, and sourceSchema and targetSchema elsewhere.

##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java:
##########
@@ -359,7 +360,8 @@ public Option<Pair<HoodieInstant, HoodieCommitMetadata>> getLastCommitMetadataWi
     return Option.fromJavaOptional(
         getCommitMetadataStream()
             .filter(instantCommitMetadataPair ->
-                !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY)))
+                !StringUtils.isNullOrEmpty(instantCommitMetadataPair.getValue().getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
+                    && !WriteOperationType.schemaCantChange(instantCommitMetadataPair.getRight().getOperationType()))

Review Comment:
   Minor: can you switch the order of the conditions? Let's first check the operation type, and then check for SCHEMA_KEY in the extra metadata.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.internal.schema.InternalSchema
+
+/**
+ * Util methods for Schema evolution in Hudi
+ */
+object HoodieSchemaUtils {
+  /**
+   * get latest internalSchema from table
+   *
+   * @param config instance of {@link HoodieConfig}
+   * @param tableMetaClient instance of HoodieTableMetaClient
+   * @return Option of InternalSchema. Will always be empty if schema on read is disabled
+   */
+  def getLatestTableInternalSchema(config: HoodieConfig,
+                                   tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
+    if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+      None
+    } else {
+      try {
+        val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)

Review Comment:
   Should we also fix this one (getTableInternalSchemaFromCommitMetadata) so that it does not look into every commit's metadata?
   For example, we should skip compaction and clustering commits, etc.

##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -111,18 +117,22 @@ public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSche
     return SchemaChangeUtils.applyTableChanges2Schema(internalSchemaAfterAddColumns, typeChange);
   }
+  public static Schema reconcileSchema(Schema incomingSchema, Schema oldTableSchema) {
+    return convert(reconcileSchema(incomingSchema, convert(oldTableSchema)), oldTableSchema.getFullName());
+  }
+
   /**
-   * Reconciles nullability requirements b/w {@code source} and {@code target} schemas,
+   * Reconciles nullability and datatype requirements b/w {@code source} and {@code target} schemas,
    * by adjusting these of the {@code source} schema to be in-line with the ones of the
    * {@code target} one
    *
    * @param sourceSchema source schema that needs reconciliation
    * @param targetSchema target schema that source schema will be reconciled against
-   * @param opts config options
-   * @return schema (based off {@code source} one) that has nullability constraints reconciled
+   * @param opts config options
+   * @return schema (based off {@code source} one) that has nullability constraints and datatypes reconciled
    */
-  public static Schema reconcileNullability(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {
-    if (sourceSchema.getFields().isEmpty() || targetSchema.getFields().isEmpty()) {
+  public static Schema reconcileSchemaRequirements(Schema sourceSchema, Schema targetSchema, Map<String, String> opts) {

Review Comment:
   "Reconcile" is not very apparent to everyone. Can you add more Javadoc on what exactly we mean by reconcile here? Do we add missing columns, or do we just inject defaults, etc.?

##########
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaUtils.java:
##########
@@ -116,9 +116,24 @@ public static String getAvroRecordQualifiedName(String tableName) {
     return "hoodie." + sanitizedTableName + "." + sanitizedTableName + "_record";
   }
+  /**
+   * Validate whether the {@code targetSchema} is a valid evolution of {@code sourceSchema}.
+   * Basically {@link #isCompatibleProjectionOf(Schema, Schema)} but type promotion in the
+   * opposite direction
+   */
+  public static boolean isValidEvolutionOf(Schema sourceSchema, Schema targetSchema) {
+    return (sourceSchema.getType() == Schema.Type.NULL) || isProjectionOfInternal(sourceSchema, targetSchema,
+        AvroSchemaUtils::isAtomicSchemasCompatibleEvolution);
+  }
+
+  private static boolean isAtomicSchemasCompatibleEvolution(Schema oneAtomicType, Schema anotherAtomicType) {

Review Comment:
   Also for the method at L174. You could even write some example illustrations if need be.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -33,8 +33,9 @@ object HoodieParquetFileFormatHelper {
     val sparkRequestStructFields = requiredSchema.map(f => {
       val requiredType = f.dataType
       if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, fileStructMap(f.name))) {
-        implicitTypeChangeInfo.put(new Integer(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, fileStructMap(f.name)))
-        StructField(f.name, fileStructMap(f.name), f.nullable)
+        val readerType = addMissingFields(requiredType, fileStructMap(f.name))

Review Comment:
   Can you add docs here? If possible, with some example illustrations.

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -523,9 +526,15 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
         throw new UnsupportedOperationException("Spark record only support parquet log.");
       }
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))

Review Comment:
   The syncOnce() implementation already has an instance of HoodieTableMetaClient right after refreshing the timeline. We should try to reuse that instead of creating new ones.

##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -41,18 +42,23 @@ public class AvroSchemaEvolutionUtils {
    * 1) incoming data has missing columns that were already defined in the table –> null values will be injected into missing columns
    * 2) incoming data contains new columns not defined yet in the table -> columns will be added to the table schema (incoming dataframe?)
    * 3) incoming data has missing columns that are already defined in the table and new columns not yet defined in the table ->
-   * new columns will be added to the table schema, missing columns will be injected with null values
+   *     new columns will be added to the table schema, missing columns will be injected with null values
    * 4) support type change
    * 5) support nested schema change.
    * Notice:
-   * the incoming schema should not have delete/rename semantics.
-   * for example: incoming schema: int a, int b, int d; oldTableSchema int a, int b, int c, int d
-   * we must guarantee the column c is missing semantic, instead of delete semantic.
+   *   the incoming schema should not have delete/rename semantics.
+   *   for example: incoming schema: int a, int b, int d; oldTableSchema int a, int b, int c, int d
+   *   we must guarantee the column c is missing semantic, instead of delete semantic.
+   *
    * @param incomingSchema implicitly evolution of avro when hoodie write operation
    * @param oldTableSchema old internalSchema
    * @return reconcile Schema
    */
   public static InternalSchema reconcileSchema(Schema incomingSchema, InternalSchema oldTableSchema) {
+    /* If incoming schema is null, we fall back on table schema. */

Review Comment:
   I am curious how a new incoming batch could have a NULL schema. Is it when someone tries to write an empty DataFrame to Hudi? Why are we accounting for NULL schemas in all these places?

##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -131,20 +141,34 @@ public static Schema reconcileNullability(Schema sourceSchema, Schema targetSche
     List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
-    List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+    List<String> nullableUpdateCols = colNamesSourceSchema.stream()
         .filter(f -> (("true".equals(opts.get(MAKE_NEW_COLUMNS_NULLABLE.key())) && !colNamesTargetSchema.contains(f))
             || colNamesTargetSchema.contains(f) && sourceInternalSchema.findField(f).isOptional() != targetInternalSchema.findField(f).isOptional()
             )
         ).collect(Collectors.toList());
+    List<String> typeUpdateCols = colNamesSourceSchema.stream()

Review Comment:
   Can we do L144 to L150 using a single for loop?
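The single-loop refactor asked for above could look roughly like the following JDK-only sketch. It is hypothetical and not Hudi code: `ColumnInfo` stands in for the `InternalSchema.findField(...)` lookups, the boolean flag stands in for reading `MAKE_NEW_COLUMNS_NULLABLE` from `opts`, and the type-mismatch check is an assumption, because the diff context stops before the `typeUpdateCols` filter. It reuses the `nullableUpdateColsInSource` name that is also suggested in this review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: classify source columns into "nullability update" and
 * "type update" candidates in one pass instead of two stream pipelines.
 * ColumnInfo is a stand-in for InternalSchema field lookups, NOT a Hudi API.
 */
final class ReconcileColumnsSketch {

  /** Simplified stand-in for a resolved column: its type name and nullability. */
  static final class ColumnInfo {
    final String type;
    final boolean optional;

    ColumnInfo(String type, boolean optional) {
      this.type = type;
      this.optional = optional;
    }
  }

  /** Both result lists, filled by the same loop. */
  static final class Result {
    final List<String> nullableUpdateColsInSource = new ArrayList<>();
    final List<String> typeUpdateColsInSource = new ArrayList<>();
  }

  static Result classify(List<String> colNamesSourceSchema,
                         Map<String, ColumnInfo> sourceCols,
                         Map<String, ColumnInfo> targetCols,
                         boolean makeNewColumnsNullable) {
    Result result = new Result();
    for (String col : colNamesSourceSchema) {
      ColumnInfo target = targetCols.get(col);
      if (target == null) {
        // Column exists only in the source: make it nullable if the flag is on.
        if (makeNewColumnsNullable) {
          result.nullableUpdateColsInSource.add(col);
        }
        continue;
      }
      ColumnInfo source = sourceCols.get(col);
      // Column exists in both: nullability differs, so align it with the target.
      if (source.optional != target.optional) {
        result.nullableUpdateColsInSource.add(col);
      }
      // Column exists in both: type differs (assumed predicate for type updates).
      if (!source.type.equals(target.type)) {
        result.typeUpdateColsInSource.add(col);
      }
    }
    return result;
  }
}
```

The nullability branch mirrors the existing stream filter (new column plus flag, or an optionality mismatch on a shared column). Folding both checks into one loop also avoids calling `colNamesTargetSchema.contains(f)` on a list twice per column.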
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/utils/AvroSchemaEvolutionUtils.java:
##########
@@ -131,20 +141,34 @@ public static Schema reconcileNullability(Schema sourceSchema, Schema targetSche
     List<String> colNamesSourceSchema = sourceInternalSchema.getAllColsFullName();
     List<String> colNamesTargetSchema = targetInternalSchema.getAllColsFullName();
-    List<String> candidateUpdateCols = colNamesSourceSchema.stream()
+    List<String> nullableUpdateCols = colNamesSourceSchema.stream()

Review Comment:
   Suggest renaming this to nullableUpdateColsInSource.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                                 latestTableSchemaOpt: Option[Schema],
                                 internalSchemaOpt: Option[InternalSchema],
                                 opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),

Review Comment:
   If you do change the config key, let's change the respective variable names as well.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                                 latestTableSchemaOpt: Option[Schema],
                                 internalSchemaOpt: Option[InternalSchema],
                                 opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+      DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
     val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
     latestTableSchemaOpt match {
       // In case table schema is empty we're just going to use the source schema as a
-      // writer's schema. No additional handling is required
-      case None => sourceSchema
+      // writer's schema.
+      case None => AvroInternalSchemaConverter.fixNullOrdering(sourceSchema)

Review Comment:
   Regarding the implementation of AvroInternalSchemaConverter.fixNullOrdering: why can't we just reconstruct the Avro schema? Why do we have to convert Avro -> InternalSchema and then back to Avro again?

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -624,17 +635,25 @@ class HoodieSparkSqlWriterInternal {
       } else {
         if (!shouldValidateSchemasCompatibility) {
           // if no validation is enabled, check for col drop
-          // if col drop is allowed, go ahead. if not, check for projection, so that we do not allow dropping cols
-          if (allowAutoEvolutionColumnDrop || canProject(latestTableSchema, canonicalizedSourceSchema)) {
+          if (allowAutoEvolutionColumnDrop) {
             canonicalizedSourceSchema
           } else {
-            log.error(
-              s"""Incoming batch schema is not compatible with the table's one.
-                 |Incoming schema ${sourceSchema.toString(true)}
-                 |Incoming schema (canonicalized) ${canonicalizedSourceSchema.toString(true)}
-                 |Table's schema ${latestTableSchema.toString(true)}
-                 |""".stripMargin)
-            throw new SchemaCompatibilityException("Incoming batch schema is not compatible with the table's one")
+            val reconciledSchema = if (addNullForDeletedColumns) {

Review Comment:
   I feel we should just simplify all of this; it's getting hard to maintain. Maybe in the next release we should try to do something like the following (my suggestion below is for out-of-the-box schema evolution; schema on read will take a different route as usual).

   Proposal: we never allow auto-dropping of columns with regular writes. If one has to delete/drop columns, they have to go to spark-sql and trigger an alter column command. If we keep that out of the way, schema evolution for regular ingestion can be much simpler: for columns missing in the incoming schema compared to the table schema, inject nulls; promote data types if need be; add new columns from the incoming schema that are missing in the target. We can then get rid of canonicalize, reconcile, etc. There won't be multiple if/else blocks, and it will all be one logic for any writer. The spark-sql alter table command can infer configs such as injecting nulls for missing columns and deduce the final table schema accordingly, but for regular writers we can avoid all these different permutations and combinations.

   Anyway, this is something to keep in mind for the future; I guess we may not do it in this patch.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -94,8 +94,8 @@ object HoodieSparkSqlWriter {
    *
    * NOTE: This is an internal config that is not exposed to the public
    */
-  val CANONICALIZE_NULLABLE: ConfigProperty[Boolean] =
-    ConfigProperty.key("hoodie.internal.write.schema.canonicalize.nullable")
+  val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] =

Review Comment:
   This is an internal config, right?

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                                 latestTableSchemaOpt: Option[Schema],
                                 internalSchemaOpt: Option[InternalSchema],
                                 opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+      DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
     val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
     latestTableSchemaOpt match {
       // In case table schema is empty we're just going to use the source schema as a
-      // writer's schema. No additional handling is required
-      case None => sourceSchema
+      // writer's schema.

Review Comment:
   Can you throw some light on this?
   For the spark-sql writer, I guess deduceWriterSchema is a no-op, right? All the manipulation happens in the write handle, or is my understanding wrong? If my understanding is right, how does adding new columns using MIT (MERGE INTO) work? Can you help me understand?

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -565,35 +573,24 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
         }
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath);
-        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // Deduce proper target (writer's) schema for the input dataset, reconciling its

Review Comment:
   So the block of lines 551 to 574 does not need to invoke getDeducedSchemaProvider(), is that right? Why is that? Can you help me understand?

##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/LazyCastingIterator.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.schema;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import java.util.Iterator;
+
+public class LazyCastingIterator extends LazyIterableIterator<GenericRecord,GenericRecord> {

Review Comment:
   Do we have UTs for these?

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala:
##########
@@ -33,8 +33,9 @@ object HoodieParquetFileFormatHelper {
     val sparkRequestStructFields = requiredSchema.map(f => {
       val requiredType = f.dataType
       if (fileStructMap.contains(f.name) && !isDataTypeEqual(requiredType, fileStructMap(f.name))) {
-        implicitTypeChangeInfo.put(new Integer(requiredSchema.fieldIndex(f.name)), org.apache.hudi.common.util.collection.Pair.of(requiredType, fileStructMap(f.name)))
-        StructField(f.name, fileStructMap(f.name), f.nullable)
+        val readerType = addMissingFields(requiredType, fileStructMap(f.name))

Review Comment:
   We should try to write UTs for these instead of e2e tests. Our test run time will keep spiking if we end up writing e2e tests for all combinations.
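To illustrate the kind of unit-level test the two comments above ask for, here is a hypothetical, JDK-only stand-in for a lazy casting iterator. It does not use Hudi's `LazyIterableIterator` or `HoodieAvroUtils`. The `Function` plays the role of rewriting each `GenericRecord` into the target schema, which is an assumption about what `LazyCastingIterator` does, since the diff stops at the class declaration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Hypothetical stand-in for a lazy casting iterator: each element of the
 * wrapped iterator is converted only when next() is called, never up front.
 * In Hudi the conversion would rewrite a record into the target schema;
 * here it is any Function, so the behavior can be unit-tested without Spark.
 */
final class LazyMappingIterator<I, O> implements Iterator<O> {
  private final Iterator<I> in;
  private final Function<? super I, ? extends O> cast;

  LazyMappingIterator(Iterator<I> in, Function<? super I, ? extends O> cast) {
    this.in = in;
    this.cast = cast;
  }

  @Override
  public boolean hasNext() {
    return in.hasNext();
  }

  @Override
  public O next() {
    if (!in.hasNext()) {
      throw new NoSuchElementException();
    }
    // The conversion runs here, one element at a time.
    return cast.apply(in.next());
  }
}
```

In a unit test, wrapping `List.of(1, 2, 3).iterator()` with a widening conversion such as `i -> i.longValue() * 10` and counting invocations verifies two things: nothing is converted at construction time, and exactly one conversion happens per `next()` call. A check like this runs in milliseconds, unlike an end-to-end write.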
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -545,33 +552,37 @@ class HoodieSparkSqlWriterInternal {
                                 latestTableSchemaOpt: Option[Schema],
                                 internalSchemaOpt: Option[InternalSchema],
                                 opts: Map[String, String]): Schema = {
+    val addNullForDeletedColumns = opts.getOrDefault(DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.key(),
+      DataSourceWriteOptions.ADD_NULL_FOR_DELETED_COLUMNS.defaultValue).toBoolean
     val shouldReconcileSchema = opts(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
     val shouldValidateSchemasCompatibility = opts.getOrDefault(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key, HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.defaultValue).toBoolean
     latestTableSchemaOpt match {
       // In case table schema is empty we're just going to use the source schema as a
-      // writer's schema. No additional handling is required
-      case None => sourceSchema
+      // writer's schema.

Review Comment:
   This may not be related to this patch, but I'm just curious.

##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSchemaUtils.scala:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
+import org.apache.hudi.internal.schema.InternalSchema
+
+/**
+ * Util methods for Schema evolution in Hudi
+ */
+object HoodieSchemaUtils {
+  /**
+   * get latest internalSchema from table
+   *
+   * @param config instance of {@link HoodieConfig}
+   * @param tableMetaClient instance of HoodieTableMetaClient
+   * @return Option of InternalSchema. Will always be empty if schema on read is disabled
+   */
+  def getLatestTableInternalSchema(config: HoodieConfig,
+                                   tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = {
+    if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) {
+      None
+    } else {
+      try {
+        val tableSchemaResolver = new TableSchemaResolver(tableMetaClient)

Review Comment:
   BTW, I saw that you have introduced a schemaCantChange method. Let's ensure that schema on read is not broken, especially when someone triggers an alter schema from the spark-sql layer: I guess the latest commit metadata is expected to contain the latest table schema. With the refactoring in this patch, let's make sure we don't break anything.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org