This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fde87c7073 [core] ro table support fallback branch (#5544)
fde87c7073 is described below

commit fde87c7073aa0dacc36c73bdbf31d7506991ee78
Author: jerry <[email protected]>
AuthorDate: Tue May 13 11:56:51 2025 +0800

    [core] ro table support fallback branch (#5544)
---
 .../paimon/table/FallbackReadFileStoreTable.java   | 31 +++++++++++++---------
 .../paimon/table/system/ReadOptimizedTable.java    | 10 ++++++-
 .../apache/paimon/flink/CatalogTableITCase.java    | 30 ++++++++++++++++-----
 3 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 33c7b2c472..07d837bd42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -76,6 +76,10 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         Preconditions.checkArgument(!(fallback instanceof 
FallbackReadFileStoreTable));
     }
 
+    public FileStoreTable fallback() {
+        return fallback;
+    }
+
     @Override
     public FileStoreTable copy(Map<String, String> dynamicOptions) {
         return new FallbackReadFileStoreTable(
@@ -170,7 +174,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
     @Override
     public DataTableScan newScan() {
         validateSchema();
-        return new Scan();
+        return new FallbackReadScan(wrapped.newScan(), fallback.newScan());
     }
 
     private void validateSchema() {
@@ -229,46 +233,47 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         return true;
     }
 
-    private class Scan implements DataTableScan {
+    /** Scan implementation for {@link FallbackReadFileStoreTable}. */
+    public static class FallbackReadScan implements DataTableScan {
 
         private final DataTableScan mainScan;
         private final DataTableScan fallbackScan;
 
-        private Scan() {
-            this.mainScan = wrapped.newScan();
-            this.fallbackScan = fallback.newScan();
+        public FallbackReadScan(DataTableScan mainScan, DataTableScan 
fallbackScan) {
+            this.mainScan = mainScan;
+            this.fallbackScan = fallbackScan;
         }
 
         @Override
-        public Scan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
+        public FallbackReadScan withShard(int indexOfThisSubtask, int 
numberOfParallelSubtasks) {
             mainScan.withShard(indexOfThisSubtask, numberOfParallelSubtasks);
             fallbackScan.withShard(indexOfThisSubtask, 
numberOfParallelSubtasks);
             return this;
         }
 
         @Override
-        public Scan withFilter(Predicate predicate) {
+        public FallbackReadScan withFilter(Predicate predicate) {
             mainScan.withFilter(predicate);
             fallbackScan.withFilter(predicate);
             return this;
         }
 
         @Override
-        public Scan withLimit(int limit) {
+        public FallbackReadScan withLimit(int limit) {
             mainScan.withLimit(limit);
             fallbackScan.withLimit(limit);
             return this;
         }
 
         @Override
-        public Scan withPartitionFilter(Map<String, String> partitionSpec) {
+        public FallbackReadScan withPartitionFilter(Map<String, String> 
partitionSpec) {
             mainScan.withPartitionFilter(partitionSpec);
             fallbackScan.withPartitionFilter(partitionSpec);
             return this;
         }
 
         @Override
-        public Scan withPartitionFilter(List<BinaryRow> partitions) {
+        public FallbackReadScan withPartitionFilter(List<BinaryRow> 
partitions) {
             mainScan.withPartitionFilter(partitions);
             fallbackScan.withPartitionFilter(partitions);
             return this;
@@ -282,21 +287,21 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         }
 
         @Override
-        public Scan withBucketFilter(Filter<Integer> bucketFilter) {
+        public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) 
{
             mainScan.withBucketFilter(bucketFilter);
             fallbackScan.withBucketFilter(bucketFilter);
             return this;
         }
 
         @Override
-        public Scan withLevelFilter(Filter<Integer> levelFilter) {
+        public FallbackReadScan withLevelFilter(Filter<Integer> levelFilter) {
             mainScan.withLevelFilter(levelFilter);
             fallbackScan.withLevelFilter(levelFilter);
             return this;
         }
 
         @Override
-        public Scan withMetricRegistry(MetricRegistry metricRegistry) {
+        public FallbackReadScan withMetricRegistry(MetricRegistry 
metricRegistry) {
             mainScan.withMetricRegistry(metricRegistry);
             fallbackScan.withMetricRegistry(metricRegistry);
             return this;
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index caa0b133cc..2e6e5beb06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -28,10 +28,12 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.DataTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.DataTableBatchScan;
+import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.DataTableStreamScan;
 import org.apache.paimon.table.source.InnerTableRead;
 import org.apache.paimon.table.source.StreamDataTableScan;
@@ -130,7 +132,13 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
     }
 
     @Override
-    public DataTableBatchScan newScan() {
+    public DataTableScan newScan() {
+        if (wrapped instanceof FallbackReadFileStoreTable) {
+            FallbackReadFileStoreTable table = (FallbackReadFileStoreTable) 
wrapped;
+            return (new FallbackReadFileStoreTable.FallbackReadScan(
+                            table.wrapped().newScan(), 
table.fallback().newScan()))
+                    .withLevelFilter(l -> l == coreOptions().numLevels() - 1);
+        }
         return new DataTableBatchScan(
                 wrapped.schema(),
                 coreOptions(),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 0797322c0d..a9395a0a83 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -1097,11 +1097,25 @@ public class CatalogTableITCase extends CatalogITCaseBase {
     @Test
     public void testReadOptimizedTable() {
         sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH 
('bucket' = '1')");
-        innerTestReadOptimizedTable();
+        innerTestReadOptimizedTableAndCheckData("T");
 
         sql("DROP TABLE T");
         sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH 
('bucket' = '-1')");
-        innerTestReadOptimizedTable();
+        innerTestReadOptimizedTableAndCheckData("T");
+    }
+
+    @Test
+    public void testReadOptimizedTableFallBack() {
+        sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH 
('bucket' = '1')");
+        sql("CALL sys.create_branch('default.T', 'stream')");
+        sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')");
+        innerTestReadOptimizedTableAndCheckData("T$branch_stream");
+
+        sql("DROP TABLE T");
+        sql("CREATE TABLE T (k INT, v INT, PRIMARY KEY (k) NOT ENFORCED) WITH 
('bucket' = '-1')");
+        sql("CALL sys.create_branch('default.T', 'stream')");
+        sql("ALTER TABLE T SET ('scan.fallback-branch' = 'stream')");
+        innerTestReadOptimizedTableAndCheckData("T$branch_stream");
     }
 
     @Test
@@ -1173,21 +1187,25 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         assertThat(row.getField(6)).isNotNull();
     }
 
-    private void innerTestReadOptimizedTable() {
+    private void innerTestReadOptimizedTableAndCheckData(String 
insertTableName) {
         // full compaction will always be performed at the end of batch jobs, 
as long as
         // full-compaction.delta-commits is set, regardless of its value
         sql(
-                "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = 
'100') */ VALUES (1, 10), (2, 20)");
+                String.format(
+                        "INSERT INTO %s /*+ 
OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (1, 10), (2, 20)",
+                        insertTableName));
         List<Row> result = sql("SELECT k, v FROM T$ro ORDER BY k");
         assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
 
         // no compaction, so result of ro table does not change
-        sql("INSERT INTO T VALUES (1, 11), (3, 30)");
+        sql(String.format("INSERT INTO %s VALUES (1, 11), (3, 30)", 
insertTableName));
         result = sql("SELECT k, v FROM T$ro ORDER BY k");
         assertThat(result).containsExactly(Row.of(1, 10), Row.of(2, 20));
 
         sql(
-                "INSERT INTO T /*+ OPTIONS('full-compaction.delta-commits' = 
'100') */ VALUES (2, 21), (3, 31)");
+                String.format(
+                        "INSERT INTO %s /*+ 
OPTIONS('full-compaction.delta-commits' = '100') */ VALUES (2, 21), (3, 31)",
+                        insertTableName));
         result = sql("SELECT k, v FROM T$ro ORDER BY k");
         assertThat(result).containsExactly(Row.of(1, 11), Row.of(2, 21), 
Row.of(3, 31));
     }

Reply via email to