This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a9df02e272 [core] Support chain tbl on batch mode (#6394)
a9df02e272 is described below

commit a9df02e2720e9b21fa83b4d262461e4711392b17
Author: Stefanietry <[email protected]>
AuthorDate: Wed Dec 24 22:17:55 2025 +0800

    [core] Support chain tbl on batch mode (#6394)
---
 docs/content/primary-key-table/chain-table.md      | 148 +++++++
 .../shortcodes/generated/core_configuration.html   |  18 +
 docs/static/img/chain-table.png                    | Bin 0 -> 205667 bytes
 .../main/java/org/apache/paimon/CoreOptions.java   |  34 ++
 .../paimon/io/ChainKeyValueFileReaderFactory.java  |  13 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  34 ++
 .../apache/paimon/table/ChainGroupReadTable.java   | 356 +++++++++++++++++
 .../paimon/table/FallbackReadFileStoreTable.java   |  34 +-
 .../apache/paimon/table/FileStoreTableFactory.java |  64 +++-
 .../paimon/table/source/AbstractDataTableScan.java |   6 +
 .../apache/paimon/table/source/InnerTableScan.java |   4 +
 .../org/apache/paimon/utils/ChainTableUtils.java   | 207 ++++++++++
 .../apache/paimon/utils/ChainTableUtilsTest.java   | 192 ++++++++++
 .../paimon/spark/SparkCatalogWithHiveTest.java     | 424 +++++++++++++++++++++
 14 files changed, 1524 insertions(+), 10 deletions(-)

diff --git a/docs/content/primary-key-table/chain-table.md b/docs/content/primary-key-table/chain-table.md
new file mode 100644
index 0000000000..84c1f298a3
--- /dev/null
+++ b/docs/content/primary-key-table/chain-table.md
@@ -0,0 +1,148 @@
+---
+title: "Chain Table"
+weight: 6
+type: docs
+aliases:
+- /primary-key-table/chain-table.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Chain Table
+
+Chain table is a new capability for primary key tables that transforms how you 
process incremental data.
+Imagine a scenario where you periodically store a full snapshot of data (for 
example, once a day), even 
+though only a small portion changes between snapshots. ODS binlog dump is a 
typical example of this pattern.
+
+Take a daily binlog dump job as an example. A batch job merges yesterday’s full dataset with today’s
+incremental changes to produce a new full dataset. This approach has two clear drawbacks:
+* Full computation: The merge operation processes all data and involves a shuffle, which results in poor performance.
+* Full storage: A full set of data is stored every day, even though the changed data usually accounts for a very small proportion.
+
+Paimon addresses this problem by directly consuming only the changed data and performing merge-on-read.
+In this way, full computation and storage are turned into incremental mode:
+* Incremental computation: The offline ETL daily job only needs to consume the changed data of the current day and does not need to merge all data.
+* Incremental storage: Only the changed data is stored each day, and it is asynchronously compacted periodically (e.g., weekly) to build a global chain table within the lifecycle.
+  {{< img src="/img/chain-table.png">}}
+
+Based on the regular table, chain table introduces snapshot and delta branches to represent full and incremental
+data respectively. When writing, you specify the branch to write full or incremental data to. When reading, Paimon
+automatically chooses the appropriate strategy based on the read mode, such as full, incremental, or hybrid.
+
+To enable chain table, you must set `chain-table.enabled` to true in the table options when creating the
+table, and the snapshot and delta branches need to be created as well. Consider an example via Spark SQL:
+
+```sql
+CREATE TABLE default.t (
+    `t1` string ,
+    `t2` string ,
+    `t3` string
+) PARTITIONED BY (`date` string)
+TBLPROPERTIES (
+  'chain-table.enabled' = 'true',
+  -- props about primary key table  
+  'primary-key' = 'date,t1',
+  'sequence.field' = 't2',
+  'bucket-key' = 't1',
+  'bucket' = '2',
+  -- props about partition
+  'partition.timestamp-pattern' = '$date', 
+  'partition.timestamp-formatter' = 'yyyyMMdd'
+);
+
+CALL sys.create_branch('default.t', 'snapshot');
+
+CALL sys.create_branch('default.t', 'delta');
+
+ALTER TABLE default.t SET tblproperties 
+    ('scan.fallback-snapshot-branch' = 'snapshot', 
+     'scan.fallback-delta-branch' = 'delta');
+ 
+ALTER TABLE `default`.`t$branch_snapshot` SET tblproperties
+    ('scan.fallback-snapshot-branch' = 'snapshot',
+     'scan.fallback-delta-branch' = 'delta');
+
+ALTER TABLE `default`.`t$branch_delta` SET tblproperties 
+    ('scan.fallback-snapshot-branch' = 'snapshot',
+     'scan.fallback-delta-branch' = 'delta');
+```
+
+Notice that:
+- Chain table is only supported for primary key tables, which means you should define `bucket` and `bucket-key` for the table.
+- The schema of each branch must be kept consistent.
+- Only Spark is supported for now; Flink will be supported later.
+- Chain compaction is not supported for now; it will be supported later.
+
+After creating a chain table, you can read and write data in the following ways.
+
+- Full Write: Write data to t$branch_snapshot.
+```sql
+insert overwrite `default`.`t$branch_snapshot` partition (date = '20250810') 
+    values ('1', '1', '1'); 
+```
+
+- Incremental Write: Write data to t$branch_delta.
+```sql
+insert overwrite `default`.`t$branch_delta` partition (date = '20250811') 
+    values ('2', '1', '1');
+```
+
+- Full Query: If the snapshot branch contains the full partition, read it directly; otherwise, read in chain merge mode.
+```sql
+select t1, t2, t3 from default.t where date = '20250811'
+```
+you will get the following result:
+```text
++---+----+-----+ 
+| t1|  t2|   t3| 
++---+----+-----+ 
+| 1 |   1|   1 |           
+| 2 |   1|   1 |               
++---+----+-----+ 
+```
+
+- Incremental Query: Read the incremental partition from t$branch_delta.
+```sql
+select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
+```
+you will get the following result:
+```text
++---+----+-----+ 
+| t1|  t2|   t3| 
++---+----+-----+      
+| 2 |   1|   1 |               
++---+----+-----+ 
+```
+
+- Hybrid Query: Read both full and incremental data simultaneously.
+```sql
+select t1, t2, t3 from default.t where date = '20250811'
+union all
+select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
+```
+you will get the following result:
+```text
++---+----+-----+ 
+| t1|  t2|   t3| 
++---+----+-----+ 
+| 1 |   1|   1 |           
+| 2 |   1|   1 |  
+| 2 |   1|   1 |               
++---+----+-----+ 
+```
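
The "chain merge mode" mentioned for the full query can be pictured as a partition selection step followed by merge-on-read. The sketch below is not code from this commit and does not use any Paimon API. It is a minimal, hypothetical helper (`ChainReadSketch`, `ReadPlan` and `plan` are names invented here) that assumes `yyyyMMdd` partition values, so string order equals date order. It also assumes that the chain starts at the latest snapshot partition strictly before the queried date and includes every delta partition after that snapshot up to and including the queried date.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical helper, not part of Paimon: picks the partitions a chain read would touch. */
final class ChainReadSketch {

    /** Partitions to read for one queried date. */
    static final class ReadPlan {
        final String snapshot;     // null when no usable snapshot partition exists
        final List<String> deltas; // delta partitions to merge on top, oldest first

        ReadPlan(String snapshot, List<String> deltas) {
            this.snapshot = snapshot;
            this.deltas = deltas;
        }
    }

    static ReadPlan plan(
            NavigableSet<String> snapshotDates, NavigableSet<String> deltaDates, String queryDate) {
        // The snapshot branch already holds the full partition: read it directly.
        if (snapshotDates.contains(queryDate)) {
            return new ReadPlan(queryDate, new ArrayList<>());
        }
        // Otherwise chain: latest earlier snapshot (may be null) plus the deltas after it.
        String base = snapshotDates.lower(queryDate);
        NavigableSet<String> range =
                base == null
                        ? deltaDates.headSet(queryDate, true)
                        : deltaDates.subSet(base, false, queryDate, true);
        return new ReadPlan(base, new ArrayList<>(range));
    }

    public static void main(String[] args) {
        // Same layout as the SQL example: full data on 20250810, incremental data on 20250811.
        NavigableSet<String> snapshots = new TreeSet<>(Arrays.asList("20250810"));
        NavigableSet<String> deltas = new TreeSet<>(Arrays.asList("20250811"));
        ReadPlan p = plan(snapshots, deltas, "20250811");
        System.out.println("snapshot=" + p.snapshot + ", deltas=" + p.deltas);
    }
}
```

For the documented example this selects snapshot partition `20250810` and delta partition `20250811`, which is consistent with the two rows returned by the full query. The real planner works on `BinaryRow` partitions and file splits, so treat this only as a reading aid.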
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 269625f718..027d798c06 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -98,6 +98,12 @@ under the License.
             <td>MemorySize</td>
             <td>Memory page size for caching.</td>
         </tr>
+        <tr>
+            <td><h5>chain-table.enabled</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable chain table.</td>
+        </tr>
         <tr>
             <td><h5>changelog-file.compression</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -1061,6 +1067,18 @@ This config option does not affect the default filesystem metastore.</td>
             <td>String</td>
             <td>When a batch job queries from a table, if a partition does not exist in the current branch, the reader will try to get this partition from this fallback branch.</td>
         </tr>
+        <tr>
+            <td><h5>scan.fallback-delta-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a chain table, if a partition does not exist in either main or snapshot branch, the reader will try to get this partition from chain snapshot and delta branch together.</td>
+        </tr>
+        <tr>
+            <td><h5>scan.fallback-snapshot-branch</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>When a batch job queries from a chain table, if a partition does not exist in the main branch, the reader will try to get this partition from chain snapshot branch.</td>
+        </tr>
         <tr>
             <td><h5>scan.file-creation-time-millis</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/static/img/chain-table.png b/docs/static/img/chain-table.png
new file mode 100644
index 0000000000..cfd79ea095
Binary files /dev/null and b/docs/static/img/chain-table.png differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ae48355836..d5bd5789e8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -226,6 +226,28 @@ public class CoreOptions implements Serializable {
     public static final ConfigOption<String> BRANCH =
             
key("branch").stringType().defaultValue("main").withDescription("Specify branch 
name.");
 
+    public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
+            key("chain-table.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable chain table.");
+
+    public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
+            key("scan.fallback-snapshot-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a chain table, if a partition does not exist in the main branch, "
+                                    + "the reader will try to get this partition from chain snapshot branch.");
+
+    public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
+            key("scan.fallback-delta-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When a batch job queries from a chain table, if a partition does not exist in either main or snapshot branch, "
+                                    + "the reader will try to get this partition from chain snapshot and delta branch together.");
+
     public static final String FILE_FORMAT_ORC = "orc";
     public static final String FILE_FORMAT_AVRO = "avro";
     public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3254,6 +3276,18 @@ public class CoreOptions implements Serializable {
         return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
     }
 
+    public boolean isChainTable() {
+        return options.get(CHAIN_TABLE_ENABLED);
+    }
+
+    public String scanFallbackSnapshotBranch() {
+        return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
+    }
+
+    public String scanFallbackDeltaBranch() {
+        return options.get(SCAN_FALLBACK_DELTA_BRANCH);
+    }
+
     public boolean formatTableImplementationIsPaimon() {
         return options.get(FORMAT_TABLE_IMPLEMENTATION) == 
FormatTableImplementation.PAIMON;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
index 0fa9ae73ef..2c4d419989 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
@@ -70,8 +70,19 @@ public class ChainKeyValueFileReaderFactory extends KeyValueFileReaderFactory {
         this.chainReadContext = chainReadContext;
         CoreOptions options = new CoreOptions(schema.options());
         this.currentBranch = options.branch();
+        String snapshotBranch = options.scanFallbackSnapshotBranch();
+        String deltaBranch = options.scanFallbackDeltaBranch();
+        SchemaManager snapshotSchemaManager =
+                snapshotBranch.equalsIgnoreCase(currentBranch)
+                        ? schemaManager
+                        : schemaManager.copyWithBranch(snapshotBranch);
+        SchemaManager deltaSchemaManager =
+                deltaBranch.equalsIgnoreCase(currentBranch)
+                        ? schemaManager
+                        : schemaManager.copyWithBranch(deltaBranch);
         this.branchSchemaManagers = new HashMap<>();
-        this.branchSchemaManagers.put(currentBranch, schemaManager);
+        this.branchSchemaManagers.put(snapshotBranch, snapshotSchemaManager);
+        this.branchSchemaManagers.put(deltaBranch, deltaSchemaManager);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 56176f2b83..abbc2e4912 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -21,6 +21,7 @@ package org.apache.paimon.schema;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.TableType;
 import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
@@ -39,6 +40,7 @@ import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
 import java.util.ArrayList;
@@ -241,6 +243,8 @@ public class SchemaValidation {
         validateRowTracking(schema, options);
 
         validateIncrementalClustering(schema, options);
+
+        validateChainTable(schema, options);
     }
 
     public static void validateFallbackBranch(SchemaManager schemaManager, 
TableSchema schema) {
@@ -679,4 +683,34 @@ public class SchemaValidation {
                     PRIMARY_KEY.key());
         }
     }
+
+    public static void validateChainTable(TableSchema schema, CoreOptions options) {
+        if (options.isChainTable()) {
+            boolean isPrimaryTbl = schema.primaryKeys() != null && !schema.primaryKeys().isEmpty();
+            boolean isPartitionTbl =
+                    schema.partitionKeys() != null && !schema.partitionKeys().isEmpty();
+            ChangelogProducer changelogProducer = options.changelogProducer();
+            Preconditions.checkArgument(
+                    options.type() == TableType.TABLE, "Chain table must be table type.");
+            Preconditions.checkArgument(isPrimaryTbl, "Primary key is required for chain table.");
+            Preconditions.checkArgument(isPartitionTbl, "Chain table must be partition table.");
+            Preconditions.checkArgument(
+                    options.bucket() > 0, "Bucket number must be greater than 0 for chain table.");
+            Preconditions.checkArgument(
+                    options.sequenceField() != null, "Sequence field is required for chain table.");
+            Preconditions.checkArgument(
+                    options.mergeEngine() == MergeEngine.DEDUPLICATE,
+                    "Merge engine must be deduplicate for chain table.");
+            Preconditions.checkArgument(
+                    changelogProducer == ChangelogProducer.NONE
+                            || changelogProducer == ChangelogProducer.INPUT,
+                    "Changelog producer must be none or input for chain table.");
+            Preconditions.checkArgument(
+                    options.partitionTimestampPattern() != null,
+                    "Partition timestamp pattern is required for chain table.");
+            Preconditions.checkArgument(
+                    options.partitionTimestampFormatter() != null,
+                    "Partition timestamp formatter is required for chain table.");
+        }
+    }
 }
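
The checks in `validateChainTable` amount to a list of required table properties. The following sketch is not part of the commit and does not call Paimon. It is a hypothetical, dependency-free restatement (`ChainTableOptionCheck` and `firstViolation` are invented names) that works on a plain option map, and it can be used to reason about which `TBLPROPERTIES` combinations would be rejected. It assumes the defaults `bucket = -1`, `merge-engine = deduplicate` and `changelog-producer = none`, and it leaves out the table type check.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical restatement of the chain table checks over a plain option map. */
final class ChainTableOptionCheck {

    /** Returns the message of the first violated rule, or null when every check passes. */
    static String firstViolation(
            List<String> primaryKeys, List<String> partitionKeys, Map<String, String> options) {
        if (primaryKeys.isEmpty()) {
            return "Primary key is required for chain table.";
        }
        if (partitionKeys.isEmpty()) {
            return "Chain table must be partition table.";
        }
        // Assumed default: -1 (dynamic bucket), which chain tables do not accept.
        if (Integer.parseInt(options.getOrDefault("bucket", "-1")) <= 0) {
            return "Bucket number must be greater than 0 for chain table.";
        }
        if (!options.containsKey("sequence.field")) {
            return "Sequence field is required for chain table.";
        }
        if (!"deduplicate".equals(options.getOrDefault("merge-engine", "deduplicate"))) {
            return "Merge engine must be deduplicate for chain table.";
        }
        String producer = options.getOrDefault("changelog-producer", "none");
        if (!producer.equals("none") && !producer.equals("input")) {
            return "Changelog producer must be none or input for chain table.";
        }
        if (!options.containsKey("partition.timestamp-pattern")) {
            return "Partition timestamp pattern is required for chain table.";
        }
        if (!options.containsKey("partition.timestamp-formatter")) {
            return "Partition timestamp formatter is required for chain table.";
        }
        return null;
    }
}
```

With the properties from the Spark SQL example in the documentation (`bucket = 2`, `sequence.field = t2`, both `partition.timestamp-*` options set) this sketch reports no violation. Dropping `sequence.field` yields the sequence field message.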
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
new file mode 100644
index 0000000000..aa583f7e6a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Chain table which mainly read from the snapshot branch. However, if the 
snapshot branch does not
+ * have a partition, it will fall back to chain read.
+ */
+public class ChainGroupReadTable extends FallbackReadFileStoreTable {
+
+    public ChainGroupReadTable(
+            AbstractFileStoreTable snapshotStoreTable, AbstractFileStoreTable 
deltaStoreTable) {
+        super(snapshotStoreTable, deltaStoreTable);
+        Preconditions.checkArgument(snapshotStoreTable instanceof 
PrimaryKeyFileStoreTable);
+        Preconditions.checkArgument(deltaStoreTable instanceof 
PrimaryKeyFileStoreTable);
+    }
+
+    @Override
+    public DataTableScan newScan() {
+        super.validateSchema();
+        return new ChainTableBatchScan(
+                wrapped.newScan(),
+                fallback().newScan(),
+                ((AbstractFileStoreTable) wrapped).tableSchema,
+                this);
+    }
+
+    private DataTableScan newSnapshotScan() {
+        return wrapped.newScan();
+    }
+
+    private DataTableScan newDeltaScan() {
+        return fallback().newScan();
+    }
+
+    @Override
+    public FileStoreTable copy(Map<String, String> dynamicOptions) {
+        return new ChainGroupReadTable(
+                (AbstractFileStoreTable) wrapped.copy(dynamicOptions),
+                (AbstractFileStoreTable) 
fallback().copy(rewriteFallbackOptions(dynamicOptions)));
+    }
+
+    @Override
+    public FileStoreTable copy(TableSchema newTableSchema) {
+        return new ChainGroupReadTable(
+                (AbstractFileStoreTable) wrapped.copy(newTableSchema),
+                (AbstractFileStoreTable)
+                        fallback()
+                                .copy(
+                                        newTableSchema.copy(
+                                                
rewriteFallbackOptions(newTableSchema.options()))));
+    }
+
+    @Override
+    public FileStoreTable copyWithoutTimeTravel(Map<String, String> 
dynamicOptions) {
+        return new ChainGroupReadTable(
+                (AbstractFileStoreTable) 
wrapped.copyWithoutTimeTravel(dynamicOptions),
+                (AbstractFileStoreTable)
+                        
fallback().copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
+    }
+
+    @Override
+    public FileStoreTable copyWithLatestSchema() {
+        return new ChainGroupReadTable(
+                (AbstractFileStoreTable) wrapped.copyWithLatestSchema(),
+                (AbstractFileStoreTable) fallback().copyWithLatestSchema());
+    }
+
+    @Override
+    public FileStoreTable switchToBranch(String branchName) {
+        return new ChainGroupReadTable(
+                (AbstractFileStoreTable) switchWrappedToBranch(branchName),
+                (AbstractFileStoreTable) fallback());
+    }
+
+    /** Scan implementation for {@link ChainGroupReadTable}. */
+    public static class ChainTableBatchScan extends FallbackReadScan {
+
+        private final RowDataToObjectArrayConverter partitionConverter;
+        private final InternalRowPartitionComputer partitionComputer;
+        private final TableSchema tableSchema;
+        private final CoreOptions options;
+        private final RecordComparator partitionComparator;
+        private final ChainGroupReadTable chainGroupReadTable;
+
+        public ChainTableBatchScan(
+                DataTableScan mainScan,
+                DataTableScan fallbackScan,
+                TableSchema tableSchema,
+                ChainGroupReadTable chainGroupReadTable) {
+            super(mainScan, fallbackScan);
+            this.tableSchema = tableSchema;
+            this.options = CoreOptions.fromMap(tableSchema.options());
+            this.chainGroupReadTable = chainGroupReadTable;
+            this.partitionConverter =
+                    new 
RowDataToObjectArrayConverter(tableSchema.logicalPartitionType());
+            this.partitionComputer =
+                    new InternalRowPartitionComputer(
+                            options.partitionDefaultName(),
+                            tableSchema.logicalPartitionType(),
+                            tableSchema.partitionKeys().toArray(new String[0]),
+                            options.legacyPartitionName());
+            this.partitionComparator =
+                    CodeGenUtils.newRecordComparator(
+                            
tableSchema.logicalPartitionType().getFieldTypes());
+        }
+
+        @Override
+        public Plan plan() {
+            List<Split> splits = new ArrayList<>();
+            Set<BinaryRow> completePartitions = new HashSet<>();
+            PredicateBuilder builder = new 
PredicateBuilder(tableSchema.logicalPartitionType());
+            for (Split split : mainScan.plan().splits()) {
+                DataSplit dataSplit = (DataSplit) split;
+                HashMap<String, String> fileBucketPathMapping = new 
HashMap<>();
+                HashMap<String, String> fileBranchMapping = new HashMap<>();
+                for (DataFileMeta file : dataSplit.dataFiles()) {
+                    fileBucketPathMapping.put(file.fileName(), ((DataSplit) 
split).bucketPath());
+                    fileBranchMapping.put(file.fileName(), 
options.scanFallbackSnapshotBranch());
+                }
+                splits.add(
+                        new ChainSplit(
+                                dataSplit.partition(),
+                                dataSplit.dataFiles(),
+                                fileBucketPathMapping,
+                                fileBranchMapping));
+                completePartitions.add(dataSplit.partition());
+            }
+            List<BinaryRow> remainingPartitions =
+                    fallbackScan.listPartitions().stream()
+                            .filter(p -> !completePartitions.contains(p))
+                            .collect(Collectors.toList());
+            if (!remainingPartitions.isEmpty()) {
+                fallbackScan.withPartitionFilter(remainingPartitions);
+                List<BinaryRow> deltaPartitions = 
fallbackScan.listPartitions();
+                deltaPartitions =
+                        deltaPartitions.stream()
+                                .sorted((o1, o2) -> 
partitionComparator.compare(o1, o2))
+                                .collect(Collectors.toList());
+                BinaryRow maxPartition = 
deltaPartitions.get(deltaPartitions.size() - 1);
+                Predicate snapshotPredicate =
+                        ChainTableUtils.createTriangularPredicate(
+                                maxPartition,
+                                partitionConverter,
+                                (Integer i, Object j) -> builder.equal(i, j),
+                                (Integer i, Object j) -> builder.lessThan(i, 
j));
+                mainScan.withPartitionFilter(snapshotPredicate);
+                List<BinaryRow> candidateSnapshotPartitions = 
mainScan.listPartitions();
+                candidateSnapshotPartitions =
+                        candidateSnapshotPartitions.stream()
+                                .sorted((o1, o2) -> 
partitionComparator.compare(o1, o2))
+                                .collect(Collectors.toList());
+                Map<BinaryRow, BinaryRow> partitionMapping =
+                        ChainTableUtils.findFirstLatestPartitions(
+                                deltaPartitions, candidateSnapshotPartitions, 
partitionComparator);
+                for (Map.Entry<BinaryRow, BinaryRow> partitionParis : 
partitionMapping.entrySet()) {
+                    DataTableScan snapshotScan = 
chainGroupReadTable.newSnapshotScan();
+                    DataTableScan deltaScan = 
chainGroupReadTable.newDeltaScan();
+                    if (partitionParis.getValue() == null) {
+                        List<Predicate> predicates = new ArrayList<>();
+                        predicates.add(
+                                ChainTableUtils.createTriangularPredicate(
+                                        partitionParis.getKey(),
+                                        partitionConverter,
+                                        (Integer i, Object j) -> 
builder.equal(i, j),
+                                        (Integer i, Object j) -> 
builder.lessThan(i, j)));
+                        predicates.add(
+                                ChainTableUtils.createLinearPredicate(
+                                        partitionParis.getKey(),
+                                        partitionConverter,
+                                        (Integer i, Object j) -> 
builder.equal(i, j)));
+                        
deltaScan.withPartitionFilter(PredicateBuilder.or(predicates));
+                    } else {
+                        List<BinaryRow> selectedDeltaPartitions =
+                                ChainTableUtils.getDeltaPartitions(
+                                        partitionParis.getValue(),
+                                        partitionParis.getKey(),
+                                        tableSchema.partitionKeys(),
+                                        tableSchema.logicalPartitionType(),
+                                        options,
+                                        partitionComparator,
+                                        partitionComputer);
+                        deltaScan.withPartitionFilter(selectedDeltaPartitions);
+                    }
+                    List<Split> subSplits = deltaScan.plan().splits();
+                    Set<String> snapshotFileNames = new HashSet<>();
+                    if (partitionParis.getValue() != null) {
+                        
snapshotScan.withPartitionFilter(Arrays.asList(partitionParis.getValue()));
+                        List<Split> mainSubSplits = 
snapshotScan.plan().splits();
+                        snapshotFileNames =
+                                mainSubSplits.stream()
+                                        .flatMap(
+                                                s ->
+                                                        ((DataSplit) s)
+                                                                
.dataFiles().stream()
+                                                                        .map(
+                                                                               
 DataFileMeta
+                                                                               
         ::fileName))
+                                        .collect(Collectors.toSet());
+                        subSplits.addAll(mainSubSplits);
+                    }
+                    Map<Integer, List<DataSplit>> bucketSplits = new 
LinkedHashMap<>();
+                    for (Split split : subSplits) {
+                        DataSplit dataSplit = (DataSplit) split;
+                        Preconditions.checkArgument(
+                                dataSplit.totalBuckets() == options.bucket(),
+                                "Inconsistent bucket num " + 
dataSplit.bucket());
+                        bucketSplits
+                                .computeIfAbsent(dataSplit.bucket(), k -> new 
ArrayList<>())
+                                .add(dataSplit);
+                    }
+                    for (Map.Entry<Integer, List<DataSplit>> entry : 
bucketSplits.entrySet()) {
+                        HashMap<String, String> fileBucketPathMapping = new 
HashMap<>();
+                        HashMap<String, String> fileBranchMapping = new 
HashMap<>();
+                        List<DataSplit> splitList = entry.getValue();
+                        for (DataSplit dataSplit : splitList) {
+                            for (DataFileMeta file : dataSplit.dataFiles()) {
+                                fileBucketPathMapping.put(file.fileName(), dataSplit.bucketPath());
+                                String branch =
+                                        snapshotFileNames.contains(file.fileName())
+                                                ? options.scanFallbackSnapshotBranch()
+                                                : options.scanFallbackDeltaBranch();
+                                fileBranchMapping.put(file.fileName(), branch);
+                            }
+                        }
+                        ChainSplit split =
+                                new ChainSplit(
+                                        partitionParis.getKey(),
+                                        entry.getValue().stream()
+                                                .flatMap(
+                                                        bucketSplit -> bucketSplit.dataFiles().stream())
+                                                .collect(Collectors.toList()),
+                                        fileBucketPathMapping,
+                                        fileBranchMapping);
+                        splits.add(split);
+                    }
+                }
+            }
+            return new DataFilePlan(splits);
+        }
+
+        @Override
+        public List<PartitionEntry> listPartitionEntries() {
+            return super.listPartitionEntries();
+        }
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new Read();
+    }
+
+    private class Read implements InnerTableRead {
+
+        private final InnerTableRead mainRead;
+        private final InnerTableRead fallbackRead;
+
+        private Read() {
+            this.mainRead = wrapped.newRead();
+            this.fallbackRead = fallback().newRead();
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            mainRead.withFilter(predicate);
+            fallbackRead.withFilter(predicate);
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withReadType(RowType readType) {
+            mainRead.withReadType(readType);
+            fallbackRead.withReadType(readType);
+            return this;
+        }
+
+        @Override
+        public InnerTableRead forceKeepDelete() {
+            mainRead.forceKeepDelete();
+            fallbackRead.forceKeepDelete();
+            return this;
+        }
+
+        @Override
+        public TableRead executeFilter() {
+            mainRead.executeFilter();
+            fallbackRead.executeFilter();
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            mainRead.withIOManager(ioManager);
+            fallbackRead.withIOManager(ioManager);
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            Preconditions.checkArgument(split instanceof ChainSplit);
+            return fallbackRead.createReader(split);
+        }
+    }
+}
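Note on the bucket grouping in `plan()` above: the following is a minimal, self-contained sketch of that step, not the committed implementation. Splits are grouped per bucket, and every file name is tagged with the branch it came from. `BucketFiles`, `groupByBucket`, `fileToBranch`, the file names, and the branch names `snapshot` and `delta` are hypothetical stand-ins chosen for illustration; they are not Paimon APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ChainSplitGroupingSketch {

    /** Hypothetical stand-in for a DataSplit: one bucket id and the file names it holds. */
    static final class BucketFiles {
        final int bucket;
        final List<String> fileNames;

        BucketFiles(int bucket, List<String> fileNames) {
            this.bucket = bucket;
            this.fileNames = fileNames;
        }
    }

    /** Groups splits by bucket id, preserving the order in which buckets first appear. */
    static Map<Integer, List<BucketFiles>> groupByBucket(List<BucketFiles> splits) {
        Map<Integer, List<BucketFiles>> grouped = new LinkedHashMap<>();
        for (BucketFiles split : splits) {
            grouped.computeIfAbsent(split.bucket, k -> new ArrayList<>()).add(split);
        }
        return grouped;
    }

    /** Tags each file with the snapshot branch if it is a known snapshot file, else delta. */
    static Map<String, String> fileToBranch(
            List<BucketFiles> splits,
            Set<String> snapshotFileNames,
            String snapshotBranch,
            String deltaBranch) {
        Map<String, String> mapping = new HashMap<>();
        for (BucketFiles split : splits) {
            for (String fileName : split.fileNames) {
                mapping.put(
                        fileName,
                        snapshotFileNames.contains(fileName) ? snapshotBranch : deltaBranch);
            }
        }
        return mapping;
    }

    public static void main(String[] args) {
        List<BucketFiles> splits =
                Arrays.asList(
                        new BucketFiles(0, Arrays.asList("s-0.parquet")),
                        new BucketFiles(1, Arrays.asList("d-1.parquet")),
                        new BucketFiles(0, Arrays.asList("d-0.parquet")));
        Set<String> snapshotFiles = new HashSet<>(Arrays.asList("s-0.parquet"));
        System.out.println(groupByBucket(splits).keySet());
        System.out.println(fileToBranch(splits, snapshotFiles, "snapshot", "delta"));
    }
}
```

Keeping the per-file branch in a side map, rather than on the split itself, mirrors how the diff passes `fileBranchMapping` into `ChainSplit`, so a reader can later resolve each file against the right branch.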
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 11bc764a57..620638cb92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -37,6 +37,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ChainSplit;
 import org.apache.paimon.table.source.DataFilePlan;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
@@ -82,7 +83,12 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         this.fallback = fallback;
 
         Preconditions.checkArgument(!(wrapped instanceof FallbackReadFileStoreTable));
-        Preconditions.checkArgument(!(fallback instanceof FallbackReadFileStoreTable));
+        Preconditions.checkArgument(isValidFallbackTable());
+    }
+
+    private boolean isValidFallbackTable() {
+        return fallback instanceof ChainGroupReadTable
+                || !(fallback instanceof FallbackReadFileStoreTable);
     }
 
     public FileStoreTable fallback() {
@@ -128,7 +134,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         fallback.setManifestCache(manifestCache);
     }
 
-    private FileStoreTable switchWrappedToBranch(String branchName) {
+    protected FileStoreTable switchWrappedToBranch(String branchName) {
         Optional<TableSchema> optionalSchema =
                 wrapped.schemaManager().copyWithBranch(branchName).latest();
         Preconditions.checkArgument(
@@ -146,7 +152,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
                 wrapped.catalogEnvironment());
     }
 
-    private Map<String, String> rewriteFallbackOptions(Map<String, String> options) {
+    protected Map<String, String> rewriteFallbackOptions(Map<String, String> options) {
         Map<String, String> result = new HashMap<>(options);
 
         // branch of fallback table should never change
@@ -186,7 +192,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
         return new FallbackReadScan(wrapped.newScan(), fallback.newScan());
     }
 
-    private void validateSchema() {
+    protected void validateSchema() {
         String mainBranch = wrapped.coreOptions().branch();
         String fallbackBranch = fallback.coreOptions().branch();
         RowType mainRowType = wrapped.schema().logicalRowType();
@@ -289,8 +295,8 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
     /** Scan implementation for {@link FallbackReadFileStoreTable}. */
     public static class FallbackReadScan implements DataTableScan {
 
-        private final DataTableScan mainScan;
-        private final DataTableScan fallbackScan;
+        protected final DataTableScan mainScan;
+        protected final DataTableScan fallbackScan;
 
         public FallbackReadScan(DataTableScan mainScan, DataTableScan fallbackScan) {
             this.mainScan = mainScan;
@@ -383,7 +389,7 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
         @Override
         public TableScan.Plan plan() {
-            List<DataSplit> splits = new ArrayList<>();
+            List<Split> splits = new ArrayList<>();
             Set<BinaryRow> completePartitions = new HashSet<>();
             for (Split split : mainScan.plan().splits()) {
                 DataSplit dataSplit = (DataSplit) split;
@@ -398,7 +404,12 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
             if (!remainingPartitions.isEmpty()) {
                 fallbackScan.withPartitionFilter(remainingPartitions);
                 for (Split split : fallbackScan.plan().splits()) {
-                    splits.add(new FallbackDataSplit((DataSplit) split, true));
+                    if (split instanceof DataSplit) {
+                        splits.add(new FallbackDataSplit((DataSplit) split, true));
+                    } else {
+                        Preconditions.checkArgument(split instanceof ChainSplit);
+                        splits.add(split);
+                    }
                 }
             }
             return new DataFilePlan(splits);
@@ -472,6 +483,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
 
         @Override
         public RecordReader<InternalRow> createReader(Split split) throws IOException {
+            if (wrapped.coreOptions().isChainTable()) {
+                if (split instanceof ChainSplit) {
+                    return fallbackRead.createReader(split);
+                } else {
+                    return mainRead.createReader(split);
+                }
+            }
             if (split instanceof FallbackDataSplit) {
                 FallbackDataSplit fallbackDataSplit = (FallbackDataSplit) split;
                 if (fallbackDataSplit.isFallback) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 06fc9b3020..73ae6d2dd1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.ChainTableUtils;
 import org.apache.paimon.utils.StringUtils;
 
 import org.slf4j.Logger;
@@ -98,7 +99,9 @@ public class FileStoreTableFactory {
 
         Options options = new Options(table.options());
         String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
-        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+        if (ChainTableUtils.isChainTable(options.toMap())) {
+            table = createChainTable(table, fileIO, tablePath, dynamicOptions, catalogEnvironment);
+        } else if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
             Options branchOptions = new Options(dynamicOptions.toMap());
             branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
             Optional<TableSchema> schema =
@@ -130,6 +133,65 @@ public class FileStoreTableFactory {
         return table;
     }
 
+    public static FileStoreTable createChainTable(
+            FileStoreTable table,
+            FileIO fileIO,
+            Path tablePath,
+            Options dynamicOptions,
+            CatalogEnvironment catalogEnvironment) {
+        String scanFallbackSnapshotBranch =
+                table.options().get(CoreOptions.SCAN_FALLBACK_SNAPSHOT_BRANCH.key());
+        String scanFallbackDeltaBranch =
+                table.options().get(CoreOptions.SCAN_FALLBACK_DELTA_BRANCH.key());
+        String currentBranch = table.schema().options().get(CoreOptions.BRANCH.key());
+        if (scanFallbackSnapshotBranch == null || scanFallbackDeltaBranch == null) {
+            return table;
+        }
+
+        boolean scanSnapshotBranch = scanFallbackSnapshotBranch.equalsIgnoreCase(currentBranch);
+        boolean scanDeltaBranch = scanFallbackDeltaBranch.equalsIgnoreCase(currentBranch);
+        LOG.info(
+                "Create chain table, table path {}, snapshotBranch {}, deltaBranch {}, "
+                        + "currentBranch {}, scanSnapshotBranch {}, scanDeltaBranch {}.",
+                tablePath,
+                scanFallbackSnapshotBranch,
+                scanFallbackDeltaBranch,
+                currentBranch,
+                scanSnapshotBranch,
+                scanDeltaBranch);
+        if (scanSnapshotBranch || scanDeltaBranch) {
+            return table;
+        }
+
+        Options snapshotBranchOptions = new Options(dynamicOptions.toMap());
+        snapshotBranchOptions.set(CoreOptions.BRANCH, scanFallbackSnapshotBranch);
+        Optional<TableSchema> snapshotSchema =
+                new SchemaManager(fileIO, tablePath, scanFallbackSnapshotBranch).latest();
+        AbstractFileStoreTable snapshotTable =
+                (AbstractFileStoreTable)
+                        createWithoutFallbackBranch(
+                                fileIO,
+                                tablePath,
+                                snapshotSchema.get(),
+                                snapshotBranchOptions,
+                                catalogEnvironment);
+        Options deltaBranchOptions = new Options(dynamicOptions.toMap());
+        deltaBranchOptions.set(CoreOptions.BRANCH, scanFallbackDeltaBranch);
+        Optional<TableSchema> deltaSchema =
+                new SchemaManager(fileIO, tablePath, scanFallbackDeltaBranch).latest();
+        AbstractFileStoreTable deltaTable =
+                (AbstractFileStoreTable)
+                        createWithoutFallbackBranch(
+                                fileIO,
+                                tablePath,
+                                deltaSchema.get(),
+                                deltaBranchOptions,
+                                catalogEnvironment);
+        FileStoreTable chainGroupFileStoreTable =
+                new ChainGroupReadTable(snapshotTable, deltaTable);
+        return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable);
+    }
+
     public static FileStoreTable createWithoutFallbackBranch(
             FileIO fileIO,
             Path tablePath,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 100c696f4b..dcadfad1ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -148,6 +148,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
+    @Override
+    public InnerTableScan withPartitionFilter(Predicate predicate) {
+        snapshotReader.withPartitionFilter(predicate);
+        return this;
+    }
+
     @Override
     public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
         snapshotReader.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index dccdfcef3d..bc3d0f275f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -66,6 +66,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withPartitionFilter(Predicate predicate) {
+        return this;
+    }
+
     default InnerTableScan withRowRanges(List<Range> rowRanges) {
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
new file mode 100644
index 0000000000..5d3d84692e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.RowType;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Utils for chain table. */
+public class ChainTableUtils {
+
+    public static boolean isChainTable(Map<String, String> tblOptions) {
+        return CoreOptions.fromMap(tblOptions).isChainTable();
+    }
+
+    public static Map<BinaryRow, BinaryRow> findFirstLatestPartitions(
+            List<BinaryRow> sortedSourcePartitions,
+            List<BinaryRow> sortedTargetPartitions,
+            RecordComparator partitionComparator) {
+        Map<BinaryRow, BinaryRow> partitionMapping = new HashMap<>();
+        int targetIndex = 0;
+        for (BinaryRow sourceRow : sortedSourcePartitions) {
+            BinaryRow firstSmaller;
+            while (targetIndex < sortedTargetPartitions.size()
+                    && partitionComparator.compare(
+                                    sortedTargetPartitions.get(targetIndex), sourceRow)
+                            < 0) {
+                targetIndex++;
+            }
+            firstSmaller = (targetIndex > 0) ? sortedTargetPartitions.get(targetIndex - 1) : null;
+            partitionMapping.put(sourceRow, firstSmaller);
+        }
+        return partitionMapping;
+    }
+
+    public static List<BinaryRow> getDeltaPartitions(
+            BinaryRow beginPartition,
+            BinaryRow endPartition,
+            List<String> partitionColumns,
+            RowType partType,
+            CoreOptions options,
+            RecordComparator partitionComparator,
+            InternalRowPartitionComputer partitionComputer) {
+        InternalRowSerializer serializer = new InternalRowSerializer(partType);
+        List<BinaryRow> deltaPartitions = new ArrayList<>();
+        boolean isDailyPartition = partitionColumns.size() == 1;
+        List<String> startPartitionValues =
+                new ArrayList<>(partitionComputer.generatePartValues(beginPartition).values());
+        List<String> endPartitionValues =
+                new ArrayList<>(partitionComputer.generatePartValues(endPartition).values());
+        PartitionTimeExtractor timeExtractor =
+                new PartitionTimeExtractor(
+                        options.partitionTimestampPattern(), options.partitionTimestampFormatter());
+        LocalDateTime startPartitionTime =
+                timeExtractor.extract(partitionColumns, startPartitionValues);
+        LocalDateTime candidateTime = startPartitionTime;
+        LocalDateTime endPartitionTime =
+                timeExtractor.extract(partitionColumns, endPartitionValues);
+        while (candidateTime.compareTo(endPartitionTime) <= 0) {
+            if (isDailyPartition) {
+                if (candidateTime.compareTo(startPartitionTime) > 0) {
+                    deltaPartitions.add(
+                            serializer
+                                    .toBinaryRow(
+                                            InternalRowPartitionComputer.convertSpecToInternalRow(
+                                                    calPartValues(
+                                                            candidateTime,
+                                                            partitionColumns,
+                                                            options.partitionTimestampPattern(),
+                                                            options.partitionTimestampFormatter()),
+                                                    partType,
+                                                    options.partitionDefaultName()))
+                                    .copy());
+                }
+            } else {
+                for (int hour = 0; hour <= 23; hour++) {
+                    candidateTime = candidateTime.toLocalDate().atStartOfDay().plusHours(hour);
+                    BinaryRow candidatePartition =
+                            serializer
+                                    .toBinaryRow(
+                                            InternalRowPartitionComputer.convertSpecToInternalRow(
+                                                    calPartValues(
+                                                            candidateTime,
+                                                            partitionColumns,
+                                                            options.partitionTimestampPattern(),
+                                                            options.partitionTimestampFormatter()),
+                                                    partType,
+                                                    options.partitionDefaultName()))
+                                    .copy();
+                    if (partitionComparator.compare(candidatePartition, beginPartition) > 0
+                            && partitionComparator.compare(candidatePartition, endPartition) <= 0) {
+                        deltaPartitions.add(candidatePartition);
+                    }
+                }
+            }
+            candidateTime = candidateTime.toLocalDate().plusDays(1).atStartOfDay();
+        }
+        return deltaPartitions;
+    }
+
+    public static Predicate createTriangularPredicate(
+            BinaryRow binaryRow,
+            RowDataToObjectArrayConverter converter,
+            BiFunction<Integer, Object, Predicate> innerFunc,
+            BiFunction<Integer, Object, Predicate> outerFunc) {
+        List<Predicate> fieldPredicates = new ArrayList<>();
+        Object[] partitionObjects = converter.convert(binaryRow);
+        for (int i = 0; i < converter.getArity(); i++) {
+            List<Predicate> andConditions = new ArrayList<>();
+            for (int j = 0; j < i; j++) {
+                Object o = partitionObjects[j];
+                andConditions.add(innerFunc.apply(j, o));
+            }
+            Object currentValue = partitionObjects[i];
+            andConditions.add(outerFunc.apply(i, currentValue));
+            fieldPredicates.add(PredicateBuilder.and(andConditions));
+        }
+        return PredicateBuilder.or(fieldPredicates);
+    }
+
+    public static Predicate createLinearPredicate(
+            BinaryRow binaryRow,
+            RowDataToObjectArrayConverter converter,
+            BiFunction<Integer, Object, Predicate> func) {
+        List<Predicate> fieldPredicates = new ArrayList<>();
+        Object[] partitionObjects = converter.convert(binaryRow);
+        for (int i = 0; i < converter.getArity(); i++) {
+            fieldPredicates.add(func.apply(i, partitionObjects[i]));
+        }
+        return PredicateBuilder.and(fieldPredicates);
+    }
+
+    public static LinkedHashMap<String, String> calPartValues(
+            LocalDateTime dateTime,
+            List<String> partitionKeys,
+            String timestampPattern,
+            String timestampFormatter) {
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(timestampFormatter);
+        String formattedDateTime = dateTime.format(formatter);
+        Pattern keyPattern = Pattern.compile("\\$(\\w+)");
+        Matcher keyMatcher = keyPattern.matcher(timestampPattern);
+        List<String> keyOrder = new ArrayList<>();
+        StringBuilder regexBuilder = new StringBuilder();
+        int lastPosition = 0;
+        while (keyMatcher.find()) {
+            regexBuilder.append(
+                    Pattern.quote(timestampPattern.substring(lastPosition, keyMatcher.start())));
+            regexBuilder.append("(.+)");
+            keyOrder.add(keyMatcher.group(1));
+            lastPosition = keyMatcher.end();
+        }
+        regexBuilder.append(Pattern.quote(timestampPattern.substring(lastPosition)));
+
+        Matcher valueMatcher = Pattern.compile(regexBuilder.toString()).matcher(formattedDateTime);
+        if (!valueMatcher.matches() || valueMatcher.groupCount() != keyOrder.size()) {
+            throw new IllegalArgumentException(
+                    "Formatted datetime does not match timestamp pattern");
+        }
+
+        Map<String, String> keyValues = new HashMap<>();
+        for (int i = 0; i < keyOrder.size(); i++) {
+            keyValues.put(keyOrder.get(i), valueMatcher.group(i + 1));
+        }
+        List<String> values =
+                partitionKeys.stream()
+                        .map(key -> keyValues.getOrDefault(key, ""))
+                        .collect(Collectors.toList());
+        LinkedHashMap<String, String> res = new LinkedHashMap<>();
+        for (int i = 0; i < partitionKeys.size(); i++) {
+            res.put(partitionKeys.get(i), values.get(i));
+        }
+        return res;
+    }
+}
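Note on `findFirstLatestPartitions` above: the method is a two-pointer sweep over two sorted lists. A minimal sketch of the same idea on plain strings follows, assuming both inputs are already sorted with the same comparator. `NearestEarlierPartitionSketch` and `findNearestEarlier` are hypothetical names used only for this illustration; the real method works on `BinaryRow` with a `RecordComparator`.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NearestEarlierPartitionSketch {

    /**
     * For each source value, finds the largest target value that is strictly smaller,
     * or null if no such target exists. Both lists must be sorted ascending.
     */
    static <T> Map<T, T> findNearestEarlier(
            List<T> sortedSources, List<T> sortedTargets, Comparator<T> comparator) {
        Map<T, T> mapping = new HashMap<>();
        int targetIndex = 0;
        for (T source : sortedSources) {
            // Advance past every target that sorts before the current source.
            // The index never moves backwards, so the sweep is linear overall.
            while (targetIndex < sortedTargets.size()
                    && comparator.compare(sortedTargets.get(targetIndex), source) < 0) {
                targetIndex++;
            }
            mapping.put(source, targetIndex > 0 ? sortedTargets.get(targetIndex - 1) : null);
        }
        return mapping;
    }

    public static void main(String[] args) {
        // Same partition values as testFindFirstLatestPartitions in this commit.
        List<String> delta = Arrays.asList("20251102", "20251106");
        List<String> snapshot = Arrays.asList("20251101", "20251105");
        Map<String, String> nearest =
                findNearestEarlier(delta, snapshot, Comparator.naturalOrder());
        System.out.println(nearest.get("20251102")); // prints 20251101
        System.out.println(nearest.get("20251106")); // prints 20251105
    }
}
```

Because a target equal to the source stops the sweep, the result is always a strictly earlier partition, which matches the `< 0` comparison in the committed loop.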
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
new file mode 100644
index 0000000000..99ba22c451
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test class for {@link org.apache.paimon.utils.ChainTableUtils}. */
+public class ChainTableUtilsTest {
+
+    public static final RecordComparator KEY_COMPARATOR =
+            (a, b) -> a.getString(0).compareTo(b.getString(0));
+
+    private static String partString(BinaryRow partition, InternalRow.FieldGetter[] getters) {
+        StringBuilder builder = new StringBuilder();
+        for (InternalRow.FieldGetter getter : getters) {
+            builder.append(getter.getFieldOrNull(partition));
+        }
+        return builder.toString();
+    }
+
+    @Test
+    public void testFindFirstLatestPartitions() {
+        String[] snapshotPartitions = {"20251105", "20251101"};
+        String[] deltaPartitions = {"20251102", "20251106"};
+
+        RowType rowType = RowType.builder().field("dt", DataTypes.STRING().notNull()).build();
+
+        BinaryRow snapshotPartition1 = new BinaryRow(1);
+        BinaryRowWriter writer = new BinaryRowWriter(snapshotPartition1);
+        writer.writeString(0, BinaryString.fromString(snapshotPartitions[0]));
+        InternalRow.FieldGetter[] fieldGetters =
+                InternalRowUtils.createFieldGetters(rowType.getFieldTypes());
+
+        BinaryRow snapshotPartition2 = new BinaryRow(1);
+        writer = new BinaryRowWriter(snapshotPartition2);
+        writer.writeString(0, BinaryString.fromString(snapshotPartitions[1]));
+        List<BinaryRow> sourcePartitions =
+                Stream.of(snapshotPartition1, snapshotPartition2)
+                        .sorted(KEY_COMPARATOR)
+                        .collect(Collectors.toList());
+
+        BinaryRow deltaPartition1 = new BinaryRow(1);
+        writer = new BinaryRowWriter(deltaPartition1);
+        writer.writeString(0, BinaryString.fromString(deltaPartitions[0]));
+
+        BinaryRow deltaPartition2 = new BinaryRow(1);
+        writer = new BinaryRowWriter(deltaPartition2);
+        writer.writeString(0, BinaryString.fromString(deltaPartitions[1]));
+        List<BinaryRow> targetPartitions =
+                Stream.of(deltaPartition1, deltaPartition2)
+                        .sorted(KEY_COMPARATOR)
+                        .collect(Collectors.toList());
+
+        Map<BinaryRow, BinaryRow> firstLatestPartitions =
+                ChainTableUtils.findFirstLatestPartitions(
+                        targetPartitions, sourcePartitions, KEY_COMPARATOR);
+
+        BinaryRow key1 = firstLatestPartitions.get(deltaPartition1);
+        Assertions.assertEquals(snapshotPartitions[1], partString(key1, fieldGetters));
+
+        BinaryRow key2 = firstLatestPartitions.get(deltaPartition2);
+        Assertions.assertEquals(snapshotPartitions[0], partString(key2, fieldGetters));
+    }
+
+    @Test
+    public void testCreateTriangularPredicate() {
+        RowType rowType =
+                RowType.builder()
+                        .field("dt", DataTypes.STRING().notNull())
+                        .field("hour", DataTypes.STRING().notNull())
+                        .build();
+        BinaryRow partitionValue = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(partitionValue);
+        writer.writeString(0, BinaryString.fromString("20250810"));
+        writer.writeString(1, BinaryString.fromString("23"));
+        RowDataToObjectArrayConverter partitionConverter =
+                new RowDataToObjectArrayConverter(rowType);
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                ChainTableUtils.createTriangularPredicate(
+                        partitionValue,
+                        partitionConverter,
+                        (Integer i, Object j) -> builder.equal(i, j),
+                        (Integer i, Object j) -> builder.lessThan(i, j));
+        Assertions.assertFalse(predicate.test(partitionValue));
+        List<Predicate> predicates = new ArrayList<>();
+        predicates.add(builder.lessThan(0, partitionValue.getString(0)));
+        List<Predicate> subPredicates = new ArrayList<>();
+        subPredicates.add(builder.equal(0, partitionValue.getString(0)));
+        subPredicates.add(builder.lessThan(1, partitionValue.getString(1)));
+        predicates.add(PredicateBuilder.and(subPredicates));
+        Predicate expected = PredicateBuilder.or(predicates);
+        Assertions.assertEquals(expected, predicate);
+    }
+
+    @Test
+    public void testCreateLinearPredicate() {
+        RowType rowType =
+                RowType.builder()
+                        .field("dt", DataTypes.STRING().notNull())
+                        .field("hour", DataTypes.STRING().notNull())
+                        .build();
+        BinaryRow partitionValue = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(partitionValue);
+        writer.writeString(0, BinaryString.fromString("20250810"));
+        writer.writeString(1, BinaryString.fromString("23"));
+        RowDataToObjectArrayConverter partitionConverter =
+                new RowDataToObjectArrayConverter(rowType);
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                ChainTableUtils.createLinearPredicate(
+                        partitionValue,
+                        partitionConverter,
+                        (Integer i, Object j) -> builder.equal(i, j));
+        Assertions.assertTrue(predicate.test(partitionValue));
+        List<Predicate> predicates = new ArrayList<>();
+        predicates.add(builder.equal(0, partitionValue.getString(0)));
+        predicates.add(builder.equal(1, partitionValue.getString(1)));
+        Predicate expected = PredicateBuilder.and(predicates);
+        Assertions.assertEquals(expected, predicate);
+    }
+
+    @Test
+    public void testGeneratePartitionValues() {
+        LinkedHashMap<String, String> partitionValues =
+                ChainTableUtils.calPartValues(
+                        LocalDateTime.of(2023, 1, 1, 12, 0, 0),
+                        Arrays.asList("dt", "hour"),
+                        "$dt $hour:00:00",
+                        "yyyyMMdd HH:mm:ss");
+        assertEquals(
+                new LinkedHashMap<String, String>() {
+                    {
+                        put("dt", "20230101");
+                        put("hour", "12");
+                    }
+                },
+                partitionValues);
+
+        partitionValues =
+                ChainTableUtils.calPartValues(
+                        LocalDateTime.of(2023, 1, 1, 0, 0, 0),
+                        Arrays.asList("dt"),
+                        "$dt",
+                        "yyyyMMdd");
+        assertEquals(
+                new LinkedHashMap<String, String>() {
+                    {
+                        put("dt", "20230101");
+                    }
+                },
+                partitionValues);
+    }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 583522822b..aaaaec1854 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -174,4 +174,428 @@ public class SparkCatalogWithHiveTest {
                         "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                 .master("local[2]");
     }
+
+    @Test
+    public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", warehousePath.toString())
+                        // with hive metastore
+                        .config("spark.sql.catalogImplementation", "hive")
+                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                                "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.warehouse",
+                                warehousePath.toString())
+                        .config(
+                                "spark.sql.extensions",
+                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+                        .master("local[2]");
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        /** Create table */
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS \n"
+                        + "  `my_db1`.`chain_test` (\n"
+                        + "    `t1` BIGINT COMMENT 't1',\n"
+                        + "    `t2` BIGINT COMMENT 't2',\n"
+                        + "    `t3` STRING COMMENT 't3'\n"
+                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + "  SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
+                        + "    'bucket-key' = 't1',\n"
+                        + "    'primary-key' = 'dt,t1',\n"
+                        + "    'partition.timestamp-pattern' = '$dt',\n"
+                        + "    'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+                        + "    'chain-table.enabled' = 'true',\n"
+                        + "    'bucket' = '2',\n"
+                        + "    'merge-engine' = 'deduplicate', \n"
+                        + "    'sequence.field' = 't2'\n"
+                        + "  )");
+
+        /** Create branch */
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
+
+        /** Set branch */
+        spark.sql(
+                "ALTER TABLE my_db1.chain_test SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.close();
+        spark = builder.getOrCreate();
+
+        /** Write main branch */
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');");
+
+        /** Write delta branch */
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250809') values (1, 1, '1'),(2, 1, '1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1' ),(3, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250811') values (2, 2, '1-1' ),(4, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250813') values (5, 1, '1' ),(6, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250814') values (5, 2, '1-1' ),(6, 2, '1-1' );");
+
+        /** Write snapshot branch */
+        spark.sql("set spark.paimon.branch=snapshot;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test`  partition (dt = '20250810')  values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
+
+        spark.close();
+        spark = builder.getOrCreate();
+        /** Main read */
+        assertThat(
+                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250810'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,1,20250810]", "[2,1,1,20250810]");
+
+        /** Snapshot read */
+        assertThat(
+                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250814'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250814]",
+                        "[2,2,1-1,20250814]",
+                        "[3,2,1-1,20250814]",
+                        "[4,2,1-1,20250814]",
+                        "[5,1,1,20250814]",
+                        "[6,1,1,20250814]");
+
+        /** Chain read */
+        /** 1. non pre snapshot */
+        assertThat(
+                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250809'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,1,20250809]", "[2,1,1,20250809]");
+        /** 2. has pre snapshot */
+        assertThat(
+                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250811]",
+                        "[2,2,1-1,20250811]",
+                        "[3,1,1,20250811]",
+                        "[4,1,1,20250811]");
+
+        /** Multi partition Read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt in ('20250810', '20250811', '20250812');")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,1,1,20250810]",
+                        "[2,1,1,20250810]",
+                        "[1,2,1-1,20250811]",
+                        "[2,2,1-1,20250811]",
+                        "[3,1,1,20250811]",
+                        "[4,1,1,20250811]",
+                        "[1,2,1-1,20250812]",
+                        "[2,2,1-1,20250812]",
+                        "[3,2,1-1,20250812]",
+                        "[4,2,1-1,20250812]");
+
+        /** Incremental read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[2,2,1-1,20250811]", "[4,1,1,20250811]");
+
+        /** Multi partition incremental read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt in ('20250810', '20250811', '20250812');")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250810]",
+                        "[3,1,1,20250810]",
+                        "[2,2,1-1,20250811]",
+                        "[4,1,1,20250811]",
+                        "[3,2,1-1,20250812]",
+                        "[4,2,1-1,20250812]");
+
+        /** Hybrid read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "select * from  `my_db1`.`chain_test` where dt = '20250811'\n"
+                                                + "union all\n"
+                                                + "select * from  `my_db1`.`chain_test$branch_delta`  where dt = '20250811'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250811]",
+                        "[2,2,1-1,20250811]",
+                        "[3,1,1,20250811]",
+                        "[4,1,1,20250811]",
+                        "[2,2,1-1,20250811]",
+                        "[4,1,1,20250811]");
+
+        spark.close();
+        spark = builder.getOrCreate();
+
+        /** Drop table */
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+
+        spark.close();
+    }
+
+    @Test
+    public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", warehousePath.toString())
+                        // with hive metastore
+                        .config("spark.sql.catalogImplementation", "hive")
+                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+                                "thrift://localhost:" + PORT)
+                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+                        .config(
+                                "spark.sql.catalog.spark_catalog.warehouse",
+                                warehousePath.toString())
+                        .config(
+                                "spark.sql.extensions",
+                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+                        .master("local[2]");
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        /** Create table */
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS \n"
+                        + "  `my_db1`.`chain_test` (\n"
+                        + "    `t1` BIGINT COMMENT 't1',\n"
+                        + "    `t2` BIGINT COMMENT 't2',\n"
+                        + "    `t3` STRING COMMENT 't3'\n"
+                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + "  SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
+                        + "    'bucket-key' = 't1',\n"
+                        + "    'primary-key' = 'dt,hour,t1',\n"
+                        + "    'partition.timestamp-pattern' = '$dt $hour:00:00',\n"
+                        + "    'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n"
+                        + "    'chain-table.enabled' = 'true',\n"
+                        + "    'bucket' = '2',\n"
+                        + "    'merge-engine' = 'deduplicate', \n"
+                        + "    'sequence.field' = 't2'\n"
+                        + "  )");
+
+        /** Create branch */
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
+        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
+
+        /** Set branch */
+        spark.sql(
+                "ALTER TABLE my_db1.chain_test SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.sql(
+                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
+                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
+                        + "'scan.fallback-delta-branch' = 'delta')");
+        spark.close();
+        spark = builder.getOrCreate();
+
+        /** Write main branch */
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');");
+
+        /** Write delta branch */
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' );");
+
+        /** Write snapshot branch */
+        spark.sql("set spark.paimon.branch=snapshot;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test`  partition (dt = '20250810', hour = '22')  values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
+
+        spark.close();
+        spark = builder.getOrCreate();
+        /** Main read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '22'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,1,20250810,22]", "[2,1,1,20250810,22]");
+
+        /** Snapshot read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and hour = '02'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250811,02]",
+                        "[2,2,1-1,20250811,02]",
+                        "[3,2,1-1,20250811,02]",
+                        "[4,2,1-1,20250811,02]",
+                        "[5,1,1,20250811,02]",
+                        "[6,1,1,20250811,02]");
+
+        /** Chain read */
+        /** 1. non pre snapshot */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '21'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[1,1,1,20250810,21]", "[2,1,1,20250810,21]");
+        /** 2. has pre snapshot */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and  hour = '23'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250810,23]",
+                        "[2,2,1-1,20250810,23]",
+                        "[3,1,1,20250810,23]",
+                        "[4,1,1,20250810,23]");
+
+        /** Multi partition Read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour in ('22', '23');")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,1,1,20250810,22]",
+                        "[2,1,1,20250810,22]",
+                        "[1,2,1-1,20250810,23]",
+                        "[2,2,1-1,20250810,23]",
+                        "[3,1,1,20250810,23]",
+                        "[4,1,1,20250810,23]");
+
+        /** Incremental read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("[2,2,1-1,20250810,23]", "[4,1,1,20250810,23]");
+
+        /** Multi partition incremental read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour in ('22', '23');")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250810,22]",
+                        "[3,1,1,20250810,22]",
+                        "[2,2,1-1,20250810,23]",
+                        "[4,1,1,20250810,23]");
+
+        /** Hybrid read */
+        assertThat(
+                        spark
+                                .sql(
+                                        "select * from  `my_db1`.`chain_test` where dt = '20250810' and hour = '23'\n"
+                                                + "union all\n"
+                                                + "select * from  `my_db1`.`chain_test$branch_delta`  where dt = '20250810' and hour = '23'")
+                                .collectAsList().stream()
+                                .map(Row::toString)
+                                .collect(Collectors.toList()))
+                .containsExactlyInAnyOrder(
+                        "[1,2,1-1,20250810,23]",
+                        "[2,2,1-1,20250810,23]",
+                        "[3,1,1,20250810,23]",
+                        "[4,1,1,20250810,23]",
+                        "[2,2,1-1,20250810,23]",
+                        "[4,1,1,20250810,23]");
+
+        spark.close();
+        spark = builder.getOrCreate();
+
+        /** Drop table */
+        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+
+        spark.close();
+    }
 }
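
Note on the "triangular" predicate exercised in ChainTableUtilsTest: the expected value built in that test is `dt < v0 OR (dt = v0 AND hour < v1)`, which reads as a lexicographic "strictly before" check over the partition fields. The following is a minimal, dependency-free sketch of that shape, not the Paimon implementation. The class name `TriangularPredicateSketch` and the method `strictlyBefore` are hypothetical, and it assumes partition values are compared as plain strings.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the commit and not Paimon API.
public class TriangularPredicateSketch {

    /**
     * Returns true when candidate sorts strictly before bound, comparing
     * fields left to right. Equivalent to the OR over every index i of:
     * (fields before i are equal) AND (field i of candidate < field i of bound).
     */
    public static boolean strictlyBefore(List<String> candidate, List<String> bound) {
        if (candidate.size() != bound.size()) {
            throw new IllegalArgumentException("partition arity mismatch");
        }
        for (int i = 0; i < bound.size(); i++) {
            // The "equal" part of the i-th branch: all earlier fields match.
            boolean prefixEqual = candidate.subList(0, i).equals(bound.subList(0, i));
            // The "lessThan" part of the i-th branch.
            if (prefixEqual && candidate.get(i).compareTo(bound.get(i)) < 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> bound = Arrays.asList("20250810", "23");
        System.out.println(strictlyBefore(Arrays.asList("20250809", "23"), bound));
        System.out.println(strictlyBefore(Arrays.asList("20250810", "22"), bound));
        System.out.println(strictlyBefore(bound, bound));
    }
}
```

As in the test, the bound partition itself does not satisfy the predicate, which is why the test asserts that `predicate.test(partitionValue)` is false.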

