This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a4effef50a [core] Not throw exception for fallback reading (#5383)
a4effef50a is described below

commit a4effef50a8ed83db35014b6e453f78a986164b4
Author: Yubin Li <[email protected]>
AuthorDate: Tue Apr 1 17:18:47 2025 +0800

    [core] Not throw exception for fallback reading (#5383)
---
 .../paimon/table/FallbackReadFileStoreTable.java      | 19 ++++++++++++++++---
 .../java/org/apache/paimon/flink/BranchSqlITCase.java |  4 ++--
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index d6f073fd2c..dddc8ca439 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.options.Options;
@@ -44,6 +45,9 @@ import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.SegmentsCache;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -60,6 +64,8 @@ import java.util.stream.Collectors;
  */
 public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(FallbackReadFileStoreTable.class);
+
     private final FileStoreTable fallback;
 
     public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable 
fallback) {
@@ -397,10 +403,17 @@ public class FallbackReadFileStoreTable extends 
DelegatedFileStoreTable {
             DataSplit dataSplit = (DataSplit) split;
             if (!dataSplit.dataFiles().isEmpty()
                     && dataSplit.dataFiles().get(0).minKey().getFieldCount() > 
0) {
-                return fallbackRead.createReader(split);
-            } else {
-                return mainRead.createReader(split);
+                try {
+                    return fallbackRead.createReader(split);
+                } catch (Exception ignored) {
+                    LOG.error(
+                            "Reading from fallback branch has problems for 
files: {}",
+                            dataSplit.dataFiles().stream()
+                                    .map(DataFileMeta::fileName)
+                                    .collect(Collectors.joining(", ")));
+                }
             }
+            return mainRead.createReader(split);
         }
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 7369ee4b2a..bc886eb83b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -311,11 +311,11 @@ public class BranchSqlITCase extends CatalogITCaseBase {
     public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
         sql(
                 "CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name 
STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )");
-        sql(
-                "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', 
'20250227'), (2, 'Sam', '20250228')");
         sql("CALL sys.create_branch('default.t', 'stream')");
         sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )");
 
+        sql(
+                "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson', 
'20250227'), (2, 'Sam', '20250228')");
         sql(
                 "INSERT INTO `t$branch_stream` VALUES (1, 'John Stream', 
'20250228'), (3, 'Rick Stream', '20250301')");
         assertThat(collectResult("SELECT pk, name, dt FROM t order by dt"))

Reply via email to