This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 969d5079c9 [core] Support cross-partition for fallback branch feature
(#5198)
969d5079c9 is described below
commit 969d5079c941fafd8be57c9822132a1784f05fb3
Author: Yubin Li <[email protected]>
AuthorDate: Mon Mar 3 17:47:12 2025 +0800
[core] Support cross-partition for fallback branch feature (#5198)
---
.../paimon/crosspartition/IndexBootstrap.java | 4 ++--
.../org/apache/paimon/flink/BranchSqlITCase.java | 21 +++++++++++++++++++++
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index cc64d9549f..ec3ab25ebd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -27,8 +27,8 @@ import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.AbstractDataTableScan;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
@@ -84,7 +84,7 @@ public class IndexBootstrap implements Serializable {
.newReadBuilder()
.withProjection(keyProjection);
- AbstractDataTableScan tableScan = (AbstractDataTableScan)
readBuilder.newScan();
+ DataTableScan tableScan = (DataTableScan) readBuilder.newScan();
List<Split> splits =
tableScan
.withBucketFilter(bucket -> bucket % numAssigners ==
assignId)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 18dc023771..7369ee4b2a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -307,6 +307,27 @@ public class BranchSqlITCase extends CatalogITCaseBase {
"+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]",
"+I[wolf, 20]");
}
+ @Test
+ public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
+ sql(
+ "CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name
STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )");
+ sql(
+ "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson',
'20250227'), (2, 'Sam', '20250228')");
+ sql("CALL sys.create_branch('default.t', 'stream')");
+ sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )");
+
+ sql(
+ "INSERT INTO `t$branch_stream` VALUES (1, 'John Stream',
'20250228'), (3, 'Rick Stream', '20250301')");
+ assertThat(collectResult("SELECT pk, name, dt FROM t order by dt"))
+ .containsExactlyInAnyOrder(
+ "+I[1, Jackson, 20250227]",
+ "+I[2, Sam, 20250228]",
+ "+I[3, Rick Stream, 20250301]");
+ assertThat(collectResult("SELECT pk, name, dt FROM `t$branch_stream`
order by dt"))
+ .containsExactlyInAnyOrder(
+ "+I[1, John Stream, 20250228]", "+I[3, Rick Stream,
20250301]");
+ }
+
@Test
public void testDifferentRowTypes() throws Exception {
sql(