yihua commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1006166832


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -412,15 +412,14 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "OPTIMISTIC_CONCURRENCY_CONTROL: Multiple writers can operate on the table and exactly one of them succeed "
           + "if a conflict (writes affect the same file group) is detected.");
 
-  /**
-   * Currently the  use this to specify the write schema.
-   */
-  public static final ConfigProperty<String> WRITE_SCHEMA = ConfigProperty
+  public static final ConfigProperty<String> WRITE_SCHEMA_OVERRIDE = ConfigProperty
       .key("hoodie.write.schema")
       .noDefaultValue()
-      .withDocumentation("The specified write schema. In most case, we do not need set this parameter,"
-          + " but for the case the write schema is not equal to the specified table schema, we can"
-          + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
+      .withDocumentation("Config allowing to override writing schema. This might be necessary in "

Review Comment:
   `This might be necessary in cases when writing schema derived from the 
incoming dataset diverges is not the schema we actually want to be leveraged 
for writing.`
   
   This sentence does not parse for me.  Does it mean `writing schema ... 
diverges and is not the schema ...`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;

Review Comment:
   Could the removal of `tableSchema` (and its replacement with `writeSchema`) 
go into an independent PR?  That would make this PR smaller.



##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/AvroInternalSchemaConverter.java:
##########
@@ -371,62 +383,69 @@ private static Schema 
visitInternalMapToBuildAvroMap(Types.MapType map, Schema k
    * Converts hudi PrimitiveType to Avro PrimitiveType.
    * this is auxiliary function used by visitInternalSchemaToBuildAvroSchema
    */
-  private static Schema visitInternalPrimitiveToBuildAvroPrimitiveType(Type.PrimitiveType primitive) {
-    Schema primitiveSchema;
+  private static Schema visitInternalPrimitiveToBuildAvroPrimitiveType(Type.PrimitiveType primitive, String recordName) {
     switch (primitive.typeId()) {
       case BOOLEAN:
-        primitiveSchema = Schema.create(Schema.Type.BOOLEAN);
-        break;
+        return Schema.create(Schema.Type.BOOLEAN);
+
       case INT:
-        primitiveSchema = Schema.create(Schema.Type.INT);
-        break;
+        return Schema.create(Schema.Type.INT);
+
       case LONG:
-        primitiveSchema = Schema.create(Schema.Type.LONG);
-        break;
+        return Schema.create(Schema.Type.LONG);
+
       case FLOAT:
-        primitiveSchema = Schema.create(Schema.Type.FLOAT);
-        break;
+        return Schema.create(Schema.Type.FLOAT);
+
       case DOUBLE:
-        primitiveSchema = Schema.create(Schema.Type.DOUBLE);
-        break;
+        return Schema.create(Schema.Type.DOUBLE);
+
       case DATE:
-        primitiveSchema = LogicalTypes.date()
-                .addToSchema(Schema.create(Schema.Type.INT));
-        break;
+        return LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+
       case TIME:
-        primitiveSchema = LogicalTypes.timeMicros()
-                .addToSchema(Schema.create(Schema.Type.LONG));
-        break;
+        return LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
       case TIMESTAMP:
-        primitiveSchema = LogicalTypes.timestampMicros()
-                .addToSchema(Schema.create(Schema.Type.LONG));
-        break;
+        return LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
       case STRING:
-        primitiveSchema = Schema.create(Schema.Type.STRING);
-        break;
-      case UUID:
-        primitiveSchema = LogicalTypes.uuid()
-                .addToSchema(Schema.createFixed("uuid_fixed", null, null, 16));
-        break;
-      case FIXED:
-        Types.FixedType fixed = (Types.FixedType) primitive;
-        primitiveSchema = Schema.createFixed("fixed_" + fixed.getFixedSize(), null, null, fixed.getFixedSize());
-        break;
+        return Schema.create(Schema.Type.STRING);
+
       case BINARY:
-        primitiveSchema = Schema.create(Schema.Type.BYTES);
-        break;
-      case DECIMAL:
+        return Schema.create(Schema.Type.BYTES);
+
+      case UUID: {
+        // NOTE: All schemas corresponding to Avro's type [[FIXED]] are generated
+        //       with the "fixed" name to stay compatible w/ [[SchemaConverters]]
+        String name = recordName + AVRO_NAME_DELIMITER + "fixed";
+        Schema fixedSchema = Schema.createFixed(name, null, null, 16);
+        return LogicalTypes.uuid().addToSchema(fixedSchema);
+      }
+
+      case FIXED: {
+        Types.FixedType fixed = (Types.FixedType) primitive;
+        // NOTE: All schemas corresponding to Avro's type [[FIXED]] are generated
+        //       with the "fixed" name to stay compatible w/ [[SchemaConverters]]
+        String name = recordName + AVRO_NAME_DELIMITER + "fixed";
+        return Schema.createFixed(name, null, null, fixed.getFixedSize());
+      }
+
+      case DECIMAL: {
         Types.DecimalType decimal = (Types.DecimalType) primitive;
-        primitiveSchema = LogicalTypes.decimal(decimal.precision(), decimal.scale())
-                .addToSchema(Schema.createFixed(
-                        "decimal_" + decimal.precision() + "_" + decimal.scale(),
-                        null, null, computeMinBytesForPrecision(decimal.precision())));
-        break;
+        // NOTE: All schemas corresponding to Avro's type [[FIXED]] are generated
+        //       with the "fixed" name to stay compatible w/ [[SchemaConverters]]
+        String name = recordName + AVRO_NAME_DELIMITER + "fixed";
+        Schema fixedSchema = Schema.createFixed(name,
+            null, null, computeMinBytesForPrecision(decimal.precision()));
+        return LogicalTypes.decimal(decimal.precision(), decimal.scale())
+            .addToSchema(fixedSchema);
+      }
+      

Review Comment:
   How is this "FIXED" type relevant for UUID and DECIMAL types?
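
   For context on the question: in the diff above, the `DECIMAL` case sizes its Avro `fixed` schema with `computeMinBytesForPrecision(decimal.precision())`, and the `UUID` case uses a 16-byte `fixed`. Below is a minimal sketch of how such a size could be derived, assuming it is the smallest byte count whose signed two's-complement range covers every unscaled value with the given number of decimal digits. The class and method names are hypothetical, and this is not necessarily how Hudi computes it.

```java
import java.math.BigInteger;

// Hypothetical illustration only; not taken from the Hudi code base.
final class DecimalFixedSizeSketch {

  // Returns the smallest n such that 2^(8n - 1) >= 10^precision, i.e. the
  // smallest signed n-byte integer able to hold any unscaled decimal value
  // with `precision` digits.
  static int minBytesForPrecision(int precision) {
    if (precision < 1) {
      throw new IllegalArgumentException("precision must be positive: " + precision);
    }
    BigInteger bound = BigInteger.TEN.pow(precision);
    int numBytes = 1;
    while (BigInteger.ONE.shiftLeft(8 * numBytes - 1).compareTo(bound) < 0) {
      numBytes++;
    }
    return numBytes;
  }

  public static void main(String[] args) {
    for (int precision : new int[] {9, 18, 38}) {
      System.out.println("precision " + precision + " -> "
          + minBytesForPrecision(precision) + " bytes");
    }
  }
}
```

   Under that assumption, precisions 9, 18 and 38 need 4, 8 and 16 bytes respectively, which would explain why a decimal column ends up backed by a `fixed` schema of a precision-dependent size.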



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseMergeHelper.java:
##########
@@ -18,91 +18,47 @@
 
 package org.apache.hudi.table.action.commit;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
  * Helper to read records from previous version of base file and run Merge.
  */
-public abstract class BaseMergeHelper<T extends HoodieRecordPayload, I, K, O> {
+public abstract class BaseMergeHelper {
 
   /**
    * Read records from previous version of base file and merge.
    * @param table Hoodie Table
    * @param upsertHandle Merge Handle
    * @throws IOException in case of error
    */
-  public abstract void runMerge(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> upsertHandle) throws IOException;
-
-  protected GenericRecord transformRecordBasedOnNewSchema(GenericDatumReader<GenericRecord> gReader, GenericDatumWriter<GenericRecord> gWriter,
-                                                               ThreadLocal<BinaryEncoder> encoderCache, ThreadLocal<BinaryDecoder> decoderCache,
-                                                               GenericRecord gRec) {
-    ByteArrayOutputStream inStream = null;
-    try {
-      inStream = new ByteArrayOutputStream();
-      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(inStream, encoderCache.get());
-      encoderCache.set(encoder);
-      gWriter.write(gRec, encoder);
-      encoder.flush();
-
-      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inStream.toByteArray(), decoderCache.get());
-      decoderCache.set(decoder);
-      GenericRecord transformedRec = gReader.read(null, decoder);
-      return transformedRec;
-    } catch (IOException e) {
-      throw new HoodieException(e);
-    } finally {
-      try {
-        inStream.close();
-      } catch (IOException ioe) {
-        throw new HoodieException(ioe.getMessage(), ioe);
-      }
-    }
-  }
+  public abstract void runMerge(HoodieTable<?, ?, ?, ?> table, HoodieMergeHandle<?, ?, ?, ?> upsertHandle) throws IOException;
 
   /**
    * Create Parquet record iterator that provides a stitched view of record read from skeleton and bootstrap file.
    * Skeleton file is a representation of the bootstrap file inside the table, with just the bare bone fields needed
    * for indexing, writing and other functionality.
    *
    */
-  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<T, I, K, O> table, HoodieMergeHandle<T, I, K, O> mergeHandle,
-                                                                                               HoodieBaseFile baseFile, HoodieFileReader<GenericRecord> reader,
-                                                                                               Schema readSchema, boolean externalSchemaTransformation) throws IOException {
-    Path externalFilePath = new Path(baseFile.getBootstrapBaseFile().get().getPath());
+  protected Iterator<GenericRecord> getMergingIterator(HoodieTable<?, ?, ?, ?> table,
+                                                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle,
+                                                       Path bootstrapFilePath,

Review Comment:
   What was `externalSchemaTransformation` used for before?



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java:
##########
@@ -378,7 +377,7 @@ protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,
       throw new HoodieUpsertException(
           "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
     } else {
-      FlinkMergeHelper.newInstance().runMerge(this, upsertHandle);
+      HoodieMergeHelper.newInstance().runMerge(this, upsertHandle);

Review Comment:
   +1 on removing the unnecessary engine-specific merge helpers.  
@alexeykudinkin could you have this part of the change in a separate PR to 
reduce the LoC of this PR?  I can fast-track the merging.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieMergeHelper.java:
##########
@@ -72,92 +69,113 @@ public static HoodieMergeHelper newInstance() {
   }
 
   @Override
-  public void runMerge(HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
-                       HoodieMergeHandle<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> mergeHandle) throws IOException {
-    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
-    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
+  public void runMerge(HoodieTable<?, ?, ?, ?> table,
+                       HoodieMergeHandle<?, ?, ?, ?> mergeHandle) throws IOException {
+    HoodieWriteConfig writeConfig = table.getConfig();
     HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
 
-    final GenericDatumWriter<GenericRecord> gWriter;
-    final GenericDatumReader<GenericRecord> gReader;
-    Schema readSchema;
-    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
-      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
-      gWriter = new GenericDatumWriter<>(readSchema);
-      gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
-    } else {
-      gReader = null;
-      gWriter = null;
-      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
-    }
+    Configuration hadoopConf = new Configuration(table.getHadoopConf());
+    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(hadoopConf, mergeHandle.getOldFilePath());
 
-    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
-    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
+    Schema writerSchema = mergeHandle.getWriterSchemaWithMetaFields();
+    Schema readerSchema = reader.getSchema();
 
-    Option<InternalSchema> querySchemaOpt = SerDeHelper.fromJson(table.getConfig().getInternalSchema());
-    boolean needToReWriteRecord = false;
-    Map<String, String> renameCols = new HashMap<>();
-    // TODO support bootstrap
-    if (querySchemaOpt.isPresent() && !baseFile.getBootstrapBaseFile().isPresent()) {
-      // check implicitly add columns, and position reorder(spark sql may change cols order)
-      InternalSchema querySchema = AvroSchemaEvolutionUtils.reconcileSchema(readSchema, querySchemaOpt.get());
-      long commitInstantTime = Long.valueOf(FSUtils.getCommitTime(mergeHandle.getOldFilePath().getName()));
-      InternalSchema writeInternalSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, table.getMetaClient(), table.getConfig().getInternalSchemaCacheEnable());
-      if (writeInternalSchema.isEmptySchema()) {
-        throw new HoodieException(String.format("cannot find file schema for current commit %s", commitInstantTime));
-      }
-      List<String> colNamesFromQuerySchema = querySchema.getAllColsFullName();
-      List<String> colNamesFromWriteSchema = writeInternalSchema.getAllColsFullName();
-      List<String> sameCols = colNamesFromWriteSchema.stream()
-              .filter(f -> colNamesFromQuerySchema.contains(f)
-                      && writeInternalSchema.findIdByName(f) == querySchema.findIdByName(f)
-                      && writeInternalSchema.findIdByName(f) != -1
-                      && writeInternalSchema.findType(writeInternalSchema.findIdByName(f)).equals(querySchema.findType(writeInternalSchema.findIdByName(f)))).collect(Collectors.toList());
-      readSchema = AvroInternalSchemaConverter
-          .convert(new InternalSchemaMerger(writeInternalSchema, querySchema, true, false, false).mergeSchema(), readSchema.getName());
-      Schema writeSchemaFromFile = AvroInternalSchemaConverter.convert(writeInternalSchema, readSchema.getName());
-      needToReWriteRecord = sameCols.size() != colNamesFromWriteSchema.size()
-              || SchemaCompatibility.checkReaderWriterCompatibility(readSchema, writeSchemaFromFile).getType() == org.apache.avro.SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
-      if (needToReWriteRecord) {
-        renameCols = InternalSchemaUtils.collectRenameCols(writeInternalSchema, querySchema);
-      }
-    }
+    // Check whether the writer schema is simply a projection of the file's one
+    boolean isProjection = isProjectionOf(readerSchema, writerSchema);
+    // Check whether we will need to rewrite target (already merged) records into the
+    // writer's schema
+    boolean shouldRewriteInWriterSchema = writeConfig.shouldUseExternalSchemaTransformation()
+        || !isProjection

Review Comment:
   Does this mean that if the writer schema has additional fields that are not 
present in the existing file or record being read, we need to rewrite the 
records?  Does the GenericRecord already contain the name and value for the new 
field?
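
   To make the question concrete, here is a rough sketch of the condition as it appears in the visible part of the hunk, with schemas modelled as plain field-name-to-type maps instead of Avro `Schema` objects. The class, the map-based `isProjectionOf`, and `shouldRewrite` are hypothetical stand-ins. The real `isProjectionOf` may treat nested fields, nullability and type promotion differently, and the hunk is cut off after `|| !isProjection`, so further terms may follow.

```java
import java.util.Map;

// Hypothetical illustration only; schemas are simplified to name -> type maps.
final class ProjectionCheckSketch {

  // True when every field of `candidate` also exists in `source` with the
  // same type, i.e. `candidate` only drops fields from `source`.
  static boolean isProjectionOf(Map<String, String> source, Map<String, String> candidate) {
    for (Map.Entry<String, String> field : candidate.entrySet()) {
      // A field missing from `source` yields null here, so equals() is false.
      if (!field.getValue().equals(source.get(field.getKey()))) {
        return false;
      }
    }
    return true;
  }

  // Mirrors the two visible terms of the condition in the diff.
  static boolean shouldRewrite(boolean externalSchemaTransformation,
                               Map<String, String> fileSchema,
                               Map<String, String> writerSchema) {
    return externalSchemaTransformation || !isProjectionOf(fileSchema, writerSchema);
  }

  public static void main(String[] args) {
    Map<String, String> fileSchema = Map.of("id", "long", "name", "string");
    // Writer schema adds a field that the file does not have.
    Map<String, String> widerWriter = Map.of("id", "long", "name", "string", "ts", "long");
    System.out.println(shouldRewrite(false, fileSchema, fileSchema));
    System.out.println(shouldRewrite(false, fileSchema, widerWriter));
  }
}
```

   In this simplified model, a writer schema with a field the file lacks is not a projection, so the records read from the file would be rewritten; whether that matches the intended behaviour is exactly what the question asks.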



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala:
##########
@@ -409,6 +409,14 @@ object DataSourceWriteOptions {
 
   val RECONCILE_SCHEMA: ConfigProperty[Boolean] = HoodieCommonConfig.RECONCILE_SCHEMA
 
+  val CANONICALIZE_SCHEMA: ConfigProperty[Boolean] = ConfigProperty.key("hoodie.datasource.write.schema.canonicalize")
+    .defaultValue(true)
+    .withDocumentation("Controls whether incoming batch's schema's nullability constraints should be canonicalized "

Review Comment:
   add `since` version?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,20 +79,7 @@
   public static IgnoreRecord IGNORE_RECORD = new IgnoreRecord();
 
   /**
-   * The specified schema of the table. ("specified" denotes that this is configured by the client,
-   * as opposed to being implicitly fetched out of the commit metadata)
-   */
-  protected final Schema tableSchema;
-  protected final Schema tableSchemaWithMetaFields;
-
-  /**
-   * The write schema. In most case the write schema is the same to the
-   * input schema. But if HoodieWriteConfig#WRITE_SCHEMA is specified,
-   * we use the WRITE_SCHEMA as the write schema.
-   *
-   * This is useful for the case of custom HoodieRecordPayload which do some conversion
-   * to the incoming record in it. e.g. the ExpressionPayload do the sql expression conversion
-   * to the input.
+   * Schema used to write records into data files

Review Comment:
   Based on my understanding of the latest master, the write schema (with or 
without meta fields) is only used for bootstrap base files.  Make sure we have 
unit/functional tests around this, so it is not affected.


