zhangyue19921010 commented on code in PR #13365:
URL: https://github.com/apache/hudi/pull/13365#discussion_r2166975089
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala:
##########
@@ -150,14 +149,8 @@ object HoodieDatasetBulkInsertHelper
table: HoodieTable[_, _, _, _],
writeConfig: HoodieWriteConfig,
arePartitionRecordsSorted: Boolean,
- shouldPreserveHoodieMetadata: Boolean,
- operation: WriteOperationType): HoodieData[WriteStatus] = {
- val schema = operation match {
- case WriteOperationType.CLUSTER =>
-         alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
-       case _ =>
-         dataset.schema
-     }
+                         shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
+     val schema = AvroConversionUtils.alignFieldsNullability(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
Review Comment:
### Why We Need `alignFieldsNullability`
Currently, Hudi retrieves the Table Schema from three sources in order of
priority:
1. Schema from commit file
2. Schema from table config
3. Schema from data file
```java
private Option<Schema> getTableAvroSchemaInternal(boolean includeMetadataFields, Option<HoodieInstant> instantOpt) {
  Option<Schema> schema =
      (instantOpt.isPresent()
          ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields)
          : getTableSchemaFromLatestCommitMetadata(includeMetadataFields))
          .or(() ->
              metaClient.getTableConfig().getTableCreateSchema()
                  .map(tableSchema ->
                      includeMetadataFields
                          ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get())
                          : tableSchema)
          )
          .or(() -> {
            Option<Schema> schemaFromDataFile = getTableAvroSchemaFromDataFileInternal();
            return includeMetadataFields
                ? schemaFromDataFile
                : schemaFromDataFile.map(HoodieAvroUtils::removeMetadataFields);
          });

  // TODO: Append partition columns in all read-paths
  if (metaClient.getTableConfig().shouldDropPartitionColumns() && schema.isPresent()) {
    return metaClient.getTableConfig().getPartitionFields()
        .map(partitionFields -> appendPartitionColumns(schema.get(), Option.ofNullable(partitionFields)))
        .or(() -> schema);
  }

  return schema;
}
```
However, when using `bulk insert as row`, the Parquet data files it produces may carry `optional`/`required` attributes that are inconsistent with the table schema. This happens because bulk insert as row writes with the DataFrame's schema directly:
```scala
def bulkInsert(dataset: Dataset[Row],
               instantTime: String,
               table: HoodieTable[_, _, _, _],
               writeConfig: HoodieWriteConfig,
               arePartitionRecordsSorted: Boolean,
               shouldPreserveHoodieMetadata: Boolean,
               operation: WriteOperationType): HoodieData[WriteStatus] = {
  val schema = operation match {
    case WriteOperationType.CLUSTER =>
      alignNotNullFields(dataset.schema, new Schema.Parser().parse(writeConfig.getSchema))
    case _ =>
      dataset.schema // ← Uses the raw DataFrame schema!
  }
```
**Generally**, the distinction between `optional` and `required` fields in a schema is easy to dismiss as a minor detail. **However**, Binary Copy performs a **pre-validation of schema alignment**, and these nullability mismatches cause Binary Copy to fail. (Without this validation, the copied files would become unreadable because of deserialization failures.)
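
For intuition, here is a hedged sketch of the kind of check such pre-validation implies (this is not Hudi's actual Binary Copy code; `schemasAligned` is a hypothetical helper): compare the Parquet schema stored in a source data file with the Parquet schema derived from the table's Avro schema.
```scala
import org.apache.avro.Schema
import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.parquet.schema.MessageType

// Hypothetical helper: true only when every field of the data file's Parquet
// schema matches the schema derived from the table's Avro schema, including
// the repetition (required/optional/repeated) of each field.
def schemasAligned(fileSchema: MessageType, tableAvroSchema: Schema): Boolean = {
  val expected: MessageType = new AvroSchemaConverter().convert(tableAvroSchema)
  // Parquet's Type.equals compares field name, repetition and type, so a field
  // written as `required` where the table schema says `optional` fails the check.
  fileSchema.getFields.equals(expected.getFields)
}
```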
That is why we introduced `alignFieldsNullability`, which achieves two critical goals (a simplified sketch of the idea follows the list):
1. ✅ Enable Binary Copy for **all Hudi write operations** (including `bulk
insert`)
2. ✅ Synchronize **Table Schema** and **Data File Schema** nullability
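
A minimal sketch of the alignment idea, assuming only top-level fields need reconciling (nested maps, structs and arrays would need the same treatment; `alignTopLevelNullability` is a hypothetical name, not the PR's implementation):
```scala
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters._

// Hypothetical helper: rewrite the DataFrame schema so that each top-level
// field's nullability matches the table's Avro schema. In Avro, a field is
// nullable when its type is a union that contains the NULL type.
def alignTopLevelNullability(dfSchema: StructType, tableSchema: Schema): StructType = {
  StructType(dfSchema.fields.map { field =>
    Option(tableSchema.getField(field.name)) match {
      case Some(avroField) =>
        val avroType = avroField.schema()
        val nullableInTable = avroType.getType == Schema.Type.UNION &&
          avroType.getTypes.asScala.exists(_.getType == Schema.Type.NULL)
        field.copy(nullable = nullableInTable)
      case None =>
        field // field is not part of the table schema; leave it untouched
    }
  })
}
```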
---
### Experimental Verification
The experiments below demonstrate how `bulk insert as row` creates data files with **inconsistent Optional/Required attributes**:
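For reference, one way to obtain listings like the ones below is to print the schema stored in a data file's Parquet footer (a sketch; the file path is a placeholder, and this may differ from how these particular dumps were produced):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Print the "message ... { ... }" schema stored in a Parquet file's footer.
val inputFile = HadoopInputFile.fromPath(
  new Path("/tmp/hudi_table/part-00000.parquet"), // placeholder path
  new Configuration())
val reader = ParquetFileReader.open(inputFile)
try {
  println(reader.getFooter.getFileMetaData.getSchema)
} finally {
  reader.close()
}
```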
Spark bulk insert as row
```
Schema:
message spark_schema {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  required int32 id;
  required int32 id2;
  required group maptype (MAP) {
    repeated group key_value {
      required binary key (STRING);
      optional binary value (STRING);
    }
  }
  required group structtype {
    optional binary name (STRING);
    optional int32 age;
  }
  required binary arraytype (STRING);
  required group newarraytype (LIST) {
    repeated group list {
      optional int32 element;
    }
  }
}
```
Spark insert/upsert
```
Schema:
message hoodie.app_jdr_normal_struct_and_map_a_d_d.app_jdr_normal_struct_and_map_a_d_d_record {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  required int32 id;
  optional int32 id2;
  optional group maptype (MAP) {
    repeated group key_value (MAP_KEY_VALUE) {
      required binary key (STRING);
      optional binary value (STRING);
    }
  }
  optional group structtype {
    optional binary name (STRING);
    optional int32 age;
  }
  optional binary arraytype (STRING);
  optional group newarraytype (LIST) {
    repeated int32 array;
  }
}
```
Flink insert/upsert
```
Schema:
message flink_schema {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  required int32 id;
  optional int32 id2;
  optional group maptype (MAP) {
    repeated group key_value {
      required binary key (STRING);
      optional binary value (STRING);
    }
  }
  optional group structtype {
    optional binary name (STRING);
    optional int32 age;
  }
  optional binary arraytype (STRING);
  optional group newarraytype (LIST) {
    repeated group list {
      optional int32 element;
    }
  }
}
```
**Overall**, Spark `insert`/`upsert` and Flink `insert`/`upsert` produce **consistent schemas**, while **Spark `bulk insert as row`** (and operations built on it, such as clustering) exhibits **inconsistent Optional/Required field attributes**.