This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8664282004 [core] fix that branch can't be streamingly read if setting fallback branch (#5952)
8664282004 is described below
commit 8664282004b75a53e90904033155552faccb7063
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jul 24 15:28:27 2025 +0800
[core] fix that branch can't be streamingly read if setting fallback branch (#5952)
---
.../paimon/table/FallbackReadFileStoreTable.java | 23 ++++++++++--------
.../org/apache/paimon/flink/BranchSqlITCase.java | 28 ++++++++++++++++++++++
2 files changed, 41 insertions(+), 10 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index c438b93fe6..f53cfaa985 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -464,18 +464,21 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
- FallbackDataSplit dataSplit = (FallbackDataSplit) split;
- if (dataSplit.isFallback) {
- try {
- return fallbackRead.createReader(dataSplit);
- } catch (Exception ignored) {
- LOG.error(
-                        "Reading from fallback branch has problems for files: {}",
- dataSplit.dataFiles().stream()
- .map(DataFileMeta::fileName)
- .collect(Collectors.joining(", ")));
+ if (split instanceof FallbackDataSplit) {
+                FallbackDataSplit fallbackDataSplit = (FallbackDataSplit) split;
+ if (fallbackDataSplit.isFallback) {
+ try {
+ return fallbackRead.createReader(fallbackDataSplit);
+ } catch (Exception ignored) {
+ LOG.error(
+                            "Reading from fallback branch has problems for files: {}",
+ fallbackDataSplit.dataFiles().stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.joining(", ")));
+ }
}
}
+ DataSplit dataSplit = (DataSplit) split;
return mainRead.createReader(dataSplit);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 5a1933d165..2160cb1a2d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -26,7 +26,9 @@ import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
+import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -354,6 +356,32 @@ public class BranchSqlITCase extends CatalogITCaseBase {
"+I[cat, 20]", "+I[dog, 30]", "+I[lion, 10]",
"+I[wolf, 20]");
}
+ @Test
+ public void testFallbackBranchStreamRead() throws Exception {
+ sql(
+                "CREATE TABLE t ( pt INT NOT NULL, k INT NOT NULL, v STRING ) PARTITIONED BY (pt) WITH ( 'bucket' = '-1' )");
+ sql("INSERT INTO t VALUES (1, 10, 'apple'), (1, 20, 'banana')");
+
+ sql("CALL sys.create_branch('default.t', 'pk')");
+        sql("ALTER TABLE `t$branch_pk` SET ( 'primary-key' = 'pt, k', 'bucket' = '2' )");
+ sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'pk' )");
+
+ sql("INSERT INTO `t$branch_pk` VALUES (2, 20, 'cat'), (2, 30, 'dog')");
+
+ // read main branch in batch
+ assertThat(collectResult("SELECT v, k FROM t"))
+ .containsExactlyInAnyOrder(
+                        "+I[apple, 10]", "+I[banana, 20]", "+I[cat, 20]", "+I[dog, 30]");
+
+ // read main branch streamingly
+ try (BlockingIterator<Row, Row> iter = streamSqlBlockIter("SELECT *
FROM t")) {
+ AssertionsForInterfaceTypes.assertThat(iter.collect(2))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.INSERT, 1, 10, "apple"),
+ Row.ofKind(RowKind.INSERT, 1, 20, "banana"));
+ }
+ }
+
@Test
public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
sql(