the-other-tim-brown commented on code in PR #18036:
URL: https://github.com/apache/hudi/pull/18036#discussion_r2760897158
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
Review Comment:
Could we rely directly on the provided HoodieSchema here?
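For example (a rough sketch; it reuses `getField` and `HoodieSchemaType.VARIANT` exactly as they appear later in this diff, just reordering the checks):
```java
// Drive the Variant check off the HoodieSchema instead of the Spark DataType
Option<HoodieSchema> fieldSchema = hoodieSchema.getField(field.name())
    .map(f -> f.schema());
if (fieldSchema.isPresent() && fieldSchema.get().getType() == HoodieSchemaType.VARIANT) {
  // field is a Variant; apply the shredding logic below
}
```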
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -140,21 +147,119 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, structType, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+ HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+ .flatMap(s -> s.getField(field.name()))
+ .map(f -> f.schema())
+ .orElse(null);
+
+ if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema;
+ if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) {
Review Comment:
Can we simplify this condition then?
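For example, if `isShredded()` already implies that `getTypedValueField()` is present (an assumption worth verifying in `HoodieSchema.Variant` first), the second check is redundant:
```java
// Assumes isShredded() guarantees a typed_value field exists; verify before simplifying
if (variantSchema.isShredded()) {
  // build the shredded struct for this field
}
```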
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
Review Comment:
This loop only covers top-level fields. Should we recursively inspect nested struct fields?
If nested Variant fields are possible, let's make sure we have a test for that case as well. Something like the sketch below could handle the recursion.
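A rough sketch only: `buildShreddedVariantStruct` is a hypothetical stand-in for the per-field shredding logic in this PR, and array/map element types would need the same treatment:
```java
// Recursively replace Variant fields at any depth inside nested structs
private DataType shredDataType(DataType dataType, HoodieSchema fieldSchema) {
  if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
    // delegate to the existing shredding logic (hypothetical helper)
    return buildShreddedVariantStruct(fieldSchema);
  }
  if (dataType instanceof StructType) {
    StructField[] fields = ((StructType) dataType).fields();
    StructField[] shredded = new StructField[fields.length];
    for (int i = 0; i < fields.length; i++) {
      StructField f = fields[i];
      // look up the matching nested HoodieSchema, mirroring the top-level lookup
      HoodieSchema nested = Option.ofNullable(fieldSchema)
          .flatMap(s -> s.getField(f.name()))
          .map(x -> x.schema())
          .orElse(null);
      shredded[i] = new StructField(f.name(), shredDataType(f.dataType(), nested), f.nullable(), f.metadata());
    }
    return new StructType(shredded);
  }
  return dataType;
}
```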
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+ HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
Review Comment:
Is it ever possible for the provided schema to be null? Is there a possibility of it being out of sync with the struct?
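If the answer to both is no, failing fast may be cleaner than `Option.ofNullable(...).orElse(null)`, especially since the later `fieldHoodieSchema.getType()` call would NPE on a null result. A sketch (whether `ValidationUtils` is the right utility here is an assumption):
```java
// Reject a null or out-of-sync schema up front instead of deferring an NPE
ValidationUtils.checkArgument(hoodieSchema != null,
    "HoodieSchema must be provided when generating a shredded schema");
Option<HoodieSchema> fieldSchema = hoodieSchema.getField(field.name())
    .map(f -> f.schema());
if (!fieldSchema.isPresent()) {
  throw new IllegalStateException("Field " + field.name()
      + " exists in the StructType but not in the HoodieSchema");
}
```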
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
Review Comment:
It would be good to note the behavior when there are no shredded columns, e.g. "falls back to the provided schema if no shredded Variant columns are present".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]