This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 969d5079c9 [core] Support cross-partition for fallback branch feature 
(#5198)
969d5079c9 is described below

commit 969d5079c941fafd8be57c9822132a1784f05fb3
Author: Yubin Li <[email protected]>
AuthorDate: Mon Mar 3 17:47:12 2025 +0800

    [core] Support cross-partition for fallback branch feature (#5198)
---
 .../paimon/crosspartition/IndexBootstrap.java       |  4 ++--
 .../org/apache/paimon/flink/BranchSqlITCase.java    | 21 +++++++++++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index cc64d9549f..ec3ab25ebd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -27,8 +27,8 @@ import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.AbstractDataTableScan;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
@@ -84,7 +84,7 @@ public class IndexBootstrap implements Serializable {
                         .newReadBuilder()
                         .withProjection(keyProjection);
 
-        AbstractDataTableScan tableScan = (AbstractDataTableScan) 
readBuilder.newScan();
+        DataTableScan tableScan = (DataTableScan) readBuilder.newScan();
         List<Split> splits =
                 tableScan
                         .withBucketFilter(bucket -> bucket % numAssigners == 
assignId)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 18dc023771..7369ee4b2a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -307,6 +307,27 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", 
"+I[wolf, 20]");
     }
 
+    @Test
+    public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
+        sql(
+                "CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name 
STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )");
+        sql(
+                "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', 
'20250227'), (2, 'Sam', '20250228')");
+        sql("CALL sys.create_branch('default.t', 'stream')");
+        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )");
+
+        sql(
+                "INSERT INTO `t$branch_stream` VALUES (1, 'John Stream', 
'20250228'), (3, 'Rick Stream', '20250301')");
+        assertThat(collectResult("SELECT pk, name, dt FROM t order by dt"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, Jackson, 20250227]",
+                        "+I[2, Sam, 20250228]",
+                        "+I[3, Rick Stream, 20250301]");
+        assertThat(collectResult("SELECT pk, name, dt FROM `t$branch_stream` 
order by dt"))
+                .containsExactlyInAnyOrder(
+                        "+I[1, John Stream, 20250228]", "+I[3, Rick Stream, 
20250301]");
+    }
+
     @Test
     public void testDifferentRowTypes() throws Exception {
         sql(

Reply via email to