zhangyue19921010 commented on code in PR #13365:
URL: https://github.com/apache/hudi/pull/13365#discussion_r2166975089


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -150,14 +149,8 @@ object HoodieDatasetBulkInsertHelper
                  table: HoodieTable[_, _, _, _],
                  writeConfig: HoodieWriteConfig,
                  arePartitionRecordsSorted: Boolean,
-                 shouldPreserveHoodieMetadata: Boolean,
-                 operation: WriteOperationType): HoodieData[WriteStatus] = {
-    val schema = operation match {
-      case WriteOperationType.CLUSTER =>
-        alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
-      case _ =>
-        dataset.schema
-    }
+                 shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
+    val schema = AvroConversionUtils.alignFieldsNullability(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))

Review Comment:
   ### Why We Need `alignFieldsNullability`
   
   Currently, Hudi retrieves the table schema from three sources, in order of priority:  
   1. Schema from commit file  
   2. Schema from table config  
   3. Schema from data file  
   
   ```java
   private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
     Option<Schema> schema =
       (instantOpt.isPresent()
         ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
         : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
         .or(() ->
           metaClient.getTableConfig().getTableCreateSchema()
             .map(tableSchema ->
               includeMetadataFields
                 ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get())
                 : tableSchema)
         )
         .or(() -> {
           Option<Schema> schemaFromDataFile = getTableAvroSchemaFromDataFileInternal();
           return includeMetadataFields
             ? schemaFromDataFile
             : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
         });
   
     // TODO: Append partition columns in all read-paths
     if (metaClient.getTableConfig().shouldDropPartitionColumns() && schema.isPresent()) {
       return metaClient.getTableConfig().getPartitionFields()
         .map(partitionFields -> appendPartitionColumns(schema.get(), Option.ofNullable(partitionFields)))
         .or(() -> schema);
     }
     return schema;
   }
   ```
   
   However, when using `bulk insert as row`, the schema of the generated Parquet data files may have **Optional/Required attributes that are inconsistent** with the table schema. This occurs because bulk insert as row uses the DataFrame's schema directly as the write schema:
   
   ```scala
   def bulkInsert(dataset: Dataset[Row],
                  instantTime: String,
                  table: HoodieTable[_, _, _, _],
                  writeConfig: HoodieWriteConfig,
                  arePartitionRecordsSorted: Boolean,
                  shouldPreserveHoodieMetadata: Boolean,
                  operation: WriteOperationType): HoodieData[WriteStatus] = {
     val schema = operation match {
       case WriteOperationType.CLUSTER =>
         alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
       case _ =>
         dataset.schema // ← Uses raw DataFrame schema!
     }
   ```
   
   **Generally**, we downplay the distinction between `Optional` and `Required` in schemas and treat it as a minor issue. **However**, Binary Copy performs **pre-validation of schema alignment**, and these nullability mismatches cause Binary Copy to fail. (Without this validation, the copied files would become unreadable due to deserialization failures.)
   
   We therefore introduced `alignFieldsNullability` to achieve two critical goals:  
   1. ✅ Enable Binary Copy for **all Hudi write operations** (including `bulk insert`)  
   2. ✅ Synchronize **Table Schema** and **Data File Schema** nullability  
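
To make the idea concrete, here is a minimal, dependency-free sketch of what aligning nullability means. It is not the code in this PR: the class `NullabilityAlignSketch`, the method `alignNullability`, and the map-based schema model (top-level field name to "is nullable") are all assumptions made for illustration, and nested fields are ignored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model (not Hudi's implementation): a schema is an
// ordered map from top-level field name to "is nullable".
class NullabilityAlignSketch {

  // Returns a copy of sourceSchema in which every field that also exists in
  // tableSchema takes its nullability from tableSchema. Fields unknown to the
  // table schema keep their original nullability.
  static Map<String, Boolean> alignNullability(Map<String, Boolean> sourceSchema,
                                               Map<String, Boolean> tableSchema) {
    Map<String, Boolean> aligned = new LinkedHashMap<>();
    for (Map.Entry<String, Boolean> field : sourceSchema.entrySet()) {
      Boolean tableNullable = tableSchema.get(field.getKey());
      aligned.put(field.getKey(), tableNullable != null ? tableNullable : field.getValue());
    }
    return aligned;
  }

  public static void main(String[] args) {
    // DataFrame schema as seen by bulk insert as row: everything non-nullable.
    Map<String, Boolean> dataFrameSchema = new LinkedHashMap<>();
    dataFrameSchema.put("id", false);
    dataFrameSchema.put("id2", false);
    dataFrameSchema.put("structtype", false);

    // Table schema: only the record key is non-nullable.
    Map<String, Boolean> tableSchema = new LinkedHashMap<>();
    tableSchema.put("id", false);
    tableSchema.put("id2", true);
    tableSchema.put("structtype", true);

    System.out.println(alignNullability(dataFrameSchema, tableSchema));
  }
}
```

In this sketch the table schema is treated as the source of truth for nullability, so the write schema matches what the other write paths already produce.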
   
   ---
   
   ### Experimental Verification  
   The experiments below demonstrate how `bulk insert as row` creates data files with **inconsistent Optional/Required attributes**:
   
   Spark bulk insert as row
   ```
   Schema:
   message spark_schema {
     optional binary _hoodie_commit_time (STRING);
     optional binary _hoodie_commit_seqno (STRING);
     optional binary _hoodie_record_key (STRING);
     optional binary _hoodie_partition_path (STRING);
     optional binary _hoodie_file_name (STRING);
     required int32 id;
     required int32 id2;
     required group maptype (MAP) {
       repeated group key_value {
         required binary key (STRING);
         optional binary value (STRING);
       }
     }
     required group structtype {
       optional binary name (STRING);
       optional int32 age;
     }
     required binary arraytype (STRING);
     required group newarraytype (LIST) {
       repeated group list {
         optional int32 element;
       }
     }
   }
   ```
   
   Spark insert/upsert
   ```
   Schema:
   message hoodie.app_jdr_normal_struct_and_map_a_d_d.app_jdr_normal_struct_and_map_a_d_d_record {
     optional binary _hoodie_commit_time (STRING);
     optional binary _hoodie_commit_seqno (STRING);
     optional binary _hoodie_record_key (STRING);
     optional binary _hoodie_partition_path (STRING);
     optional binary _hoodie_file_name (STRING);
     required int32 id;
     optional int32 id2;
     optional group maptype (MAP) {
       repeated group key_value (MAP_KEY_VALUE) {
         required binary key (STRING);
         optional binary value (STRING);
       }
     }
     optional group structtype {
       optional binary name (STRING);
       optional int32 age;
     }
     optional binary arraytype (STRING);
     optional group newarraytype (LIST) {
       repeated int32 array;
     }
   }
   ```
   
   Flink insert/upsert
   ```
   Schema:
   message flink_schema {
     optional binary _hoodie_commit_time (STRING);
     optional binary _hoodie_commit_seqno (STRING);
     optional binary _hoodie_record_key (STRING);
     optional binary _hoodie_partition_path (STRING);
     optional binary _hoodie_file_name (STRING);
     required int32 id;
     optional int32 id2;
     optional group maptype (MAP) {
       repeated group key_value {
         required binary key (STRING);
         optional binary value (STRING);
       }
     }
     optional group structtype {
       optional binary name (STRING);
       optional int32 age;
     }
     optional binary arraytype (STRING);
     optional group newarraytype (LIST) {
       repeated group list {
         optional int32 element;
       }
     }
   }
   ```
   **Overall**, Spark `insert`/`upsert` and Flink `insert`/`upsert` produce schemas with **consistent Optional/Required attributes**, while **Spark `bulk insert as row`** (and similar operations like Clustering) produces **inconsistent Optional/Required field attributes**.
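
The comparison above can also be checked mechanically. The sketch below is only an illustration: the class `RepetitionDiffSketch` and its methods are hypothetical names, and the field repetitions are transcribed by hand from the top-level user fields of the `Spark bulk insert as row` and `Spark insert/upsert` dumps above (metadata fields and nested fields are left out).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: compares the repetition ("required"/"optional") of
// top-level fields in two schemas and reports the fields that disagree.
class RepetitionDiffSketch {

  // Builds an ordered schema from alternating (field name, repetition) pairs.
  static Map<String, String> schema(String... nameThenRepetition) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (int i = 0; i + 1 < nameThenRepetition.length; i += 2) {
      fields.put(nameThenRepetition[i], nameThenRepetition[i + 1]);
    }
    return fields;
  }

  // Returns the names of fields present in both schemas whose repetition differs.
  static List<String> mismatchedFields(Map<String, String> left, Map<String, String> right) {
    List<String> mismatches = new ArrayList<>();
    for (Map.Entry<String, String> field : left.entrySet()) {
      String other = right.get(field.getKey());
      if (other != null && !other.equals(field.getValue())) {
        mismatches.add(field.getKey());
      }
    }
    return mismatches;
  }

  public static void main(String[] args) {
    Map<String, String> bulkInsertAsRow = schema(
        "id", "required", "id2", "required", "maptype", "required",
        "structtype", "required", "arraytype", "required", "newarraytype", "required");
    Map<String, String> insertUpsert = schema(
        "id", "required", "id2", "optional", "maptype", "optional",
        "structtype", "optional", "arraytype", "optional", "newarraytype", "optional");

    System.out.println(mismatchedFields(bulkInsertAsRow, insertUpsert));
  }
}
```

With these inputs, every user field except `id` is reported as mismatched, which is the kind of difference that the Binary Copy pre-validation rejects.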


