voonhous commented on code in PR #17833:
URL: https://github.com/apache/hudi/pull/17833#discussion_r2889862976
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -374,4 +377,69 @@ trait SparkAdapter extends Serializable {
* @return A streaming [[DataFrame]]
*/
def createStreamingDataFrame(sqlContext: SQLContext, relation: HadoopFsRelation, requiredSchema: StructType): DataFrame
+
+ /**
+ * Gets the VariantType DataType if supported by this Spark version.
+ * Spark 3.x returns None (VariantType not supported).
+ * Spark 4.x returns Some(VariantType).
+ *
+ * @return Option[DataType] - Some(VariantType) for Spark 4.x, None for Spark 3.x
+ */
+ def getVariantDataType: Option[DataType]
+
+ /**
+ * Checks if two data types are equal for Parquet file format purposes.
+ * This handles version-specific types like VariantType (Spark 4.0+).
+ *
+ * Returns Some(true) if types are equal, Some(false) if not equal, or None if
+ * this adapter doesn't handle this specific type comparison (fallback to default logic).
+ *
+ * @param requiredType The required/expected data type
+ * @param fileType The data type from the file
+ * @return Option[Boolean] - Some(result) if handled by adapter, None otherwise
+ */
+ def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean]
Review Comment:
It is called from `SparkSchemaTransformUtils.isDataTypeEqual` in the schema evolution code path. It's basically just comparing with Spark's type. Will rename to `isDataTypeEqualForPhysicalSchema`.
We can still represent variant vals as bytes, so I don't think an error needs to be thrown.
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -441,6 +453,38 @@ private static HoodieSchema visitInternalSchemaToBuildHoodieSchema(Type type, Ma
*/
private static HoodieSchema visitInternalRecordToBuildHoodieRecord(Types.RecordType recordType,
    List<HoodieSchema> fieldSchemas, String recordNameFallback) {
List<Types.Field> fields = recordType.fields();
+
+ // Check if this RecordType is actually a Variant type
+ // Unshredded Variant types are marked by having exactly 2 fields with negative IDs and specific names
+ if (fields.size() == 2) {
+ Types.Field field0 = fields.get(0);
+ Types.Field field1 = fields.get(1);
+
+ // Check if both fields have negative IDs (system-generated for Variant)
+ boolean hasNegativeIds = field0.fieldId() < 0 && field1.fieldId() < 0;
+
+ // Check if fields are named "value" and "metadata" (order may vary)
+ boolean hasVariantFields = (field0.name().equals("value") && field1.name().equals("metadata"))
+     || (field0.name().equals("metadata") && field1.name().equals("value"));
+
+ if (hasNegativeIds && hasVariantFields) {
+ // Variant type: Determine if it is shredded or unshredded based on value field's optionality
+ // TODO (voon): This is incomplete for now, we are only handling unshredded, fields size of == 2 should always mean this is unshredded
+ String recordName = Option.ofNullable(recordType.name()).orElse(recordNameFallback);
+ Types.Field valueField = field0.name().equals("value") ? field0 : field1;
+
+ if (valueField.isOptional()) {
+ // Optional value field indicates shredded variant
+ // Note: We don't have the typed_value schema here, so pass null for typedValueSchema
+ return HoodieSchema.createVariantShredded(recordName, null, null, null);
+ } else {
+ // Required value field indicates unshredded variant
+ return HoodieSchema.createVariant(recordName, null, null);
+ }
+ }
+ }
+
+ // Not a Variant, create regular record
Review Comment:
I think we should address this in a separate PR. The more features we add, the more review cycles we need to go through, and review fatigue is a real thing.
I will inline an issue here and add a TODO: #18285
##########
hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala:
##########
@@ -196,4 +198,80 @@ abstract class BaseSpark4Adapter extends SparkAdapter with Logging {
storageConf.getBoolean(SQLConf.CASE_SENSITIVE.key, sqlConf.caseSensitiveAnalysis),
getRebaseSpec("CORRECTED"))
}
+
+ override def getVariantDataType: Option[DataType] = {
+ Some(VariantType)
+ }
+
+ override def isDataTypeEqualForParquet(requiredType: DataType, fileType: DataType): Option[Boolean] = {
+ /**
+ * Checks if a StructType is the physical representation of VariantType in Parquet.
+ * VariantType is stored in Parquet as a struct with two binary fields: "value" and "metadata".
+ */
+ def isVariantPhysicalSchema(structType: StructType): Boolean = {
+ if (structType.fields.length != 2) {
+ false
+ } else {
+ val fieldMap = structType.fields.map(f => (f.name, f.dataType)).toMap
+ fieldMap.contains("value") && fieldMap.contains("metadata") &&
+ fieldMap("value") == BinaryType && fieldMap("metadata") == BinaryType
+ }
+ }
+
+ // Handle VariantType comparisons
+ (requiredType, fileType) match {
+ case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) => Some(true)
+ case (s: StructType, _: VariantType) if isVariantPhysicalSchema(s) => Some(true)
+ case _ => None // Not a VariantType comparison, use default logic
+ }
+ }
+
+ override def isVariantType(dataType: DataType): Boolean = {
+ dataType.isInstanceOf[VariantType]
+ }
+
+ override def createVariantValueWriter(
+ dataType: DataType,
+ writeValue: Consumer[Array[Byte]],
+ writeMetadata: Consumer[Array[Byte]]
+ ): BiConsumer[SpecializedGetters, Integer] = {
+ if (!isVariantType(dataType)) {
+ throw new IllegalArgumentException(s"Expected VariantType but got $dataType")
+ }
+
+ (row: SpecializedGetters, ordinal: Integer) => {
+ val variant = row.getVariant(ordinal)
+ writeValue.accept(variant.getValue)
+ writeMetadata.accept(variant.getMetadata)
+ }
+ }
+
+ override def convertVariantFieldToParquetType(
+ dataType: DataType,
+ fieldName: String,
+ fieldSchema: HoodieSchema,
+ repetition: Repetition
+ ): Type = {
+ if (!isVariantType(dataType)) {
+ throw new IllegalArgumentException(s"Expected VariantType but got $dataType")
+ }
+
+ // Determine if this is a shredded variant
+ val isShredded = fieldSchema match {
+ case variant: HoodieSchema.Variant => variant.isShredded
+ case _ => false
+ }
+
+ // For shredded variants, the value field is OPTIONAL (nullable)
+ // For unshredded variants, the value field is REQUIRED
+ val valueRepetition = if (isShredded) Repetition.OPTIONAL else Repetition.REQUIRED
+
+ // VariantType is always stored in Parquet as a struct with separate value and metadata binary fields.
+ // This matches how the HoodieRowParquetWriteSupport writes variant data.
+ // Note: We intentionally omit 'typed_value' for shredded variants as this writer only accesses raw binary blobs.
+ Types.buildGroup(repetition)
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, valueRepetition).named("value"))
+ .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("metadata"))
+ .named(fieldName)
Review Comment:
Yes, we should.
I initially wanted to add:
```java
.as(LogicalTypeAnnotation.variantType())
```
But `#variantType` is not available in parquet 1.15.2. If memory serves, the VARIANT logical type annotation was added in parquet-java `1.16.0`.
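Until we can move to that version, readers have to recognize a variant column purely by its physical shape: a group of two BINARY fields named `value` and `metadata`. A self-contained sketch of that shape check, using plain string type names as a stand-in for the real Parquet API:

```java
import java.util.Map;

// Illustrative stand-in for the structural variant check used while the
// Parquet VARIANT logical type annotation is unavailable. The field names
// mirror the group built in BaseSpark4Adapter; the Map<String, String>
// (field name -> physical type name) is a simplification for illustration.
public class VariantPhysicalShape {
  public static boolean looksLikeVariant(Map<String, String> fieldTypes) {
    return fieldTypes.size() == 2
        && "BINARY".equals(fieldTypes.get("value"))
        && "BINARY".equals(fieldTypes.get("metadata"));
  }

  public static void main(String[] args) {
    System.out.println(looksLikeVariant(Map.of("value", "BINARY", "metadata", "BINARY"))); // true
    System.out.println(looksLikeVariant(Map.of("value", "BINARY", "id", "INT64")));        // false
  }
}
```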
##########
hudi-common/src/main/java/org/apache/hudi/internal/schema/convert/InternalSchemaConverter.java:
##########
@@ -346,6 +346,18 @@ private static Type visitPrimitiveToBuildInternalType(HoodieSchema schema) {
return Types.DateType.get();
case NULL:
return null;
+ case VARIANT:
+ // Variant is represented as a record with value and metadata binary fields
+ // Convert it to the internal schema representation as a RecordType
+ // Since Variant is treated as a primitive here but needs to be a record,
+ // we return a RecordType with the appropriate structure
+ List<Types.Field> variantFields = new ArrayList<>(2);
+ // Assign field IDs: these are used for schema evolution tracking
+ // Use negative IDs: indicate these are system-generated for Variant type
+ // TODO (voon): Check if we can remove the magic numbers?
+ variantFields.add(Types.Field.get(-1, false, "value", Types.BinaryType.get(), "Variant value component"));
Review Comment:
Done
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibilityChecker.java:
##########
@@ -382,6 +383,8 @@ private SchemaCompatibilityResult calculateCompatibility(final HoodieSchema read
return result.mergedWith(typeMismatch(reader, writer, locations));
case RECORD:
return result.mergedWith(typeMismatch(reader, writer, locations));
+ case VARIANT:
+ return result.mergedWith(typeMismatch(reader, writer, locations));
Review Comment:
Yeap, as long as reader/writer are variants, they are compatible.
Will add tests.
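As a sketch of the intended rule (using a simplified stand-in enum, not the real `HoodieSchema` API): a VARIANT node is compatible only with another VARIANT node, and any mixed pairing at that node is a type mismatch.

```java
// Simplified illustration of the VARIANT branch in calculateCompatibility.
// Kind is a hypothetical stand-in for the real HoodieSchema types.
public class VariantCompat {
  public enum Kind { VARIANT, RECORD, STRING }

  // A VARIANT reader matches only a VARIANT writer; mixed pairings at this
  // node are a type mismatch. Other kinds fall through to existing checks.
  public static boolean compatible(Kind reader, Kind writer) {
    if (reader == Kind.VARIANT || writer == Kind.VARIANT) {
      return reader == writer;
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(compatible(Kind.VARIANT, Kind.VARIANT)); // true
    System.out.println(compatible(Kind.VARIANT, Kind.RECORD));  // false
  }
}
```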
##########
hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java:
##########
@@ -563,7 +563,8 @@ public static HoodieSchema.Variant createVariant(String name, String namespace,
null
);
- List<HoodieSchemaField> fields = Arrays.asList(metadataField, valueField);
+ // IMPORTANT: Field order must match VariantVal(value, metadata) constructor
Review Comment:
Uhm, nope, I've made everything consistent across Hudi, Spark, and Parquet.
In `BaseSparkAdapter`:
```
Types.buildGroup(repetition)
  .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, valueRepetition).named("value"))
  .addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Repetition.REQUIRED).named("metadata"))
  .named(fieldName)
```
##########
hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/avro/AvroSerializer.scala:
##########
@@ -221,6 +222,32 @@ private[sql] class AvroSerializer(rootCatalystType: DataType,
java.util.Arrays.asList(result: _*)
}
+ case (VariantType, RECORD) if avroType.getProp("logicalType") == "variant" =>
Review Comment:
Okay!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]