This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 563f575f42 [spark] Fix read with scan.fallback-branch (#5789)
563f575f42 is described below
commit 563f575f4278e2afbc6eabc39ed4d4bf4e3b40c3
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jun 23 18:49:48 2025 +0800
[spark] Fix read with scan.fallback-branch (#5789)
---
.../scala/org/apache/paimon/spark/ScanHelper.scala | 4 ++-
.../spark/procedure/BranchProcedureTest.scala | 35 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 0d3282b621..c0b35ec0d1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -20,10 +20,11 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{PaimonSparkSession, SparkSession}
+import org.apache.spark.sql.PaimonSparkSession
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
@@ -46,6 +47,7 @@ trait ScanHelper extends Logging {
def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
val (toReshuffle, reserved) = splits.partition {
+ case _: FallbackDataSplit => false
case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
case _ => false
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
index 3af9f60bae..735806b5a6 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
@@ -145,4 +145,39 @@ class BranchProcedureTest extends PaimonSparkTestBase with StreamTest {
}
}
}
+
+ test("Paimon Branch: read with scan.fallback-branch") {
+ withTable("T") {
+ sql("""
+ |CREATE TABLE T (
+ | dt STRING NOT NULL,
+ | name STRING NOT NULL,
+ | amount BIGINT
+ |) PARTITIONED BY (dt)
+ |""".stripMargin)
+
+ sql("ALTER TABLE T SET TBLPROPERTIES ('k1' = 'v1')")
+ sql("ALTER TABLE T SET TBLPROPERTIES ('k2' = 'v2')")
+
+ sql("CALL sys.create_branch('test.T', 'test')")
+ sql("ALTER TABLE T SET TBLPROPERTIES ('scan.fallback-branch' = 'test')")
+
+ sql(
+ "INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), ('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6)")
+ sql("INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 'banana', 7)")
+
+ checkAnswer(
+ sql("SELECT * FROM T ORDER BY amount"),
+ Seq(
+ Row("20240726", "cherry", 3),
+ Row("20240725", "apple", 5),
+ Row("20240726", "pear", 6),
+ Row("20240725", "banana", 7)))
+
+ sql("ALTER TABLE T UNSET TBLPROPERTIES ('scan.fallback-branch')")
+ checkAnswer(
+ sql("SELECT * FROM T ORDER BY amount"),
+ Seq(Row("20240725", "apple", 5), Row("20240725", "banana", 7)))
+ }
+ }
}