This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit cdac700954c4cf312c14cc16ff9d7ce3d09ccda2
Author: Rahil C <[email protected]>
AuthorDate: Sun May 3 09:47:22 2026 -0700

    fix(lance): prevent file splitting for Lance base files to avoid duplicate reads (#18678)
---
 .../HoodieFileGroupReaderBasedFileFormat.scala     |  3 ++-
 .../hudi/functional/TestLanceDataSource.scala      | 24 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
index 7e0523ec70a9..3da22ff8ebe7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala
@@ -220,7 +220,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String,
     // This will enable us to take advantage of spark's file splitting capability.
     // For overly large single files, we can use multiple concurrent tasks to read them, thereby reducing the overall job reading time consumption
     val superSplitable = super.isSplitable(sparkSession, options, path)
-    val splitable = !isMOR && !isIncremental && !isBootstrap && superSplitable
+    val isLance = hoodieFileFormat == HoodieFileFormat.LANCE
+    val splitable = !isMOR && !isIncremental && !isBootstrap && !isLance && 
superSplitable
     logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, 
isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap")
     splitable
   }
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
index 425429caf1cf..1cd5647d9949 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala
@@ -112,6 +112,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
 
     val actual = readDf.select("id", "name", "age", "score")
 
+    assertEquals(expectedDf.collect().length, actual.collect().length, "Row 
count mismatch - possible duplicates")
     assertTrue(expectedDf.except(actual).isEmpty)
     assertTrue(actual.except(expectedDf).isEmpty)
   }
@@ -1449,6 +1450,29 @@ class TestLanceDataSource extends HoodieSparkClientTestBase {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(value = classOf[HoodieTableType])
+  def testLanceReadNoDuplicateRows(tableType: HoodieTableType): Unit = {
+    val tableName = s"test_lance_no_dup_${tableType.name().toLowerCase}"
+    val tablePath = s"$basePath/$tableName"
+
+    val records = (1 to 100).map(i => (i, s"name_$i", 20 + i, i * 1.1))
+    val inputDf = createDataFrame(records)
+
+    writeDataframe(tableType, tableName, tablePath, inputDf, saveMode = 
SaveMode.Overwrite)
+
+    val readDf = spark.read.format("hudi").load(tablePath)
+    val actual = readDf.select("id", "name", "age", "score")
+    // Use collect().length instead of count() — Spark's count optimization pushes down an
+    // empty schema which SparkLanceReaderBase short-circuits to Iterator.empty (separate bug).
+    val total = actual.collect().length
+    val distinct = actual.select("id").distinct().count()
+    assertEquals(100, total, "Lance read should not produce duplicate rows")
+    assertEquals(100, distinct, "All record keys should be unique")
+    assertTrue(inputDf.except(actual).isEmpty)
+    assertTrue(actual.except(inputDf).isEmpty)
+  }
+
   private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = {
     spark.createDataFrame(records).toDF("id", "name", "age", 
"score").coalesce(1)
   }

Reply via email to