vinothchandar commented on code in PR #17833:
URL: https://github.com/apache/hudi/pull/17833#discussion_r2874744455
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -281,6 +282,18 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
} else if (dataType == DataTypes.BinaryType) {
return (row, ordinal) -> recordConsumer.addBinary(
Binary.fromReusedByteArray(row.getBinary(ordinal)));
+ } else if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+ // Maps VariantType to a group containing 'metadata' and 'value' fields.
+ // This ensures Spark 4.0 compatibility and supports both Shredded and Unshredded schemas.
+ // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
+ BiConsumer<SpecializedGetters, Integer> variantWriter = SparkAdapterSupport$.MODULE$.sparkAdapter().createVariantValueWriter(
+ dataType,
+ valueBytes -> consumeField("value", 0, () -> recordConsumer.addBinary(Binary.fromConstantByteArray(valueBytes))),
+ metadataBytes -> consumeField("metadata", 1, () -> recordConsumer.addBinary(Binary.fromConstantByteArray(metadataBytes)))
Review Comment:
Are we sure about the order of the `value` and `metadata` fields?
https://github.com/apache/parquet-format/blob/37b6e8b863fb510314c07649665251f6474b0c11/VariantEncoding.md#variant-in-parquet
The parquet spec lists `metadata` first, hence asking.
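For reference, my reading of the unshredded layout in that section of the spec (sketched from memory, so verify against the linked doc; `variant_col` is a placeholder name):

```
optional group variant_col (VARIANT) {
  required binary metadata;
  required binary value;
}
```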
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -221,6 +222,32 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
java.util.Arrays.asList(result: _*)
}
+ case (VariantType, RECORD) if avroType.getProp("logicalType") == "variant" =>
Review Comment:
`variant` should be in some constant
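Something along these lines, as a sketch (the holder class name and location are made up for illustration):

```java
// Hypothetical constants holder for the Avro logical type name used by the serializer.
public class AvroVariantConstants {
  // The Avro "logicalType" prop value identifying a Variant record.
  public static final String VARIANT_LOGICAL_TYPE = "variant";
}
```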
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -374,4 +377,69 @@ trait SparkAdapter extends Serializable {
* @return A streaming [[DataFrame]]
*/
def createStreamingDataFrame(sqlContext: SQLContext, relation: HadoopFsRelation, requiredSchema: StructType): DataFrame
+
+ /**
+ * Gets the VariantType DataType if supported by this Spark version.
+ * Spark 3.x returns None (VariantType not supported).
+ * Spark 4.x returns Some(VariantType).
+ *
+ * @return Option[DataType] - Some(VariantType) for Spark 4.x, None for Spark 3.x
+ */
+ def getVariantDataType: Option[DataType]
+
+ /**
+ * Checks if two data types are equal for Parquet file format purposes.
+ * This handles version-specific types like VariantType (Spark 4.0+).
+ *
+ * Returns Some(true) if types are equal, Some(false) if not equal, or None if
+ * this adapter doesn't handle this specific type comparison (fallback to default logic).
+ *
+ * @param requiredType The required/expected data type
+ * @param fileType The data type from the file
+ * @return Option[Boolean] - Some(result) if handled by adapter, None otherwise
+ */
+ def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean]
Review Comment:
+1. The comparison seems to be at the Spark type level; if so, should we rename the method?
Separately, we should throw an error when a variant column is introduced without Parquet as the base file format, since the spec seems to exist only for Parquet at this point (is that true?).
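A rough sketch of the suggested guard; the class and method names here are illustrative, not part of the PR:

```java
// Illustrative guard: reject Variant columns unless the table's base file format is
// Parquet, since the Variant encoding is currently only specified for Parquet.
public class VariantFormatGuard {
  public static void validate(String baseFileFormat, boolean schemaContainsVariant) {
    if (schemaContainsVariant && !"PARQUET".equalsIgnoreCase(baseFileFormat)) {
      throw new IllegalArgumentException(
          "Variant type is only supported with Parquet base files, but table uses: " + baseFileFormat);
    }
  }
}
```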
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -346,6 +346,18 @@ private static Type visitPrimitiveToBuildInternalType(HoodieSchema schema) {
return Types.DateType.get();
case NULL:
return null;
+ case VARIANT:
+ // Variant is represented as a record with value and metadata binary fields
+ // Convert it to the internal schema representation as a RecordType
+ // Since Variant is treated as a primitive here but needs to be a record,
+ // we return a RecordType with the appropriate structure
+ List<Types.Field> variantFields = new ArrayList<>(2);
+ // Assign field IDs: these are used for schema evolution tracking
+ // Use negative IDs: indicate these are system-generated for Variant type
+ // TODO (voon): Check if we can remove the magic numbers?
+ variantFields.add(Types.Field.get(-1, false, "value", Types.BinaryType.get(), "Variant value component"));
Review Comment:
Define constants: `private static final int VARIANT_VALUE_FIELD_ID = -1;` and `private static final int VARIANT_METADATA_FIELD_ID = -2;`?
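i.e., something like the following (names from the comment above; the holder class is illustrative — in the PR these would presumably live in InternalSchemaConverter):

```java
// Illustrative holder for the system-generated Variant field IDs; negative so they
// never collide with user field IDs used in schema evolution tracking.
public class VariantFieldIds {
  public static final int VARIANT_VALUE_FIELD_ID = -1;
  public static final int VARIANT_METADATA_FIELD_ID = -2;
}
```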
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -517,12 +517,18 @@ class HoodieSparkSqlWriterInternal {
// if table has undergone upgrade, we need to reload table config
tableMetaClient.reloadTableConfig()
tableConfig = tableMetaClient.getTableConfig
- // Convert to RDD[HoodieRecord]
- val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
+ // Convert to RDD[HoodieRecord] and force type immediately
+ val hoodieRecords: JavaRDD[HoodieRecord[_]] = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName, avroRecordNamespace, writerSchema, processedDataSchema, operation, instantTime, preppedSparkSqlWrites, preppedSparkSqlMergeInto, preppedWriteOperation, tableConfig))) match {
- case Success(recs) => recs
+ // CAST EXPLANATION:
+ // This cast is required due to Scala/Java generic variance mismatch.
+ // 1. Java returns JavaRDD[HoodieRecord[_ <: Object]] (wildcard maps to Object bound)
+ // 2. Scala expects JavaRDD[HoodieRecord[_]] (existential type maps to Any)
+ // 3. JavaRDD is Invariant in Scala, so these types are not compatible without a cast.
+ // Please do not remove, even if IDE marks it as redundant.
+ case Success(recs) => recs.asInstanceOf[JavaRDD[HoodieRecord[_]]]
Review Comment:
just a single line comment `// Required: Scala/Java generic variance mismatch` would suffice?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -563,7 +563,8 @@ public static HoodieSchema.Variant createVariant(String name, String namespace,
null
);
- List<HoodieSchemaField> fields = Arrays.asList(metadataField, valueField);
+ // IMPORTANT: Field order must match VariantVal(value, metadata) constructor
Review Comment:
See comment above. Do Spark and Parquet diverge on which field comes first?
##########
hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala:
##########
@@ -196,4 +198,80 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging {
storageConf.getBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis),
getRebaseSpec("CORRECTED"))
}
+
+ override def getVariantDataType: Option[DataType] = {
+ Some(VariantType)
+ }
+
+ override def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean] = {
+ /**
+ * Checks if a StructType is the physical representation of VariantType in Parquet.
+ * VariantType is stored in Parquet as a struct with two binary fields: "value" and "metadata".
+ */
+ def isVariantPhysicalSchema(structType: StructType): Boolean = {
+ if (structType.fields.length != 2) {
+ false
+ } else {
+ val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+ fieldMap.contains("value") && fieldMap.contains("metadata") &&
+ fieldMap("value") == BinaryType && fieldMap("metadata") == BinaryType
+ }
+ }
+
+ // Handle VariantType comparisons
+ (requiredType, fileType) match {
+ case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => Some(true)
+ case (s: StructType, _: VariantType) if isVariantPhysicalSchema(s) => Some(true)
+ case _ => None // Not a VariantType comparison, use default logic
+ }
+ }
+
+ override def isVariantType(dataType: DataType): Boolean = {
+ dataType.isInstanceOf[VariantType]
+ }
+
+ override def createVariantValueWriter(
+ dataType: DataType,
+ writeValue: Consumer[Array[Byte]],
+ writeMetadata: Consumer[Array[Byte]]
+ ): BiConsumer[SpecializedGetters, Integer] = {
+ if (!isVariantType(dataType)) {
+ throw new IllegalArgumentException(s"Expected VariantType but got $dataType")
+ }
+
+ (row: SpecializedGetters, ordinal: Integer) => {
+ val variant = row.getVariant(ordinal)
+ writeValue.accept(variant.getValue)
+ writeMetadata.accept(variant.getMetadata)
+ }
+ }
+
+ override def convertVariantFieldToParquetType(
+ dataType: DataType,
+ fieldName: String,
+ fieldSchema: HoodieSchema,
+ repetition: Repetition
+ ): Type = {
+ if (!isVariantType(dataType)) {
+ throw new IllegalArgumentException(s"Expected VariantType but got $dataType")
+ }
+
+ // Determine if this is a shredded variant
+ val isShredded = fieldSchema match {
+ case variant: HoodieSchema.Variant => variant.isShredded
+ case _ => false
+ }
+
+ // For shredded variants, the value field is OPTIONAL (nullable)
+ // For unshredded variants, the value field is REQUIRED
+ val valueRepetition = if (isShredded) Repetition.OPTIONAL else Repetition.REQUIRED
+
+ // VariantType is always stored in Parquet as a struct with separate value and metadata binary fields.
+ // This matches how the HoodieRowParquetWriteSupport writes variant data.
+ // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
+ Types.buildGroup(repetition)
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, valueRepetition).named("value"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("metadata"))
+ .named(fieldName)
Review Comment:
Do we need to add a parquet logical type annotation per the [Parquet VariantEncoding spec](https://github.com/apache/parquet-format/blob/37b6e8b863fb510314c07649665251f6474b0c11/VariantEncoding.md), so that newer files have their fields correctly distinguished by other parquet readers?
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java:
##########
@@ -382,6 +383,8 @@ private SchemaCompatibilityResult calculateCompatibility(final HoodieSchema read
return result.mergedWith(typeMismatch(reader, writer, locations));
case RECORD:
return result.mergedWith(typeMismatch(reader, writer, locations));
+ case VARIANT:
+ return result.mergedWith(typeMismatch(reader, writer, locations));
Review Comment:
@voonhous thoughts?
##########
hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchema.java:
##########
@@ -1179,11 +1179,11 @@ public void testCreateShreddedVariantWithoutTypedValue() {
// Verify fields should have metadata and nullable value, but no typed_value
List<HoodieSchemaField> fields = variantSchema.getFields();
assertEquals(2, fields.size());
- assertEquals("metadata", fields.get(0).name());
- assertEquals("value", fields.get(1).name());
+ assertEquals("value", fields.get(0).name());
Review Comment:
ok. this seems to explicitly order them differently. context?
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -441,6 +453,38 @@ private static HoodieSchema visitInternalSchemaToBuildHoodieSchema(Type type, Ma
*/
private static HoodieSchema visitInternalRecordToBuildHoodieRecord(Types.RecordType recordType, List<HoodieSchema> fieldSchemas, String recordNameFallback) {
List<Types.Field> fields = recordType.fields();
+
+ // Check if this RecordType is actually a Variant type
+ // Unshredded Variant types are marked by having exactly 2 fields with negative IDs and specific names
+ if (fields.size() == 2) {
+ Types.Field field0 = fields.get(0);
+ Types.Field field1 = fields.get(1);
+
+ // Check if both fields have negative IDs (system-generated for Variant)
+ boolean hasNegativeIds = field0.fieldId() < 0 && field1.fieldId() < 0;
+
+ // Check if fields are named "value" and "metadata" (order may vary)
+ boolean hasVariantFields = (field0.name().equals("value") && field1.name().equals("metadata"))
+ || (field0.name().equals("metadata") && field1.name().equals("value"));
+
+ if (hasNegativeIds && hasVariantFields) {
+ // Variant type: Determine if it is shredded or unshredded based on value field's optionality
+ // TODO (voon): This is incomplete for now, we are only handling unshredded; fields size == 2 should always mean this is unshredded
+ String recordName = Option.ofNullable(recordType.name()).orElse(recordNameFallback);
+ Types.Field valueField = field0.name().equals("value") ? field0 : field1;
+
+ if (valueField.isOptional()) {
+ // Optional value field indicates shredded variant
+ // Note: We don't have the typed_value schema here, so pass null for typedValueSchema
+ return HoodieSchema.createVariantShredded(recordName, null, null, null);
+ } else {
+ // Required value field indicates unshredded variant
+ return HoodieSchema.createVariant(recordName, null, null);
+ }
+ }
+ }
+
+ // Not a Variant, create regular record
Review Comment:
What kind of evolution are we able to support with Variant columns? Can we ensure there are tests around:
- Adding a Variant column to an existing table
- Removing a Variant column
- `maxColumnId` calculation when Variant fields (with negative IDs) coexist with regular fields
- Round-trip: `HoodieSchema → InternalSchema → HoodieSchema` for Variant types
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -346,6 +346,18 @@ private static Type visitPrimitiveToBuildInternalType(HoodieSchema schema) {
return Types.DateType.get();
case NULL:
return null;
+ case VARIANT:
+ // Variant is represented as a record with value and metadata binary fields
+ // Convert it to the internal schema representation as a RecordType
+ // Since Variant is treated as a primitive here but needs to be a record,
+ // we return a RecordType with the appropriate structure
+ List<Types.Field> variantFields = new ArrayList<>();
+ // Assign field IDs: these are used for schema evolution tracking
+ // Use negative IDs: indicate these are system-generated for Variant type
+ // TODO (voon): Check if we can remove the magic numbers?
Review Comment:
Can you write out the concerns around schema evolution, either here or in a separate issue? We can align on a solution there.
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -346,6 +346,18 @@ private static Type visitPrimitiveToBuildInternalType(HoodieSchema schema) {
return Types.DateType.get();
case NULL:
return null;
+ case VARIANT:
+ // Variant is represented as a record with value and metadata binary fields
+ // Convert it to the internal schema representation as a RecordType
+ // Since Variant is treated as a primitive here but needs to be a record,
+ // we return a RecordType with the appropriate structure
+ List<Types.Field> variantFields = new ArrayList<>(2);
+ // Assign field IDs: these are used for schema evolution tracking
+ // Use negative IDs: indicate these are system-generated for Variant type
+ // TODO (voon): Check if we can remove the magic numbers?
+ variantFields.add(Types.Field.get(-1, false, "value", Types.BinaryType.get(), "Variant value component"));
Review Comment:
For now, can we throw an error when variant is used with schema evolution turned on? In any case, can we pull -1 and -2 into `static final` constants we can dereference later, rather than hardcoding them here deep in the trenches.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]