This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8664282004 [core] fix that branch can't be streamingly read if setting fallback branch (#5952)
8664282004 is described below

commit 8664282004b75a53e90904033155552faccb7063
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jul 24 15:28:27 2025 +0800

    [core] fix that branch can't be streamingly read if setting fallback branch (#5952)
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 23 ++++++++++--------
 .../org/apache/paimon/flink/BranchSqlITCase.java   | 28 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index c438b93fe6..f53cfaa985 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -464,18 +464,21 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
-            FallbackDataSplit dataSplit = (FallbackDataSplit) split;
-            if (dataSplit.isFallback) {
-                try {
-                    return fallbackRead.createReader(dataSplit);
-                } catch (Exception ignored) {
-                    LOG.error(
-                            "Reading from fallback branch has problems for files: {}",
-                            dataSplit.dataFiles().stream()
-                                    .map(DataFileMeta::fileName)
-                                    .collect(Collectors.joining(", ")));
+            if (split instanceof FallbackDataSplit) {
+                FallbackDataSplit fallbackDataSplit = (FallbackDataSplit) split;
+                if (fallbackDataSplit.isFallback) {
+                    try {
+                        return fallbackRead.createReader(fallbackDataSplit);
+                    } catch (Exception ignored) {
+                        LOG.error(
+                                "Reading from fallback branch has problems for files: {}",
+                                fallbackDataSplit.dataFiles().stream()
+                                        .map(DataFileMeta::fileName)
+                                        .collect(Collectors.joining(", ")));
+                    }
                 }
             }
+            DataSplit dataSplit = (DataSplit) split;
             return mainRead.createReader(dataSplit);
         }
     }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 5a1933d165..2160cb1a2d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -26,7 +26,9 @@ import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.AssertionsForInterfaceTypes;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -354,6 +356,32 @@ public class BranchSqlITCase extends CatalogITCaseBase {
                         "+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]", "+I[wolf, 20]");
     }
 
+    @Test
+    public void testFallbackBranchStreamRead() throws Exception {
+        sql(
+                "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+        sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+
+        sql("CALL sys.create_branch('default.t', 'pk')");
+        sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+        sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+
+        sql("INSERT INTO `t$branch_pk` VALUES (2, 20, 'cat'), (2, 30, 'dog')");
+
+        // read main branch in batch
+        assertThat(collectResult("SELECT v, k FROM t"))
+                .containsExactlyInAnyOrder(
+                        "+I[apple, 10]", "+I[banana, 20]", "+I[cat, 20]", "+I[dog, 30]");
+
+        // read main branch streamingly
+        try (BlockingIterator<Row, Row> iter = streamSqlBlockIter("SELECT * FROM t")) {
+            AssertionsForInterfaceTypes.assertThat(iter.collect(2))
+                    .containsExactlyInAnyOrder(
+                            Row.ofKind(RowKind.INSERT, 1, 10, "apple"),
+                            Row.ofKind(RowKind.INSERT, 1, 20, "banana"));
+        }
+    }
+
     @Test
     public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
         sql(

Reply via email to