[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file

2022-10-07 Thread GitBox


alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r990595189


##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##
@@ -185,7 +185,7 @@ public class HoodieWriteConfig extends HoodieConfig {
 
  public static final ConfigProperty AVRO_SCHEMA_VALIDATE_ENABLE = ConfigProperty
  .key("hoodie.avro.schema.validate")
-  .defaultValue("false")
+  .defaultValue("true")

Review Comment:
   The default is flipped to make sure proper schema validation is run for every operation on the table.
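   For illustration, a minimal sketch (not code from this PR) of what the flipped default means for a writer job, assuming a plain Spark datasource write; the table name, record key, precombine field and base path below are hypothetical:

    import org.apache.spark.sql.{SaveMode, SparkSession}

    val spark = SparkSession.builder().appName("hudi-schema-validate-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical input batch
    val df = Seq(("id-1", 1L)).toDF("uuid", "ts")

    df.write
      .format("hudi")
      .option("hoodie.table.name", "trips")                      // hypothetical table name
      .option("hoodie.datasource.write.recordkey.field", "uuid") // hypothetical record key
      .option("hoodie.datasource.write.precombine.field", "ts")  // hypothetical precombine field
      .option("hoodie.avro.schema.validate", "false")            // explicit opt-out of the new default
      .mode(SaveMode.Append)
      .save("/tmp/hudi/trips")                                   // hypothetical base path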



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;

Review Comment:
   These fields were misused and redundant, hence they have been deleted



##
hudi-common/src/main/java/org/apache/hudi/avro/AvroSchemaCompatibility.java:
##
@@ -0,0 +1,941 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.avro;
+
+import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.hudi.common.util.Either;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * Evaluate the compatibility between a reader schema and a writer schema. A
+ * reader and a writer schema are declared compatible if all datum instances of
+ * the writer schema can be successfully decoded using the specified reader
+ * schema.
+ *
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ *   This code is borrowed from Avro 1.10, with the following modifications:
+ *   
+ * Compatibility checks ignore schema name, unless schema is held inside
+ * a union
+ *   
+ *
+ */
+public class AvroSchemaCompatibility {

Review Comment:
   Context: Avro requires schema names to match at all times for two schemas to be considered compatible. Since only Avro bears names on the schemas themselves (Spark, for example, does not), some schemas converted from Spark's [[StructType]] end up incompatible w/ their Avro counterparts (see the sketch below).
   
   This code is mostly borrowed as-is from Avro 1.10, w/ the following critical adjustment: schema names are now only checked in the following 2 cases:
   
- When it's a top-level schema
- When the schema is enclosed in a union (in which case its name might be used for reverse-lookup)
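   A minimal sketch (not part of this PR) of the name-mismatch problem described above: with stock Avro's SchemaCompatibility, two structurally identical schemas whose nested record names differ are flagged incompatible purely because of the name (all record names below are hypothetical).

    import org.apache.avro.{Schema, SchemaBuilder, SchemaCompatibility}

    // A top-level record named "trip" with a single nested-record field "fare";
    // only the *nested* record's name differs between the two variants
    def tripSchema(nestedRecordName: String): Schema =
      SchemaBuilder.record("trip").fields()
        .name("fare").`type`(
          SchemaBuilder.record(nestedRecordName).fields()
            .requiredDouble("amount")
            .endRecord())
        .noDefault()
        .endRecord()

    val writerSchema = tripSchema("fare")        // e.g. the name carried by the table schema
    val readerSchema = tripSchema("spark_fare")  // e.g. a name produced by a Spark-to-Avro conversion

    // Stock Avro reports INCOMPATIBLE (name mismatch) even though every datum written
    // with writerSchema is readable with readerSchema; the adjusted checker only compares
    // names for top-level schemas and schemas enclosed in a union
    val result = SchemaCompatibility.checkReaderWriterCompatibility(readerSchema, writerSchema)
    println(result.getType) // INCOMPATIBLE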
   



##
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##
@@ -18,91 +18,47 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.ge

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file

2022-09-27 Thread GitBox


alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r981626526


##
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/MergeIntoHoodieTableCommand.scala:
##
@@ -321,8 +322,13 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
   serializedInsertConditionAndExpressions(insertActions))

// Remove the meta fields from the sourceDF as we do not need these when writing.
-val sourceDFWithoutMetaFields = removeMetaFields(sourceDF)
-HoodieSparkSqlWriter.write(sparkSession.sqlContext, SaveMode.Append, writeParams, sourceDFWithoutMetaFields)
+val trimmedSourceDF = removeMetaFields(sourceDF)
+
+// Supply the original record's Avro schema to be provided to [[ExpressionPayload]]
+writeParams += (PAYLOAD_RECORD_AVRO_SCHEMA ->

Review Comment:
   @wzx140 this is what I'm referring to






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file

2022-09-14 Thread GitBox


alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r971445275


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -169,23 +179,107 @@ object HoodieSparkSqlWriter {
   }
 
  val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
-  val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
+  // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+  sparkContext.getConf.registerKryoClasses(
+Array(classOf[org.apache.avro.generic.GenericData],
+  classOf[org.apache.avro.Schema]))
+
+  val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
+  val reconcileSchema = parameters(DataSourceWriteOptions.RECONCILE_SCHEMA.key()).toBoolean
+
+  val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean
+  var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext)
+
+  val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
+  val latestTableSchemaOpt = getLatestTableSchema(spark, basePath, tableIdentifier, sparkContext.hadoopConfiguration)
+
+  val writerSchema: Schema = latestTableSchemaOpt match {
+// In case table schema is empty we're just going to use the source schema as a
+// writer's schema. No additional handling is required
+case None => sourceSchema
+// Otherwise, we need to make sure we reconcile incoming and latest table schemas
+case Some(latestTableSchema) =>
+  if (reconcileSchema) {
+// In case we need to reconcile the schema and schema evolution is enabled,
+// we will force-apply schema evolution to the writer's schema
+if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) {
+  internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema))
+}
+
+if (internalSchemaOpt.isDefined) {
+  // Apply schema evolution, by auto-merging write schema and read schema
+  val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get)
+  AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName)
+} else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) {
+  // In case schema reconciliation is enabled and source and latest table schemas
+  // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]]), then we
+  // will rebase incoming batch onto the table's latest schema (ie, reconcile them)
+  //
+  // NOTE: Since we'll be converting incoming batch from [[sourceSchema]] into [[latestTableSchema]]
+  //   we're validating in that order (where [[sourceSchema]] is treated as a reader's schema,
+  //   and [[latestTableSchema]] is treated as a writer's schema)
+  latestTableSchema
+} else {
+  log.error(
+s"""
+   |Failed to reconcile incoming batch schema with the table's one.
+   |Incoming schema ${sourceSchema.toString(true)}
+   |Table's schema ${latestTableSchema.toString(true)}
+   |""".stripMargin)
+  throw new SchemaCompatibilityException("Failed to reconcile incoming schema with the table's one")
+}
+  } else {
+// Before validating whether schemas are compatible, we need to "canonicalize" source's schema
+// relative to the table's one, by doing a (minor) reconciliation of the nullability constraints:
+// for ex, if in incoming schema column A is designated as non-null, but it's designated as nullable
+// in the table's one we want to proceed w/ such operation, simply relaxing such constraint in the
+// source schema.
+val canonicalizedSourceSchema = AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema)
+// In case reconciliation is disabled, we have to validate that the source's schema
+// is compatible w/ the table's latest schema, such that we're able to read existing table's
+// records using [[sourceSchema]].
+if (TableSchemaResolver.isSchemaCompatible(latestTableSchema, canonicalizedSourceSchema)) {
+  canonicalizedSourceSchema
+} else {
+  log.error(
+s"""
+   |Incoming batch schema is not compatible with the table's one.
+   |Incoming schema ${canonicalizedSourceSc

[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file

2022-09-14 Thread GitBox


alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r971444542


##
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##
@@ -169,23 +179,107 @@ object HoodieSparkSqlWriter {
   }
 
  val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)
-  val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS)
+
+  // Register Avro classes ([[Schema]], [[GenericData]]) w/ Kryo
+  sparkContext.getConf.registerKryoClasses(
+Array(classOf[org.apache.avro.generic.GenericData],
+  classOf[org.apache.avro.Schema]))

Review Comment:
   We always had this; the code has just been moved up from below to make sure we handle the schema the same way for bulk-insert (w/ row-writing) as we do for any other operation
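   For context, a minimal sketch (an assumption about user-level setup, not code from this PR) of the equivalent registration done up front on a SparkConf, mirroring the registerKryoClasses call shown in the diff above:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Register the Avro classes that get shipped to executors so Kryo can serialize them
    val conf = new SparkConf()
      .setAppName("hudi-kryo-sketch") // hypothetical app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(
        classOf[org.apache.avro.generic.GenericData],
        classOf[org.apache.avro.Schema]))

    val spark = SparkSession.builder().config(conf).getOrCreate()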






[GitHub] [hudi] alexeykudinkin commented on a diff in pull request #6358: [HUDI-4588][HUDI-4472] Fixing `HoodieParquetReader` to properly specify projected schema when reading Parquet file

2022-09-14 Thread GitBox


alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r971444217


##
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##
@@ -295,91 +295,19 @@ private MessageType convertAvroSchemaToParquet(Schema schema) {
   }
 
   /**
-   * HUDI specific validation of schema evolution. Ensures that a newer schema can be used for the dataset by
-   * checking if the data written using the old schema can be read using the new schema.
+   * Establishes whether {@code prevSchema} is compatible w/ {@code newSchema}, as
+   * defined by Avro's {@link SchemaCompatibility}
*
-   * HUDI requires a Schema to be specified in HoodieWriteConfig and is used by the HoodieWriteClient to
-   * create the records. The schema is also saved in the data files (parquet format) and log files (avro format).
-   * Since a schema is required each time new data is ingested into a HUDI dataset, schema can be evolved over time.
-   *
-   * New Schema is compatible only if:
-   * A1. There is no change in schema
-   * A2. A field has been added and it has a default value specified
-   *
-   * New Schema is incompatible if:
-   * B1. A field has been deleted
-   * B2. A field has been renamed (treated as delete + add)
-   * B3. A field's type has changed to be incompatible with the older type
-   *
-   * Issue with org.apache.avro.SchemaCompatibility:
-   *  org.apache.avro.SchemaCompatibility checks schema compatibility between a writer schema (which originally wrote
-   *  the AVRO record) and a readerSchema (with which we are reading the record). It ONLY guarantees that that each
-   *  field in the reader record can be populated from the writer record. Hence, if the reader schema is missing a
-   *  field, it is still compatible with the writer schema.
-   *
-   *  In other words, org.apache.avro.SchemaCompatibility was written to guarantee that we can read the data written
-   *  earlier. It does not guarantee schema evolution for HUDI (B1 above).
-   *
-   * Implementation: This function implements specific HUDI specific checks (listed below) and defers the remaining
-   * checks to the org.apache.avro.SchemaCompatibility code.
-   *
-   * Checks:
-   * C1. If there is no change in schema: success
-   * C2. If a field has been deleted in new schema: failure
-   * C3. If a field has been added in new schema: it should have default value specified
-   * C4. If a field has been renamed(treated as delete + add): failure
-   * C5. If a field type has changed: failure
-   *
-   * @param oldSchema Older schema to check.
-   * @param newSchema Newer schema to check.
-   * @return True if the schema validation is successful
-   *
-   * TODO revisit this method: it's implemented incorrectly as it might be applying different criteria
-   *  to top-level record and nested record (for ex, if that nested record is contained w/in an array)
+   * @param prevSchema previous instance of the schema
+   * @param newSchema new instance of the schema
*/
-  public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) {
-if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) {
-  // record names must match:
-  if (!SchemaCompatibility.schemaNameEquals(newSchema, oldSchema)) {
-return false;
-  }
-
-  // Check that each field in the oldSchema can populated the newSchema
-  for (final Field oldSchemaField : oldSchema.getFields()) {
-final Field newSchemaField = SchemaCompatibility.lookupWriterField(newSchema, oldSchemaField);
-if (newSchemaField == null) {
-  // C4 or C2: newSchema does not correspond to any field in the oldSchema
-  return false;
-} else {
-  if (!isSchemaCompatible(oldSchemaField.schema(), newSchemaField.schema())) {
-// C5: The fields do not have a compatible type
-return false;
-  }
-}
-  }
-
-  // Check that new fields added in newSchema have default values as they will not be
-  // present in oldSchema and hence cannot be populated on reading records from existing data.
-  for (final Field newSchemaField : newSchema.getFields()) {
-final Field oldSchemaField = SchemaCompatibility.lookupWriterField(oldSchema, newSchemaField);
-if (oldSchemaField == null) {
-  if (newSchemaField.defaultVal() == null) {
-// C3: newly added field in newSchema does not have a default value
-return false;
-  }
-}
-  }
-
-  // All fields in the newSchema record can be populated from the oldSchema record
-  return true;
-} else {
-  // Use the checks implemented by Avro
-  // newSchema is the schema which will be used to read the records written earlier using oldSchema. Hence, in the
-  // check below, use newSchema as the reader schema and oldSchema as the wr