This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new ea59d79d7467 fix: Fix incremental query with full scan mode on MOR tables on Databricks Runtime (#18258)
ea59d79d7467 is described below
commit ea59d79d7467e5a13d2605ee05c4f39ea5288f26
Author: Y Ethan Guo <[email protected]>
AuthorDate: Fri Feb 27 12:05:31 2026 -0800
fix: Fix incremental query with full scan mode on MOR tables on Databricks Runtime (#18258)
---
.../HoodieSpark35PartitionedFileUtils.scala | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieSpark35PartitionedFileUtils.scala b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieSpark35PartitionedFileUtils.scala
index 2c8babe82417..abdfcec88edc 100644
--- a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieSpark35PartitionedFileUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieSpark35PartitionedFileUtils.scala
@@ -19,6 +19,7 @@
package org.apache.spark.sql.execution.datasources
+import org.apache.hudi.common.util.ReflectionUtils
import org.apache.hudi.storage.StoragePath
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.paths.SparkPath
@@ -40,11 +41,24 @@ object HoodieSpark35PartitionedFileUtils extends HoodieSparkPartitionedFileUtils
filePath: StoragePath,
start: Long,
length: Long): PartitionedFile = {
- PartitionedFile(partitionValues, SparkPath.fromUri(filePath.toUri), start, length)
+ PartitionedFile(partitionValues, SparkPath.fromUri(filePath.toUri), start, length, Array.empty)
}
override def toFileStatuses(partitionDirs: Seq[PartitionDirectory]): Seq[FileStatus] = {
- partitionDirs.flatMap(_.files).map(_.fileStatus)
+ val files: Seq[FileStatusWithMetadata] = partitionDirs.flatMap(_.files)
+ try {
+ files.map(_.fileStatus)
+ } catch {
+ case _: NoSuchMethodException | _: NoSuchMethodError | _: IllegalArgumentException =>
+ val methodOpt = ReflectionUtils.getMethod(classOf[FileStatusWithMetadata], "toFileStatus")
+ if (methodOpt.isPresent) {
+ val method = methodOpt.get()
+ files.map(f => method.invoke(f).asInstanceOf[FileStatus])
+ } else {
+ throw new RuntimeException(
+ "Cannot find toFileStatus method on FileStatusWithMetadata in custom Spark Runtime")
+ }
+ }
}
override def newPartitionDirectory(internalRow: InternalRow, statuses: Seq[FileStatus]): PartitionDirectory = {