[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file
alexeykudinkin commented on code in PR #6358: URL: https://github.com/apache/hudi/pull/6358#discussion_r990595189 ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java: ## @@ -185,7 +185,7 @@ public class HoodieWriteConfig extends HoodieConfig { public static final ConfigProperty AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty .key("hoodie.avro.schema.validate") - .defaultValue("false") + .defaultValue("true") Review Comment: This default is flipped to `true` to make sure proper schema validations are run for every operation on the table ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java: ## @@ -81,20 +79,7 @@ public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord(); /** - * The specified schema of the table. ("specified" denotes that this is configured by the client, - * as opposed to being implicitly fetched out of the commit metadata) - */ - protected final Schema tableSchema; - protected final Schema tableSchemaWithMetaFields; Review Comment: These fields were misused and are redundant, hence they were deleted ## hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java: ## @@ -0,0 +1,941 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.avro; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.hudi.common.util.Either; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Deque; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +/** + * Evaluate the compatibility between a reader schema and a writer schema. A + * reader and a writer schema are declared compatible if all datum instances of + * the writer schema can be successfully decoded using the specified reader + * schema. + * + * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING + * + * This code is borrowed from Avro 1.10, with the following modifications: + * + * Compatibility checks ignore schema name, unless schema is held inside + * a union + * + * + */ +public class AvroSchemaCompatibility { Review Comment: Context: Avro always requires the schemas' names to match in order for the schemas to be considered compatible.
Since only Avro attaches names to the schemas themselves (Spark does not, for example), some schemas converted from Spark's [[StructType]] end up incompatible w/ Avro. This code is mostly borrowed as is from Avro 1.10 w/ the following critical adjustments: Schema names are now only checked in the following 2 cases: - In case it's a top-level schema - In case the schema is enclosed in a union (in which case its name might be used for reverse-lookup) ## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java: ## @@ -18,91 +18,47 @@ package org.apache.hudi.table.action.commit; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.client.utils.MergingIterator; -import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer; -import org.apache.hudi.exception.HoodieException; import org.apache.hudi.io.HoodieMergeHandle; import org.apache.hudi.io.storage.HoodieFileReader; import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.hudi.table.HoodieTable; -import org.apache.avro.ge
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file
alexeykudinkin commented on code in PR #6358: URL: https://github.com/apache/hudi/pull/6358#discussion_r981626526 ## hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala: ## @@ -321,8 +322,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie serializedInsertConditionAndExpressions(insertActions)) // Remove the meta fields from the sourceDF as we do not need these when writing. -val sourceDFWithoutMetaFields = removeMetaFields(sourceDF) -HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields) +val trimmedSourceDF = removeMetaFields(sourceDF) + +// Supply original record's Avro schema to provided to [[ExpressionPayload]] +writeParams += (PAYLOAD_RECORD_AVRO_SCHEMA -> Review Comment: @wzx140 this is what i'm referring to -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file
alexeykudinkin commented on code in PR #6358: URL: https://github.com/apache/hudi/pull/6358#discussion_r971445275 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ## @@ -169,23 +179,107 @@ object HoodieSparkSqlWriter { } val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) - val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) + + // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo + sparkContext.getConf.registerKryoClasses( +Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) + + val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName) + val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean + + val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean + var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + + val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration) + + val writerSchema: Schema = latestTableSchemaOpt match { +// In case table schema is empty we're just going to use the source schema as a +// writer's schema. 
No additional handling is required +case None => sourceSchema +// Otherwise, we need to make sure we reconcile incoming and latest table schemas +case Some(latestTableSchema) => + if (reconcileSchema) { +// In case we need to reconcile the schema and schema evolution is enabled, +// we will force-apply schema evolution to the writer's schema +if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) { + internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema)) +} + +if (internalSchemaOpt.isDefined) { + // Apply schema evolution, by auto-merging write schema and read schema + val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get) + AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName) +} else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) { + // In case schema reconciliation is enabled and source and latest table schemas + // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we + // will rebase incoming batch onto the table's latest schema (ie, reconcile them) + // + // NOTE: Since we'll be converting incoming batch from [[sourceSchema]] into [[latestTableSchema]] + // we're validating in that order (where [[sourceSchema]] is treated as a reader's schema, + // and [[latestTableSchema]] is treated as a writer's schema) + latestTableSchema +} else { + log.error( +s""" + |Failed to reconcile incoming batch schema with the table's one. 
+ |Incoming schema ${sourceSchema.toString(true)} + + |Table's schema ${latestTableSchema.toString(true)} + + |""".stripMargin) + throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one") +} + } else { +// Before validating whether schemas are compatible, we need to "canonicalize" source's schema +// relative to the table's one, by doing a (minor) reconciliation of the nullability constraints: +// for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable +// in the table's one we want to proceed w/ such operation, simply relaxing such constraint in the +// source schema. +val canonicalizedSourceSchema = AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema) +// In case reconciliation is disabled, we have to validate that the source's schema +// is compatible w/ the table's latest schema, such that we're able to read existing table's +// records using [[sourceSchema]]. +if (TableSchemaResolver.isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema)) { + canonicalizedSourceSchema +} else { + log.error( +s""" + |Incoming batch schema is not compatible with the table's one. + |Incoming schema ${canonicalizedSourceSc
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file
alexeykudinkin commented on code in PR #6358: URL: https://github.com/apache/hudi/pull/6358#discussion_r971444542 ## hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala: ## @@ -169,23 +179,107 @@ object HoodieSparkSqlWriter { } val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) - val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) + + // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo + sparkContext.getConf.registerKryoClasses( +Array(classOf[org.apache.avro.generic.GenericData], + classOf[org.apache.avro.Schema])) Review Comment: We always had that; this code has just been moved from below to make sure we handle the schema the same way for bulk-insert (w/ row-writing) as we do for any other operation -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file
alexeykudinkin commented on code in PR #6358: URL: https://github.com/apache/hudi/pull/6358#discussion_r971444217 ## hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java: ## @@ -295,91 +295,19 @@ private MessageType convertAvroSchemaToParquet(Schema schema) { } /** - * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by - * checking if the data written using the old schema can be read using the new schema. + * Establishes whether {@code prevSchema} is compatible w/ {@code newSchema}, as + * defined by Avro's {@link SchemaCompatibility} * - * HUDI requires a Schema to be specified in HoodieWriteConfig and is used by the HoodieWriteClient to - * create the records. The schema is also saved in the data files (parquet format) and log files (avro format). - * Since a schema is required each time new data is ingested into a HUDI dataset, schema can be evolved over time. - * - * New Schema is compatible only if: - * A1. There is no change in schema - * A2. A field has been added and it has a default value specified - * - * New Schema is incompatible if: - * B1. A field has been deleted - * B2. A field has been renamed (treated as delete + add) - * B3. A field's type has changed to be incompatible with the older type - * - * Issue with org.apache.avro.SchemaCompatibility: - * org.apache.avro.SchemaCompatibility checks schema compatibility between a writer schema (which originally wrote - * the AVRO record) and a readerSchema (with which we are reading the record). It ONLY guarantees that that each - * field in the reader record can be populated from the writer record. Hence, if the reader schema is missing a - * field, it is still compatible with the writer schema. - * - * In other words, org.apache.avro.SchemaCompatibility was written to guarantee that we can read the data written - * earlier. It does not guarantee schema evolution for HUDI (B1 above). 
- * - * Implementation: This function implements specific HUDI specific checks (listed below) and defers the remaining - * checks to the org.apache.avro.SchemaCompatibility code. - * - * Checks: - * C1. If there is no change in schema: success - * C2. If a field has been deleted in new schema: failure - * C3. If a field has been added in new schema: it should have default value specified - * C4. If a field has been renamed(treated as delete + add): failure - * C5. If a field type has changed: failure - * - * @param oldSchema Older schema to check. - * @param newSchema Newer schema to check. - * @return True if the schema validation is successful - * - * TODO revisit this method: it's implemented incorrectly as it might be applying different criteria - * to top-level record and nested record (for ex, if that nested record is contained w/in an array) + * @param prevSchema previous instance of the schema + * @param newSchema new instance of the schema */ - public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) { -if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) { - // record names must match: - if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) { -return false; - } - - // Check that each field in the oldSchema can populated the newSchema - for (final Field oldSchemaField : oldSchema.getFields()) { -final Field newSchemaField = SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField); -if (newSchemaField == null) { - // C4 or C2: newSchema does not correspond to any field in the oldSchema - return false; -} else { - if (!isSchemaCompatible(oldSchemaField.schema(), newSchemaField.schema())) { -// C5: The fields do not have a compatible type -return false; - } -} - } - - // Check that new fields added in newSchema have default values as they will not be - // present in oldSchema and hence cannot be populated on reading records from existing data. 
- for (final Field newSchemaField : newSchema.getFields()) { -final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField); -if (oldSchemaField == null) { - if (newSchemaField.defaultVal() == null) { -// C3: newly added field in newSchema does not have a default value -return false; - } -} - } - - // All fields in the newSchema record can be populated from the oldSchema record - return true; -} else { - // Use the checks implemented by Avro - // newSchema is the schema which will be used to read the records written earlier using oldSchema. Hence, in the - // check below, use newSchema as the reader schema and oldSchema as the wr