This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 563f575f42 [spark] Fix read with scan.fallback-branch (#5789)
563f575f42 is described below

commit 563f575f4278e2afbc6eabc39ed4d4bf4e3b40c3
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Jun 23 18:49:48 2025 +0800

    [spark] Fix read with scan.fallback-branch (#5789)
---
 .../scala/org/apache/paimon/spark/ScanHelper.scala |  4 ++-
 .../spark/procedure/BranchProcedureTest.scala      | 35 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index 0d3282b621..c0b35ec0d1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -20,10 +20,11 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
 import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{PaimonSparkSession, SparkSession}
+import org.apache.spark.sql.PaimonSparkSession
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
@@ -46,6 +47,7 @@ trait ScanHelper extends Logging {
 
   def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
     val (toReshuffle, reserved) = splits.partition {
+      case _: FallbackDataSplit => false
       case split: DataSplit => split.beforeFiles().isEmpty && 
split.rawConvertible()
       case _ => false
     }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
index 3af9f60bae..735806b5a6 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/BranchProcedureTest.scala
@@ -145,4 +145,39 @@ class BranchProcedureTest extends PaimonSparkTestBase with 
StreamTest {
       }
     }
   }
+
+  test("Paimon Branch: read with scan.fallback-branch") {
+    withTable("T") {
+      sql("""
+            |CREATE TABLE T (
+            |    dt STRING NOT NULL,
+            |    name STRING NOT NULL,
+            |    amount BIGINT
+            |) PARTITIONED BY (dt)
+            |""".stripMargin)
+
+      sql("ALTER TABLE T SET TBLPROPERTIES ('k1' = 'v1')")
+      sql("ALTER TABLE T SET TBLPROPERTIES ('k2' = 'v2')")
+
+      sql("CALL sys.create_branch('test.T', 'test')")
+      sql("ALTER TABLE T SET TBLPROPERTIES ('scan.fallback-branch' = 'test')")
+
+      sql(
+        "INSERT INTO `T$branch_test` VALUES ('20240725', 'apple', 4), 
('20240725', 'peach', 10), ('20240726', 'cherry', 3), ('20240726', 'pear', 6)")
+      sql("INSERT INTO T VALUES ('20240725', 'apple', 5), ('20240725', 
'banana', 7)")
+
+      checkAnswer(
+        sql("SELECT * FROM T ORDER BY amount"),
+        Seq(
+          Row("20240726", "cherry", 3),
+          Row("20240725", "apple", 5),
+          Row("20240726", "pear", 6),
+          Row("20240725", "banana", 7)))
+
+      sql("ALTER TABLE T UNSET TBLPROPERTIES ('scan.fallback-branch')")
+      checkAnswer(
+        sql("SELECT * FROM T ORDER BY amount"),
+        Seq(Row("20240725", "apple", 5), Row("20240725", "banana", 7)))
+    }
+  }
 }

Reply via email to