This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit cdac700954c4cf312c14cc16ff9d7ce3d09ccda2 Author: Rahil C <[email protected]> AuthorDate: Sun May 3 09:47:22 2026 -0700 fix(lance): prevent file splitting for Lance base files to avoid duplicate reads (#18678) --- .../HoodieFileGroupReaderBasedFileFormat.scala | 3 ++- .../hudi/functional/TestLanceDataSource.scala | 24 ++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala index 7e0523ec70a9..3da22ff8ebe7 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedFileFormat.scala @@ -220,7 +220,8 @@ class HoodieFileGroupReaderBasedFileFormat(tablePath: String, // This will enable us to take advantage of spark's file splitting capability. // For overly large single files, we can use multiple concurrent tasks to read them, thereby reducing the overall job reading time consumption val superSplitable = super.isSplitable(sparkSession, options, path) - val splitable = !isMOR && !isIncremental && !isBootstrap && superSplitable + val isLance = hoodieFileFormat == HoodieFileFormat.LANCE + val splitable = !isMOR && !isIncremental && !isBootstrap && !isLance && superSplitable logInfo(s"isSplitable: $splitable, super.isSplitable: $superSplitable, isMOR: $isMOR, isIncremental: $isIncremental, isBootstrap: $isBootstrap") splitable } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala index 425429caf1cf..1cd5647d9949 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestLanceDataSource.scala @@ -112,6 +112,7 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { val actual = readDf.select("id", "name", "age", "score") + assertEquals(expectedDf.collect().length, actual.collect().length, "Row count mismatch - possible duplicates") assertTrue(expectedDf.except(actual).isEmpty) assertTrue(actual.except(expectedDf).isEmpty) } @@ -1449,6 +1450,29 @@ class TestLanceDataSource extends HoodieSparkClientTestBase { } } + @ParameterizedTest + @EnumSource(value = classOf[HoodieTableType]) + def testLanceReadNoDuplicateRows(tableType: HoodieTableType): Unit = { + val tableName = s"test_lance_no_dup_${tableType.name().toLowerCase}" + val tablePath = s"$basePath/$tableName" + + val records = (1 to 100).map(i => (i, s"name_$i", 20 + i, i * 1.1)) + val inputDf = createDataFrame(records) + + writeDataframe(tableType, tableName, tablePath, inputDf, saveMode = SaveMode.Overwrite) + + val readDf = spark.read.format("hudi").load(tablePath) + val actual = readDf.select("id", "name", "age", "score") + // Use collect().length instead of count() — Spark's count optimization pushes down an + // empty schema which SparkLanceReaderBase short-circuits to Iterator.empty (separate bug). + val total = actual.collect().length + val distinct = actual.select("id").distinct().count() + assertEquals(100, total, "Lance read should not produce duplicate rows") + assertEquals(100, distinct, "All record keys should be unique") + assertTrue(inputDf.except(actual).isEmpty) + assertTrue(actual.except(inputDf).isEmpty) + } + private def createDataFrame(records: Seq[(Int, String, Int, Double)]) = { spark.createDataFrame(records).toDF("id", "name", "age", "score").coalesce(1) }
