This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a4effef50a [core] Not throw exception for fallback reading (#5383)
a4effef50a is described below
commit a4effef50a8ed83db35014b6e453f78a986164b4
Author: Yubin Li <[email protected]>
AuthorDate: Tue Apr 1 17:18:47 2025 +0800
[core] Not throw exception for fallback reading (#5383)
---
.../paimon/table/FallbackReadFileStoreTable.java | 19 ++++++++++++++++---
.../java/org/apache/paimon/flink/BranchSqlITCase.java | 4 ++--
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index d6f073fd2c..dddc8ca439 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.Options;
@@ -44,6 +45,9 @@ import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SegmentsCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -60,6 +64,8 @@ import java.util.stream.Collectors;
*/
public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
+ private static final Logger LOG =
LoggerFactory.getLogger(FallbackReadFileStoreTable.class);
+
private final FileStoreTable fallback;
public FallbackReadFileStoreTable(FileStoreTable wrapped, FileStoreTable
fallback) {
@@ -397,10 +403,17 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
DataSplit dataSplit = (DataSplit) split;
if (!dataSplit.dataFiles().isEmpty()
&& dataSplit.dataFiles().get(0).minKey().getFieldCount() >
0) {
- return fallbackRead.createReader(split);
- } else {
- return mainRead.createReader(split);
+ try {
+ return fallbackRead.createReader(split);
+ } catch (Exception ignored) {
+ LOG.error(
+ "Reading from fallback branch has problems for
files: {}",
+ dataSplit.dataFiles().stream()
+ .map(DataFileMeta::fileName)
+ .collect(Collectors.joining(", ")));
+ }
}
+ return mainRead.createReader(split);
}
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 7369ee4b2a..bc886eb83b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -311,11 +311,11 @@ public class BranchSqlITCase extends CatalogITCaseBase {
public void testCrossPartitionFallbackBranchBatchRead() throws Exception {
sql(
"CREATE TABLE t ( pk INT PRIMARY KEY NOT ENFORCED, name
STRING, dt STRING ) PARTITIONED BY (dt) WITH ( 'bucket' = '-1' )");
- sql(
- "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson',
'20250227'), (2, 'Sam', '20250228')");
sql("CALL sys.create_branch('default.t', 'stream')");
sql("ALTER TABLE t SET ( 'scan.fallback-branch' = 'stream' )");
+ sql(
+ "INSERT INTO t VALUES (1, 'Jack', '20250227'), (1, 'Jackson',
'20250227'), (2, 'Sam', '20250228')");
sql(
"INSERT INTO `t$branch_stream` VALUES (1, 'John Stream',
'20250228'), (3, 'Rick Stream', '20250301')");
assertThat(collectResult("SELECT pk, name, dt FROM t order by dt"))