This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1b26f2b4e2ba fix(variant): align Spark 4.1 MOR merge with
PushVariantIntoScan and restore Spark 4.0 reads (#18674)
1b26f2b4e2ba is described below
commit 1b26f2b4e2ba4b2dac108dc845c90daaf6e830c3
Author: voonhous <[email protected]>
AuthorDate: Fri May 15 05:09:27 2026 +0800
fix(variant): align Spark 4.1 MOR merge with PushVariantIntoScan and
restore Spark 4.0 reads (#18674)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/io/storage/HoodieSparkParquetReader.java | 11 +-
.../SparkFileFormatInternalRowReaderContext.scala | 76 +++++++++++++-
.../sql/avro/HoodieSparkSchemaConverters.scala | 20 +++-
.../parquet/HoodieParquetReadSupport.scala | 48 +--------
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 40 ++++++-
.../parquet/TestHoodieParquetReadSupport.scala | 34 ------
.../hudi/common/engine/HoodieReaderContext.java | 10 ++
.../table/read/buffer/FileGroupRecordBuffer.java | 27 ++++-
.../buffer/PositionBasedFileGroupRecordBuffer.java | 10 +-
.../avro/AvroSchemaConverterWithTimestampNTZ.java | 11 +-
.../HoodieFileGroupReaderBasedFileFormat.scala | 23 ++++-
.../sql/hudi/dml/schema/TestVariantDataType.scala | 10 --
.../spark/sql/adapter/BaseSpark4Adapter.scala | 19 +++-
.../apache/spark/sql/adapter/Spark4_0Adapter.scala | 16 ++-
.../parquet/Spark40HoodieParquetReadSupport.scala | 115 +++++++++++++++++++++
.../Spark40LegacyHoodieParquetFileFormat.scala | 4 +-
.../datasources/parquet/Spark40ParquetReader.scala | 2 +-
.../TestSpark40HoodieParquetReadSupport.scala | 59 +++++++++++
.../apache/spark/sql/adapter/Spark4_1Adapter.scala | 59 ++++++++++-
19 files changed, 474 insertions(+), 120 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index 80ed9be8420e..d72409064a09 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -187,10 +187,13 @@ public class HoodieSparkParquetReader implements
HoodieSparkFileReader {
String.valueOf(storage.getConf().getBoolean(SQLConf.PARQUET_RECORD_FILTER_ENABLED().key(),
sqlConf.parquetRecordFilterEnabled())));
});
}
- ParquetReader<InternalRow> reader = ParquetReader.builder(new
HoodieParquetReadSupport(Option$.MODULE$.empty(), true, true,
- rebaseDateSpec,
-
SparkAdapterSupport$.MODULE$.sparkAdapter().getRebaseSpec("LEGACY"),
messageSchema),
- new Path(path.toUri()))
+ // Via SparkAdapter so Spark 4.0 picks up its variant-reordering
ReadSupport subclass
+ // (#18334); constructing the base class here would fail with MALFORMED_VARIANT on
Spark 4.0.
+ HoodieParquetReadSupport readSupport =
SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetReadSupport(
+ Option$.MODULE$.empty(), true, true,
+ rebaseDateSpec,
+ messageSchema);
+ ParquetReader<InternalRow> reader = ParquetReader.builder(readSupport, new
Path(path.toUri()))
.withConf(storage.getConf().unwrapAs(Configuration.class))
.build();
UnsafeProjection projection = evolution.generateUnsafeProjection();
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 92b963f68390..edfcc44e9aa2 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.{HoodieFileFormat,
HoodieRecord}
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaUtils}
import org.apache.hudi.common.table.HoodieTableConfig
import
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer.ROW_INDEX_TEMPORARY_COLUMN_NAME
+import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.collection.{CachingIterator,
ClosableIterator, Pair => HPair}
import org.apache.hudi.io.storage.{HoodieSparkFileReaderFactory,
HoodieSparkParquetReader, VectorConversionUtils}
@@ -38,11 +39,14 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{JoinedRow, UnsafeProjection}
import org.apache.spark.sql.execution.datasources.{PartitionedFile,
SparkColumnarFileReader}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.hudi.SparkAdapter
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{ArrayType, ByteType, DoubleType, FloatType,
LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
+import java.util.function.{Function => JFunction}
+
import scala.collection.JavaConverters._
/**
@@ -61,14 +65,62 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
filters: Seq[Filter],
requiredFilters: Seq[Filter],
storageConfiguration:
StorageConfiguration[_],
- tableConfig: HoodieTableConfig)
+ tableConfig: HoodieTableConfig,
+ sparkRequiredSchema:
Option[StructType] = None)
extends BaseSparkInternalRowReaderContext(storageConfiguration, tableConfig,
SparkFileFormatInternalRecordContext.apply(tableConfig)) {
+
+ // Java-friendly auxiliary constructor (Scala default args don't generate
matching Java overloads).
+ def this(baseFileReader: SparkColumnarFileReader,
+ filters: Seq[Filter],
+ requiredFilters: Seq[Filter],
+ storageConfiguration: StorageConfiguration[_],
+ tableConfig: HoodieTableConfig) =
+ this(baseFileReader, filters, requiredFilters, storageConfiguration,
tableConfig, None)
+
lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
private lazy val recordKeyFields =
Option(tableConfig.getRecordKeyFields.orElse(null)).map(_.map(_.toLowerCase).toSet).getOrElse(Set.empty)
private lazy val bootstrapSafeFilters: Seq[Filter] =
filters.filter(filterIsSafeForBootstrap) ++ requiredFilters
private lazy val morFilters = filters.filter(filterIsSafeForPrimaryKey(_,
recordKeyFields)) ++ requiredFilters
private lazy val allFilters = filters ++ requiredFilters
+ // For each field of `target`, replace its dataType with the matching
field's projected
+ // variant struct from `source` (when present). Non-matching fields pass
through.
+ private def overlayVariantProjections(target: StructType, source:
StructType): StructType = {
+ StructType(target.fields.map { f =>
+ SparkFileFormatInternalRowReaderContext.findFieldByName(source,
f.name).map(_.dataType) match {
+ case Some(projStruct: StructType) if
sparkAdapter.isVariantProjectionStruct(projStruct) =>
+ f.copy(dataType = projStruct)
+ case _ => f
+ }
+ })
+ }
+
+ // Aligns log-block records with the PushVariantIntoScan-projected variant
shape before
+ // they reach the merger. Preserves merger metadata cols (_hoodie_record_key,
+ // _tmp_metadata_row_index) which the merger reads by ordinal — projecting
down to the
+ // bare required schema would drop them and the merger would read garbage
offsets.
+ override def getLogBlockRecordProjection(
+ dataBlockSchema: HoodieSchema): HOption[JFunction[InternalRow,
InternalRow]] = {
+ val needsProjection = sparkRequiredSchema.exists(_.fields.exists(f =>
f.dataType match {
+ case st: StructType => sparkAdapter.isVariantProjectionStruct(st)
+ case _ => false
+ }))
+ if (!needsProjection) {
+ return HOption.empty[JFunction[InternalRow, InternalRow]]()
+ }
+ val req = sparkRequiredSchema.get
+ val dataStruct = HoodieInternalRowUtils.getCachedSchema(dataBlockSchema)
+ val targetStruct = overlayVariantProjections(dataStruct, req)
+ sparkAdapter.buildVariantProjector(dataStruct, targetStruct) match {
+ case Some(p) => HOption.of(new JFunction[InternalRow, InternalRow] {
+ // .copy() because the buffer stores rows into ExternalSpillableMap and
+ // UnsafeProjection reuses a single output buffer.
+ override def apply(r: InternalRow): InternalRow = p(r).copy()
+ })
+ case None => HOption.empty[JFunction[InternalRow, InternalRow]]()
+ }
+ }
+
override def getFileRecordIterator(filePath: StoragePath,
start: Long,
length: Long,
@@ -79,7 +131,14 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
if (hasRowIndexField) {
assert(getRecordContext.supportsParquetRowIndex())
}
- val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
+ // Use the engine's augmented requiredSchema (includes merger metadata
cols the merger
+ // reads from base rows), but overlay the projected variant shape from
sparkRequiredSchema
+ // so parquet-mr's PushVariantIntoScan kicks in (HoodieSchema collapses
the projected
+ // struct back to VariantType, dropping the VariantMetadata parquet-mr
looks for).
+ val structType = sparkRequiredSchema match {
+ case Some(sparkReq) =>
overlayVariantProjections(HoodieInternalRowUtils.getCachedSchema(requiredSchema),
sparkReq)
+ case None => HoodieInternalRowUtils.getCachedSchema(requiredSchema)
+ }
// Parquet stores VECTOR as FIXED_LEN_BYTE_ARRAY, so the reader needs
BinaryType
// and we decode back to ArrayType below. Lance returns ArrayType
natively, so skip
@@ -100,8 +159,10 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
val (readSchema, readFilters) =
getSchemaAndFiltersForRead(parquetReadStructType, hasRowIndexField)
if (FSUtils.isLogFile(filePath)) {
// NOTE: now only primary key based filtering is supported for log files
+ // Variant alignment happens later via getLogBlockRecordProjection in
the merge buffer.
new HoodieSparkFileReaderFactory(storage).newParquetFileReader(filePath)
-
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema,
readFilters.asJava).asInstanceOf[ClosableIterator[InternalRow]]
+
.asInstanceOf[HoodieSparkParquetReader].getUnsafeRowIterator(requiredSchema,
readFilters.asJava)
+ .asInstanceOf[ClosableIterator[InternalRow]]
} else {
// partition value is empty because the spark parquet reader will append
the partition columns to
// each row if they are given. That is the only usage of the partition
values in the reader.
@@ -274,6 +335,15 @@ class
SparkFileFormatInternalRowReaderContext(baseFileReader: SparkColumnarFileR
}
object SparkFileFormatInternalRowReaderContext {
+ /** Look up a field by name, honoring `spark.sql.caseSensitive`. */
+ private[hudi] def findFieldByName(schema: StructType, name: String):
Option[StructField] = {
+ if (SQLConf.get.caseSensitiveAnalysis) {
+ schema.fields.find(_.name == name)
+ } else {
+ schema.fields.find(_.name.equalsIgnoreCase(name))
+ }
+ }
+
// From "namedExpressions.scala": Used to construct to record position field
metadata.
private val FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY =
"__file_source_generated_metadata_col"
private val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
index f634b773f7ea..c1c3fe1f6837 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkSchemaConverters.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.avro
-import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport}
import org.apache.hudi.common.schema.{HoodieJsonProperties, HoodieSchema,
HoodieSchemaField, HoodieSchemaType}
import org.apache.hudi.common.schema.HoodieSchema.TimePrecision
import org.apache.hudi.internal.schema.HoodieSchemaException
@@ -190,6 +190,12 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
isCanonicalVariantStruct(variantStruct) =>
HoodieSchema.createVariant(recordName, nameSpace, null)
+ // PushVariantIntoScan (Spark 4.1+) rewrites Variant to a struct of
extractions; map
+ // it back to a regular HoodieSchema Variant. parquet-mr does the
projection natively
+ // from the Spark required schema's VariantMetadata.
+ case projectedVariant: StructType if
isSparkVariantProjectionStruct(projectedVariant) =>
+ HoodieSchema.createVariant(recordName, nameSpace, null)
+
case st: StructType =>
val childNameSpace = if (nameSpace != "") s"$nameSpace.$recordName"
else recordName
@@ -514,6 +520,18 @@ object HoodieSparkSchemaConverters extends
SparkAdapterSupport {
}
}
+ /**
+ * Detects a Spark 4.1 PushVariantIntoScan-projected struct. Short-circuits
before consulting
+ * the version-specific SparkAdapter so shared-module unit tests (whose
classpath lacks any
+ * SparkXAdapter) don't trigger an adapter-load failure on plain StructType
conversions.
+ */
+ private def isSparkVariantProjectionStruct(st: StructType): Boolean = {
+ if (!HoodieSparkUtils.gteqSpark4_1) return false
+ if (st.fields.forall(_.metadata == Metadata.empty)) return false
+ try sparkAdapter.isVariantProjectionStruct(st)
+ catch { case _: NoClassDefFoundError | _: ClassNotFoundException => false }
+ }
+
private def sparkTypeForVectorElementType(
elementType:
HoodieSchema.Vector.VectorElementType): DataType = elementType match {
case HoodieSchema.Vector.VectorElementType.FLOAT => FloatType
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
index 5d77bd2fa3d7..c63ec0eef9e4 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetReadSupport.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.ValidationUtils
import org.apache.parquet.hadoop.api.InitContext
import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
-import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType,
SchemaRepair, Type, Types}
+import org.apache.parquet.schema.{GroupType, MessageType, SchemaRepair, Type,
Types}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import java.time.ZoneId
@@ -50,15 +50,7 @@ class HoodieParquetReadSupport(
readContext.getRequestedSchema
}
val trimmedParquetSchema =
HoodieParquetReadSupport.trimParquetSchema(requestedParquetSchema,
context.getFileSchema)
- // TODO: Remove this workaround once Spark is bumped to 4.1+, which reads
variant fields by
- // name via SPARK-54410. Spark 4.0.x's ParquetUnshreddedVariantConverter
builds its converters
- // array in hardcoded [value, metadata] order, then indexes by schema
position. If the Parquet
- // schema has [metadata, value] order (per spec), the positional mismatch
causes
- // MALFORMED_VARIANT. Workaround: reorder variant group fields to [value,
metadata] in the
- // requested schema. parquet-mr reconciles requested vs file schema by
field name, so bytes
- // flow correctly. This is tracked in issue #18334
- val reorderedSchema =
HoodieParquetReadSupport.reorderVariantFields(trimmedParquetSchema)
- new ReadContext(reorderedSchema, readContext.getReadSupportMetadata)
+ new ReadContext(trimmedParquetSchema, readContext.getReadSupportMetadata)
}
}
@@ -83,42 +75,6 @@ object HoodieParquetReadSupport {
Types.buildMessage().addFields(trimmedFields:
_*).named(requestedSchema.getName)
}
- /**
- * Reorders variant group fields in the requested schema so that "value"
precedes "metadata".
- * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which
builds its
- * converters array in hardcoded [value, metadata] order and indexes by
schema position.
- * parquet-mr reconciles the requested schema against the file schema by
field name,
- * so the correct bytes still flow to the correct converters regardless of
file order.
- */
- def reorderVariantFields(schema: MessageType): MessageType = {
- val reordered =
schema.getFields.asScala.map(reorderVariantType).toArray[Type]
- Types.buildMessage().addFields(reordered: _*).named(schema.getName)
- }
-
- private def reorderVariantType(t: Type): Type = {
- t match {
- case group: GroupType if isVariantGroup(group) =>
- // Rebuild with [value, metadata] order for Spark compatibility
- val valueField = group.getType("value")
- val metadataField = group.getType("metadata")
- group.withNewFields(java.util.Arrays.asList(valueField, metadataField))
- case group: GroupType =>
- // Recurse into nested groups
- val children = group.getFields.asScala.map(reorderVariantType).asJava
- group.withNewFields(children)
- case _ => t
- }
- }
-
- private def isVariantGroup(group: GroupType): Boolean = {
- group.containsField("value") &&
- group.containsField("metadata") &&
- group.getType("value").isPrimitive &&
- group.getType("metadata").isPrimitive &&
- group.getType("value").asPrimitiveType().getPrimitiveTypeName ==
PrimitiveType.PrimitiveTypeName.BINARY &&
- group.getType("metadata").asPrimitiveType().getPrimitiveTypeName ==
PrimitiveType.PrimitiveTypeName.BINARY
- }
-
private def trimParquetType(requestedType: Type, fileType: Type):
Option[Type] = {
if (requestedType.equals(fileType)) {
Some(requestedType)
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index 4eb680e1efc7..2934536f4ec9 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hadoop.conf.Configuration
@@ -43,7 +44,7 @@ import org.apache.spark.sql.catalyst.util.DateFormatter
import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
ParquetFilters}
+import
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetReadSupport,
ParquetFileFormat, ParquetFilters}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.parser.HoodieExtendedParserInterface
import org.apache.spark.sql.sources.{BaseRelation, Filter}
@@ -242,6 +243,21 @@ trait SparkAdapter extends Serializable {
options: Map[String, String],
hadoopConf: Configuration):
Option[SparkColumnarFileReader]
+ /**
+ * Build the [[HoodieParquetReadSupport]] for a parquet read. Spark 4.0
overrides to return
+ * its variant-aware subclass (variant group field reorder for the
positional converter).
+ * int96 rebase mode is fixed to LEGACY (Hudi convention for timestamp
compatibility).
+ */
+ def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec: RebaseSpec,
+ tableSchemaOpt: HOption[MessageType])
+ : HoodieParquetReadSupport = {
+ new HoodieParquetReadSupport(convertTz, enableVectorizedReader,
enableTimestampFieldRepair,
+ datetimeRebaseSpec, getRebaseSpec("LEGACY"), tableSchemaOpt)
+ }
+
/**
* use new qe execute
*/
@@ -466,6 +482,28 @@ trait SparkAdapter extends Serializable {
*/
def isVariantShreddingStruct(structType: StructType): Boolean
+ /**
+ * Checks if a StructType is the result of Spark 4.1's PushVariantIntoScan
rewriting — i.e.,
+ * every child field carries `VariantMetadata` describing a pushed-down
variant extraction.
+ *
+ * Returns false on Spark versions earlier than 4.1 (the rewriting only
happens there).
+ */
+ def isVariantProjectionStruct(structType: StructType): Boolean = false
+
+ /**
+ * If `sparkRequiredSchema` contains any field that's a Spark 4.1 variant
projection struct
+ * (i.e., the same-named field in `sparkDataSchema` is `VariantType`),
returns a row
+ * transformer that takes an InternalRow in the data-schema shape (with full
variants) and
+ * produces an InternalRow in the required-schema shape (with each variant
column projected
+ * to its requested struct via VariantGet).
+ *
+ * Used on the MOR log-file path: log records carry the full variant on
disk, but the merger
+ * expects rows aligned to the post-PushVariantIntoScan required schema.
Returns None when
+ * there's nothing to project (cheap fast-path for Spark < 4.1 and for
non-variant queries).
+ */
+ def buildVariantProjector(sparkDataSchema: StructType,
+ sparkRequiredSchema: StructType):
Option[InternalRow => InternalRow] = None
+
/**
* Generates a shredded Variant schema and marks it with write shredding
metadata.
*
diff --git
a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
index 6c2ec5b725cd..973454d1b70c 100644
---
a/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
+++
b/hudi-client/hudi-spark-client/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestHoodieParquetReadSupport.scala
@@ -113,38 +113,4 @@ class TestHoodieParquetReadSupport {
.named("required")
Assertions.assertEquals(expectedSchema, trimmedSchema)
}
-
- /**
- * Validate that reorderVariantFields does not treat groups as variant when
the value/metadata
- * fields fail the type checks in isVariantGroup. Each sub-group exercises a
different false
- * branch of the short-circuit && chain (lines 116-119).
- */
- @Test
- def testReorderVariantFields_nonVariantGroupsUnchanged(): Unit = {
- val schema = Types.buildMessage()
- // value is non-primitive → line 116 false
- .addField(Types.requiredGroup()
-
.addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("value"))
- .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata"))
- .named("g1"))
- // value is primitive, metadata is non-primitive → line 117 false
- .addField(Types.requiredGroup()
- .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
-
.addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("metadata"))
- .named("g2"))
- // both primitive but non-BINARY → line 118 false
- .addField(Types.requiredGroup()
- .addField(Types.required(PrimitiveTypeName.INT32).named("value"))
- .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
- .named("g3"))
- // value is BINARY, metadata is non-BINARY primitive → line 119 false
- .addField(Types.requiredGroup()
- .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
- .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
- .named("g4"))
- .named("test")
-
- val result = HoodieParquetReadSupport.reorderVariantFields(schema)
- Assertions.assertEquals(schema, result)
- }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 393dc8e789f6..953014802cf2 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -56,6 +56,7 @@ import org.apache.hudi.storage.StoragePathInfo;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.function.Function;
import static
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
import static
org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
@@ -383,6 +384,15 @@ public abstract class HoodieReaderContext<T> {
HoodieSchema
dataRequiredSchema,
List<Pair<String,
Object>> requiredPartitionFieldAndValues);
+ /**
+ * Optional per-row transformer applied to log-block records before they
reach the merger.
+ * Engines override this to align records with a projected read schema (e.g.
Spark 4.1's
+ * PushVariantIntoScan). Default is no projection.
+ */
+ public Option<Function<T, T>> getLogBlockRecordProjection(HoodieSchema
dataBlockSchema) {
+ return Option.empty();
+ }
+
public Option<Pair<String, String>> getPayloadClasses(TypedProperties props)
{
return getRecordMerger().map(merger -> {
if
(merger.getMergingStrategy().equals(PAYLOAD_BASED_MERGE_STRATEGY_UUID)) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
index a0eef45b137b..702ecf5041e0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/FileGroupRecordBuffer.java
@@ -199,9 +199,8 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
} else {
blockRecordsIterator =
dataBlock.getEngineRecordIterator(readerContext);
}
- Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema =
getSchemaTransformerWithEvolvedSchema(dataBlock);
- return Pair.of(new CloseableMappingIterator<>(
- blockRecordsIterator, schemaTransformerWithEvolvedSchema.getLeft()),
schemaTransformerWithEvolvedSchema.getRight());
+ Pair<Function<T, T>, HoodieSchema> projectedTransformer =
getProjectedTransformer(dataBlock);
+ return Pair.of(new CloseableMappingIterator<>(blockRecordsIterator,
projectedTransformer.getLeft()), projectedTransformer.getRight());
} catch (IOException e) {
throw new HoodieIOException("Failed to deser records from log files ",
e);
}
@@ -281,6 +280,28 @@ abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordBuffer<T
return Pair.of(transformer, evolvedSchema);
}
+ /**
+ * Composes schema evolution then the engine's optional log-block record
projection
+ * (currently only Spark 4.1's PushVariantIntoScan). Returns the evolved
data-block schema
+ * — the projector preserves field shape, only rewriting variant fields, so
merger
+ * metadata cols (read by ordinal) stay intact.
+ *
+ * <p>Skipped when a custom payload class is configured: {@code
PayloadUpdateProcessor}
+ * round-trips through {@code convertToAvroRecord} against a schema that
still types
+ * variant fields as {@code VariantType}, which would mis-decode rewritten
rows.
+ */
+ protected Pair<Function<T, T>, HoodieSchema>
getProjectedTransformer(HoodieDataBlock dataBlock) {
+ Pair<Function<T, T>, HoodieSchema> evolved =
getSchemaTransformerWithEvolvedSchema(dataBlock);
+ if (payloadClasses.isPresent()) {
+ return evolved;
+ }
+ Option<Function<T, T>> logProjOpt =
readerContext.getLogBlockRecordProjection(evolved.getRight());
+ if (!logProjOpt.isPresent()) {
+ return evolved;
+ }
+ return Pair.of(evolved.getLeft().andThen(logProjOpt.get()),
evolved.getRight());
+ }
+
private static class LogRecordIterator<T> implements
ClosableIterator<BufferedRecord<T>> {
private final FileGroupRecordBuffer<T> fileGroupRecordBuffer;
private final Iterator<BufferedRecord<T>> logRecordIterator;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
index 18f9acc43511..73dd5dda5e87 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/buffer/PositionBasedFileGroupRecordBuffer.java
@@ -126,9 +126,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
partialUpdateModeOpt);
}
- Pair<Function<T, T>, HoodieSchema> schemaTransformerWithEvolvedSchema =
getSchemaTransformerWithEvolvedSchema(dataBlock);
+ Pair<Function<T, T>, HoodieSchema> projectedTransformer =
getProjectedTransformer(dataBlock);
- HoodieSchema schema =
HoodieSchemaCache.intern(schemaTransformerWithEvolvedSchema.getRight());
+ HoodieSchema schema =
HoodieSchemaCache.intern(projectedTransformer.getRight());
// TODO: Return an iterator that can generate sequence number with the
record.
// Then we can hide this logic into data block.
@@ -144,9 +144,9 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
}
long recordPosition = recordPositions.get(recordIndex++);
- T evolvedNextRecord =
schemaTransformerWithEvolvedSchema.getLeft().apply(nextRecord);
- boolean isDelete =
readerContext.getRecordContext().isDeleteRecord(evolvedNextRecord,
deleteContext);
- BufferedRecord<T> bufferedRecord =
BufferedRecords.fromEngineRecord(evolvedNextRecord, schema,
readerContext.getRecordContext(), orderingFieldNames, isDelete);
+ T projectedNextRecord =
projectedTransformer.getLeft().apply(nextRecord);
+ boolean isDelete =
readerContext.getRecordContext().isDeleteRecord(projectedNextRecord,
deleteContext);
+ BufferedRecord<T> bufferedRecord =
BufferedRecords.fromEngineRecord(projectedNextRecord, schema,
readerContext.getRecordContext(), orderingFieldNames, isDelete);
processNextDataRecord(bufferedRecord, recordPosition);
}
}
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
index 8895186fd055..c28a9a83c278 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/parquet/avro/AvroSchemaConverterWithTimestampNTZ.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.ConversionPatterns;
@@ -77,6 +78,7 @@ import static
org.apache.parquet.schema.Type.Repetition.REPEATED;
* This was taken from parquet-java 1.13.1 AvroSchemaConverter and modified
* to support local timestamp types by copying a few methods from 1.14.0
AvroSchemaConverter.
*/
+@Slf4j
@SuppressWarnings("all")
public class AvroSchemaConverterWithTimestampNTZ extends
HoodieAvroParquetSchemaConverter {
@@ -478,7 +480,14 @@ public class AvroSchemaConverterWithTimestampNTZ extends
HoodieAvroParquetSchema
public java.util.Optional<HoodieSchema>
visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return
java.util.Optional.of(HoodieSchema.create(HoodieSchemaType.STRING));
}
- }).orElseThrow(() -> new UnsupportedOperationException("Cannot convert
Parquet type " + parquetType));
+ }).orElseGet(() -> {
+ // Unrecognized annotation (e.g., parquet 1.16.0+
VariantLogicalTypeAnnotation, not
+ // available on the parquet version we compile against). Fall back
to record
+ // conversion, correct for the variant binary group (`metadata`,
`value`) we write.
+ log.debug("Unrecognized parquet LogicalTypeAnnotation '{}' on group
'{}', falling back to record conversion",
+ logicalTypeAnnotation, parquetGroupType.getName());
+ return convertFields(parquetGroupType.getName(),
parquetGroupType.getFields(), names);
+ });
} else {
// if no original type then it's a record
return convertFields(parquetGroupType.getName(),
parquetGroupType.getFields(), names);
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 3da22ff8ebe7..de4ffb400d4c 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -151,6 +151,21 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
supportVectorizedRead = false
supportReturningBatch = false
false
+ } else if (schema.fields.exists(f => f.dataType.isInstanceOf[StructType]
+ &&
sparkAdapter.isVariantProjectionStruct(f.dataType.asInstanceOf[StructType]))) {
+ // Spark 4.1's PushVariantIntoScan rewrites a variant column to a struct
of pushed-down
+ // extractions. The Spark vectorized parquet reader treats this as a
nested type change
+ // (data column is VariantType, required is a struct) and refuses to
read in vectorized
+ // mode (ParquetSchemaEvolutionUtils throws). Force row-based reading on
this path.
+ supportVectorizedRead = false
+ supportReturningBatch = false
+ false
+ } else if (HoodieSparkUtils.gteqSpark4_1 && schema.fields.exists(f =>
sparkAdapter.isVariantType(f.dataType))) {
+ // #18605: Spark 4.1's vectorized variant read produces UnsafeRow
encodings that SIGBUS
+ // during RangePartitioner sampling. Force row-based reads. Spark 4.0
unaffected.
+ supportVectorizedRead = false
+ supportReturningBatch = false
+ false
} else {
val conf = sparkSession.sessionState.conf
val parquetBatchSupported =
ParquetUtils.isBatchReadSupportedForSchema(conf, schema) &&
supportBatchWithTableSchema
@@ -173,7 +188,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
}
supportVectorizedRead = !isIncremental && !isBootstrap && supportBatch
supportReturningBatch = !isMOR && supportVectorizedRead
- logInfo(s"supportReturningBatch: $supportReturningBatch,
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, "
+
+ logDebug(s"supportReturningBatch: $supportReturningBatch,
supportVectorizedRead: $supportVectorizedRead, isIncremental: $isIncremental, "
+
s"isBootstrap: $isBootstrap, superSupportBatch: $supportBatch")
supportReturningBatch
}
@@ -222,7 +237,7 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
val superSplitable = super.isSplitable(sparkSession, options, path)
val isLance = hoodieFileFormat == HoodieFileFormat.LANCE
val splitable = !isMOR && !isIncremental && !isBootstrap && !isLance &&
superSplitable
- logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable,
isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
+ logDebug(s"isSplitable: $splitable, super.isSplitable: $superSplitable,
isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
splitable
}
@@ -286,7 +301,9 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath:
String,
.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file))
fileSliceMapping.getSlice(fileGroupName) match {
case Some(fileSlice) if !isCount && (requiredSchema.nonEmpty ||
fileSlice.getLogFiles.findAny().isPresent) =>
- val readerContext = new
SparkFileFormatInternalRowReaderContext(fileGroupBaseFileReader.value, filters,
requiredFilters, storageConf, metaClient.getTableConfig)
+ val readerContext = new SparkFileFormatInternalRowReaderContext(
+ fileGroupBaseFileReader.value, filters, requiredFilters,
storageConf, metaClient.getTableConfig,
+ sparkRequiredSchema = Some(requiredSchema))
readerContext.setEnableLogicalTimestampFieldRepair(storageConf.getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR,
true))
val props = metaClient.getTableConfig.getProps
options.foreach(kv => props.setProperty(kv._1, kv._2))
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
index 56bb3270b5a1..e364772e0596 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/schema/TestVariantDataType.scala
@@ -33,20 +33,10 @@ import
org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
import org.apache.spark.sql.hudi.command.CreateHoodieTableCommand
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
import org.apache.spark.sql.types.{ArrayType, BinaryType, DataType, LongType,
MapType, MetadataBuilder, StringType, StructField, StructType}
-import org.scalatest.{Canceled, Outcome}
class TestVariantDataType extends HoodieSparkSqlTestBase {
- // TODO(#18605): Re-enable after fixing JVM SIGSEGV crash on Spark 4.1
- override def withFixture(test: NoArgTest): Outcome = {
- if (HoodieSparkUtils.gteqSpark4_1) {
- Canceled("Disabled on Spark 4.1 due to JVM SIGSEGV crash in variant data
type tests")
- } else {
- super.withFixture(test)
- }
- }
-
test(s"Test Table with Variant Data Type") {
// Variant type is only supported in Spark 4.0+
assume(HoodieSparkUtils.gteqSpark4_0, "Variant type requires Spark 4.0 or
higher")
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index cca901a7d362..ea6e96943a69 100644
---
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -24,7 +24,7 @@ import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.spark.internal.ReflectUtil
import org.apache.hudi.storage.StorageConfiguration
-import org.apache.parquet.schema.{MessageType, PrimitiveType, Type, Types}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type,
Types}
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
@@ -198,6 +198,14 @@ abstract class BaseSpark4Adapter extends SparkAdapter with
Logging {
(requiredType, fileType) match {
case (_: VariantType, s: StructType) if isVariantPhysicalSchema(s) =>
Some(true)
case (s: StructType, _: VariantType) if isVariantPhysicalSchema(s) =>
Some(true)
+ // Spark 4.1's PushVariantIntoScan rewrites a `v: VariantType` column
into a
+ // pushed-down projection struct (each child carries `VariantMetadata`).
When the file
+ // stores `v` as a real Variant, the projection struct is NOT a type
change — parquet-mr
+ // reads the variant natively and projects per-row using the field
metadata. Treat the
+ // pair as compatible so Hudi's schema-change machinery doesn't rewrite
the requested
+ // schema back to `VariantType` (which would lose the projection
metadata).
+ case (s: StructType, _: VariantType) if isVariantProjectionStruct(s) =>
Some(true)
+ case (_: VariantType, s: StructType) if isVariantProjectionStruct(s) =>
Some(true)
case _ => None // Not a VariantType comparison, use default logic
}
}
@@ -245,13 +253,16 @@ abstract class BaseSpark4Adapter extends SparkAdapter
with Logging {
// VariantType is always stored in Parquet as a struct with separate value
and metadata binary fields.
// This matches how the HoodieRowParquetWriteSupport writes variant data.
// Note: We intentionally omit 'typed_value' for shredded variants as this
writer only accesses raw binary blobs.
- // TODO: use `.as(LogicalTypeAnnotation.variantType())` after parquet-java
version is bumped to 1.16.0
- Types.buildGroup(repetition)
+ // The variant LogicalTypeAnnotation is applied via applyVariantLogicalType: on Spark 4.0 (parquet 1.15.2)
+ // the hook is a no-op since the annotation only exists in parquet 1.16.0+; Spark 4.1 overrides it to apply it.
+ val builder = Types.buildGroup(repetition)
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
Repetition.REQUIRED).named(HoodieSchema.Variant.VARIANT_METADATA_FIELD))
.addField(Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY,
valueRepetition).named(HoodieSchema.Variant.VARIANT_VALUE_FIELD))
- .named(fieldName)
+ applyVariantLogicalType(builder).named(fieldName)
}
+ /**
+ * Hook for attaching a Parquet LogicalTypeAnnotation to the variant group builder before it
+ * is named. This base implementation returns the builder unchanged.
+ */
+ protected def applyVariantLogicalType(builder:
Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = builder
+
override def isVariantShreddingStruct(structType: StructType): Boolean = {
SparkShreddingUtils.isVariantShreddingStruct(structType)
}
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index a4eca808af79..7a3d8bec2403 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -22,8 +22,10 @@ import org.apache.hudi.client.model.{HoodieInternalRow,
Spark40HoodieInternalRow
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.MessageType
import org.apache.spark.SparkEnv
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
@@ -37,11 +39,12 @@ import
org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.Origin
import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY,
RebaseDateTime}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.lance.SparkLanceReaderBase
import org.apache.spark.sql.execution.datasources.orc.Spark40OrcReader
-import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat,
Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader}
+import
org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetReadSupport,
ParquetFileFormat, Spark40HoodieParquetReadSupport,
Spark40LegacyHoodieParquetFileFormat, Spark40ParquetReader}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.hudi.HoodieMemoryStream
@@ -195,6 +198,17 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
Spark40ParquetReader.build(vectorized, sqlConf, options, hadoopConf)
}
+ /**
+ * Builds the Spark 4.0-specific parquet read support, a Spark40HoodieParquetReadSupport.
+ *
+ * The five parameters are forwarded unchanged. The INT96 rebase spec (the constructor's
+ * fifth argument) is not supplied by the caller; it is resolved via getRebaseSpec("LEGACY").
+ * NOTE(review): confirm that a fixed LEGACY INT96 rebase mode is intended on this path.
+ */
+ override def createParquetReadSupport(convertTz: Option[java.time.ZoneId],
+ enableVectorizedReader: Boolean,
+ enableTimestampFieldRepair: Boolean,
+ datetimeRebaseSpec: RebaseSpec,
+ tableSchemaOpt: HOption[MessageType])
+ : HoodieParquetReadSupport = {
+ new Spark40HoodieParquetReadSupport(
+ convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+ datetimeRebaseSpec, getRebaseSpec("LEGACY"), tableSchemaOpt)
+ }
+
/**
* TODO
*
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..02a39037822b
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40HoodieParquetReadSupport.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.hudi.common.util.{Option => HOption}
+
+import org.apache.parquet.hadoop.api.InitContext
+import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type,
Types}
+import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
+import org.apache.spark.sql.types.{StructType, VariantType}
+
+import java.time.ZoneId
+
+import scala.collection.JavaConverters._
+
+// TODO: Delete this file when the hudi-spark4.0.x module is removed. Spark
4.1+ reads
+// variant fields by name via SPARK-54410, so the reorder workaround below is
no longer
+// needed there. Spark 4.0.x's ParquetUnshreddedVariantConverter builds its
converters
+// array in hardcoded [value, metadata] order, then indexes by schema
position. If the
+// Parquet schema has [metadata, value] order (per spec), the positional
mismatch causes
+// MALFORMED_VARIANT. Workaround: reorder variant group fields to [value,
metadata] in
+// the requested schema. parquet-mr reconciles requested vs file schema by
field name,
+// so bytes flow correctly. Tracked in issue #18334.
+class Spark40HoodieParquetReadSupport(
+    convertTz: Option[ZoneId],
+    enableVectorizedReader: Boolean,
+    enableTimestampFieldRepair: Boolean,
+    datetimeRebaseSpec: RebaseSpec,
+    int96RebaseSpec: RebaseSpec,
+    tableSchemaOpt: HOption[MessageType] = HOption.empty())
+  extends HoodieParquetReadSupport(
+    convertTz, enableVectorizedReader, enableTimestampFieldRepair,
+    datetimeRebaseSpec, int96RebaseSpec, tableSchemaOpt) {
+
+  /**
+   * Delegates to the parent `init`, then swaps the requested schema for one whose variant
+   * groups are ordered [value, metadata]. The read-support metadata of the parent context
+   * is carried over untouched.
+   */
+  override def init(context: InitContext): ReadContext = {
+    val parentContext = super.init(context)
+    val variantOrdered = Spark40HoodieParquetReadSupport.reorderVariantFields(
+      parentContext.getRequestedSchema, catalystRequestedSchema(context))
+    new ReadContext(variantOrdered, parentContext.getReadSupportMetadata)
+  }
+
+  /**
+   * Reads the Spark catalyst requested schema from the configuration, if one is set. It gates
+   * the reorder on VariantType, so that a user struct which merely looks like
+   * <value: binary, metadata: binary> is not silently reshuffled.
+   */
+  private def catalystRequestedSchema(context: InitContext): Option[StructType] = {
+    val serialized = context.getConfiguration.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
+    Option(serialized).map(StructType.fromString)
+  }
+}
+
+object Spark40HoodieParquetReadSupport {
+  /**
+   * Reorders variant group fields in the requested schema so that "value" precedes "metadata".
+   * This works around Spark 4.0.x's ParquetUnshreddedVariantConverter, which builds its
+   * converters array in hardcoded [value, metadata] order and indexes by schema position.
+   * parquet-mr reconciles the requested schema against the file schema by field name,
+   * so the correct bytes still flow to the correct converters regardless of file order.
+   *
+   * When a Spark catalyst schema is supplied, only the top-level fields that are typed
+   * `VariantType` in catalyst are processed; this prevents reshuffling a user-defined
+   * `struct<value: binary, metadata: binary>` that happens to match the parquet shape.
+   * Without a catalyst schema every top-level field is processed, including nested groups.
+   *
+   * NOTE(review): with a catalyst schema the gate looks at the top-level type only, so a
+   * variant nested inside a struct/array/map column is left in file order - confirm whether
+   * nested variants can reach this path on Spark 4.0.
+   *
+   * @param schema      requested parquet schema
+   * @param sparkSchema catalyst requested schema, if known
+   * @return a message type with the same name and the (possibly rebuilt) top-level fields
+   */
+  def reorderVariantFields(schema: MessageType, sparkSchema: Option[StructType] = None): MessageType = {
+    // None = no catalyst schema available, i.e. no gating; Some(names) = only these fields qualify.
+    val variantFieldNames: Option[Set[String]] = sparkSchema.map { catalystSchema =>
+      catalystSchema.fields.collect { case f if f.dataType.isInstanceOf[VariantType] => f.name }.toSet
+    }
+    val reordered = schema.getFields.asScala.map { field =>
+      // Option.forall is true for None, so an absent catalyst schema lets every field through.
+      if (variantFieldNames.forall(_.contains(field.getName))) reorderVariantType(field) else field
+    }.toArray[Type]
+    Types.buildMessage().addFields(reordered: _*).named(schema.getName)
+  }
+
+  /**
+   * Rebuilds a variant-shaped group as [value, metadata]; any other group is rebuilt from its
+   * recursively processed children; primitives are returned as-is.
+   *
+   * NOTE(review): a variant-shaped group is rebuilt from its `value` and `metadata` fields only,
+   * so any additional field in such a group is dropped from the requested schema - confirm that
+   * groups with extra fields (e.g. a shredded `typed_value`) never reach this method.
+   */
+  private def reorderVariantType(t: Type): Type = {
+    t match {
+      case group: GroupType if isVariantGroup(group) =>
+        // Rebuild with [value, metadata] order for Spark compatibility
+        val valueField = group.getType("value")
+        val metadataField = group.getType("metadata")
+        group.withNewFields(java.util.Arrays.asList(valueField, metadataField))
+      case group: GroupType =>
+        // Recurse into nested groups
+        val children = group.getFields.asScala.map(reorderVariantType).asJava
+        group.withNewFields(children)
+      case _ => t
+    }
+  }
+
+  /** A group is variant-shaped when both `value` and `metadata` exist as BINARY primitives. */
+  private def isVariantGroup(group: GroupType): Boolean = {
+    isBinaryPrimitive(group, "value") && isBinaryPrimitive(group, "metadata")
+  }
+
+  /** True when `group` has a field `fieldName` that is a primitive of physical type BINARY. */
+  private def isBinaryPrimitive(group: GroupType, fieldName: String): Boolean = {
+    group.containsField(fieldName) && {
+      val field = group.getType(fieldName)
+      field.isPrimitive &&
+        field.asPrimitiveType().getPrimitiveTypeName == PrimitiveType.PrimitiveTypeName.BINARY
+    }
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
index 64ff9afdfd67..a4c006c7a7ac 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40LegacyHoodieParquetFileFormat.scala
@@ -329,7 +329,9 @@ class Spark40LegacyHoodieParquetFileFormat(private val
shouldAppendPartitionValu
}
} else {
logDebug(s"Falling back to parquet-mr")
- val readSupport = new HoodieParquetReadSupport(
+ // Spark40 subclass reorders variant group fields to [value, metadata]
for Spark 4.0's
+ // positional variant converter (#18334); base class no longer applies
the reorder.
+ val readSupport = new Spark40HoodieParquetReadSupport(
convertTz,
enableVectorizedReader = false,
enableTimestampFieldRepair = true,
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
index 1516fe870057..1c70642659db 100644
---
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark40ParquetReader.scala
@@ -206,7 +206,7 @@ class Spark40ParquetReader(enableVectorizedReader: Boolean,
}
} else {
// ParquetRecordReader returns InternalRow
- val readSupport = new HoodieParquetReadSupport(
+ val readSupport = new Spark40HoodieParquetReadSupport(
convertTz,
enableVectorizedReader = false,
enableLogicalTimestampRepair,
diff --git
a/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala
b/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala
new file mode 100644
index 000000000000..9d3c1fa1a109
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark4.0.x/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/TestSpark40HoodieParquetReadSupport.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
+import org.apache.parquet.schema.Types
+import org.junit.jupiter.api.{Assertions, Test}
+
+class TestSpark40HoodieParquetReadSupport {
+
+  /**
+   * Validate that reorderVariantFields does not treat groups as variant when the value/metadata
+   * fields fail the type checks in isVariantGroup. Each sub-group fails a different check.
+   */
+  @Test
+  def testReorderVariantFieldsNonVariantGroupsUnchanged(): Unit = {
+    val schema = Types.buildMessage()
+      // value is non-primitive
+      .addField(Types.requiredGroup()
+        .addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("value"))
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata"))
+        .named("g1"))
+      // value is primitive, metadata is non-primitive
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
+        .addField(Types.requiredGroup().addField(Types.required(PrimitiveTypeName.INT32).named("x")).named("metadata"))
+        .named("g2"))
+      // both primitive but non-BINARY
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.INT32).named("value"))
+        .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
+        .named("g3"))
+      // value is BINARY, metadata is non-BINARY primitive
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
+        .addField(Types.required(PrimitiveTypeName.INT32).named("metadata"))
+        .named("g4"))
+      .named("test")
+
+    val result = Spark40HoodieParquetReadSupport.reorderVariantFields(schema)
+    Assertions.assertEquals(schema, result)
+  }
+
+  /**
+   * Validate the positive path: a group holding BINARY `metadata` and `value` fields in
+   * [metadata, value] order comes back as [value, metadata], while a sibling primitive column
+   * is returned unchanged. No catalyst schema is passed, so every top-level field is eligible.
+   */
+  @Test
+  def testReorderVariantFieldsReordersVariantGroup(): Unit = {
+    val schema = Types.buildMessage()
+      .addField(Types.required(PrimitiveTypeName.INT32).named("id"))
+      .addField(Types.requiredGroup()
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("metadata"))
+        .addField(Types.required(PrimitiveTypeName.BINARY).named("value"))
+        .named("v"))
+      .named("test")
+
+    val result = Spark40HoodieParquetReadSupport.reorderVariantFields(schema)
+
+    Assertions.assertEquals(schema.getType("id"), result.getType("id"))
+    val variantGroup = result.getType("v").asGroupType()
+    Assertions.assertEquals(2, variantGroup.getFieldCount)
+    Assertions.assertEquals("value", variantGroup.getFieldName(0))
+    Assertions.assertEquals("metadata", variantGroup.getFieldName(1))
+  }
+}
diff --git
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
index b910e79747da..195979548bc4 100644
---
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, Types}
import org.apache.spark.SparkEnv
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql._
@@ -31,11 +32,13 @@ import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases,
ResolvedTable}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
Expression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
BoundReference, CreateNamedStruct, Expression, Literal, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.variant.VariantGet
import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{METADATA_COL_ATTR_KEY,
RebaseDateTime}
import org.apache.spark.sql.connector.catalog.{V1Table, V2TableWithV1Fallback}
import org.apache.spark.sql.execution.datasources._
@@ -49,7 +52,7 @@ import org.apache.spark.sql.hudi.analysis.TableValuedFunctions
import org.apache.spark.sql.hudi.blob.{BatchedBlobReaderStrategy,
ScalarFunctions}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.parser.{HoodieExtendedParserInterface,
HoodieSpark4_1ExtendedSqlParser}
-import org.apache.spark.sql.types.{DataType, DataTypes, Metadata,
MetadataBuilder, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, Metadata,
MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatchRow
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel._
@@ -250,6 +253,58 @@ class Spark4_1Adapter extends BaseSpark4Adapter {
RebaseDateTime.RebaseSpec(LegacyBehaviorPolicy.withName(policy))
}
+ /**
+ * Returns true when `structType` is a variant projection struct, i.e. when
+ * VariantMetadata.isVariantStruct accepts it.
+ */
+ override def isVariantProjectionStruct(structType: StructType): Boolean = {
+ VariantMetadata.isVariantStruct(structType)
+ }
+
+ /**
+ * Builds a row-to-row function that turns rows laid out as `sparkDataSchema` into rows laid
+ * out as `sparkRequiredSchema`, evaluating variant projection structs along the way.
+ *
+ * For every required field that is a variant projection struct, the same-named data field
+ * must be a variant; each child of the struct becomes a VariantGet on that variant, using the
+ * path, failOnError flag and time zone from the child's VariantMetadata. Every other required
+ * field is read straight from the same-named data field.
+ *
+ * NOTE(review): the returned function wraps a single UnsafeProjection instance; such
+ * projections presumably reuse their output row and are not thread-safe - confirm that
+ * callers copy rows they retain and do not share the function across threads.
+ *
+ * @param sparkDataSchema schema of the rows handed to the returned function
+ * @param sparkRequiredSchema schema of the rows the returned function produces
+ * @return None when no required field is a variant projection struct, otherwise the projector
+ * @throws IllegalStateException when a required field is absent from `sparkDataSchema`
+ * @throws IllegalArgumentException when a projection struct's data field is not a variant
+ */
+ override def buildVariantProjector(sparkDataSchema: StructType,
+ sparkRequiredSchema: StructType):
Option[InternalRow => InternalRow] = {
+ // Quick check: any required field a variant projection struct?
+ if (!sparkRequiredSchema.fields.exists(f =>
VariantMetadata.isVariantStruct(f.dataType))) {
+ None
+ } else {
+ // Surface mismatched schemas with both field lists rather than Spark's bare
+ // IllegalArgumentException from fieldIndex.
+ def lookupDataField(name: String): (Int, StructField) = {
+ val idx = sparkDataSchema.getFieldIndex(name).getOrElse(
+ throw new IllegalStateException(
+ s"Required field '$name' is absent from sparkDataSchema; " +
+ s"required=${sparkRequiredSchema.fieldNames.mkString("[", ",",
"]")}, " +
+ s"data=${sparkDataSchema.fieldNames.mkString("[", ",", "]")}"))
+ (idx, sparkDataSchema.fields(idx))
+ }
+ // One output expression per required field, in required-schema order.
+ val exprs: Array[Expression] = sparkRequiredSchema.fields.map { rf =>
+ rf.dataType match {
+ case projectedStruct: StructType if
VariantMetadata.isVariantStruct(projectedStruct) =>
+ val (dataIdx, dataField) = lookupDataField(rf.name)
+ require(isVariantType(dataField.dataType),
+ s"Expected VariantType for field '${rf.name}' in data schema,
got ${dataField.dataType}")
+ val variantRef: Expression = BoundReference(dataIdx,
dataField.dataType, dataField.nullable)
+ // CreateNamedStruct takes alternating (name literal, value expression) children.
+ val childExprs: Seq[Expression] =
projectedStruct.fields.toSeq.flatMap { child =>
+ val vm = VariantMetadata.fromMetadata(child.metadata)
+ val pathLit = Literal(UTF8String.fromString(vm.path),
DataTypes.StringType)
+ val tz: Option[String] = Option(vm.timeZoneId)
+ val variantGet: Expression = VariantGet(variantRef, pathLit,
child.dataType, vm.failOnError, tz)
+ Seq(Literal(UTF8String.fromString(child.name),
DataTypes.StringType), variantGet)
+ }
+ CreateNamedStruct(childExprs)
+ case _ =>
+ // Not a projection struct: pass the same-named data column through.
+ val (dataIdx, dataField) = lookupDataField(rf.name)
+ BoundReference(dataIdx, dataField.dataType, dataField.nullable)
+ }
+ }
+
+ val projection = UnsafeProjection.create(exprs.toIndexedSeq,
DataTypeUtils.toAttributes(sparkDataSchema))
+ Some(row => projection(row))
+ }
+ }
+
+ // Overrides the base adapter's no-op hook: tags the variant group with
+ // LogicalTypeAnnotation.variantType((byte) 1), matching the convention of
+ // SparkToParquetSchemaConverter on parquet 1.16+.
+ override protected def applyVariantLogicalType(builder:
Types.GroupBuilder[GroupType]): Types.GroupBuilder[GroupType] = {
+ builder.as(LogicalTypeAnnotation.variantType(1.toByte))
+ }
+
override def createMemoryStream[T: Encoder](id: Int, sparkSession:
SparkSession): HoodieMemoryStream[T] = {
// In Spark 4.1, MemoryStream is in
org.apache.spark.sql.execution.streaming.runtime package
// and takes SparkSession directly instead of SQLContext