This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a9df02e272 [core] Support chain tbl on batch mode (#6394)
a9df02e272 is described below
commit a9df02e2720e9b21fa83b4d262461e4711392b17
Author: Stefanietry <[email protected]>
AuthorDate: Wed Dec 24 22:17:55 2025 +0800
[core] Support chain tbl on batch mode (#6394)
---
docs/content/primary-key-table/chain-table.md | 148 +++++++
.../shortcodes/generated/core_configuration.html | 18 +
docs/static/img/chain-table.png | Bin 0 -> 205667 bytes
.../main/java/org/apache/paimon/CoreOptions.java | 34 ++
.../paimon/io/ChainKeyValueFileReaderFactory.java | 13 +-
.../org/apache/paimon/schema/SchemaValidation.java | 34 ++
.../apache/paimon/table/ChainGroupReadTable.java | 356 +++++++++++++++++
.../paimon/table/FallbackReadFileStoreTable.java | 34 +-
.../apache/paimon/table/FileStoreTableFactory.java | 64 +++-
.../paimon/table/source/AbstractDataTableScan.java | 6 +
.../apache/paimon/table/source/InnerTableScan.java | 4 +
.../org/apache/paimon/utils/ChainTableUtils.java | 207 ++++++++++
.../apache/paimon/utils/ChainTableUtilsTest.java | 192 ++++++++++
.../paimon/spark/SparkCatalogWithHiveTest.java | 424 +++++++++++++++++++++
14 files changed, 1524 insertions(+), 10 deletions(-)
diff --git a/docs/content/primary-key-table/chain-table.md
b/docs/content/primary-key-table/chain-table.md
new file mode 100644
index 0000000000..84c1f298a3
--- /dev/null
+++ b/docs/content/primary-key-table/chain-table.md
@@ -0,0 +1,148 @@
+---
+title: "Chain Table"
+weight: 6
+type: docs
+aliases:
+- /primary-key-table/chain-table.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Chain Table
+
+Chain table is a new capability for primary key tables that transforms how you process incremental data.
+Imagine a scenario where you periodically store a full snapshot of data (for example, once a day), even
+though only a small portion changes between snapshots. ODS binlog dump is a typical example of this pattern.
+
+Take a daily binlog dump job as an example: a batch job merges yesterday's full dataset with today's
+incremental changes to produce a new full dataset (sketched below). This approach has two clear drawbacks:
+* Full computation: the merge operation processes all data and involves a shuffle, which results in poor performance.
+* Full storage: a full copy of the data is stored every day, even though the changed data usually accounts for a very small proportion.
+
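+The sketch below is purely illustrative of such a merge job: the tables `ods_full`, `ods_delta` and `ods_full_new`
+and the columns `id`, `val`, `update_time` are hypothetical, as is the deduplication rule (latest `update_time` wins).
+
+```sql
+-- Hypothetical daily job: recompute today's full dataset from yesterday's full data
+-- plus today's changes. All keys are shuffled and rewritten even if only a small
+-- fraction of rows changed.
+INSERT OVERWRITE TABLE ods_full_new PARTITION (dt = '20250811')
+SELECT id, val, update_time
+FROM (
+  SELECT id, val, update_time,
+         ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) AS rn
+  FROM (
+    SELECT id, val, update_time FROM ods_full  WHERE dt = '20250810'
+    UNION ALL
+    SELECT id, val, update_time FROM ods_delta WHERE dt = '20250811'
+  ) unioned
+) ranked
+WHERE rn = 1;
+```
+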
+Paimon addresses this problem by directly consuming only the changed data and performing merge-on-read.
+In this way, full computation and storage are turned into incremental mode:
+* Incremental computation: the daily offline ETL job only needs to consume the changed data of the current day and does not need to merge all data.
+* Incremental storage: only the changed data is stored each day, and it is asynchronously compacted periodically (e.g., weekly) to build a global chain table within the lifecycle.
+ {{< img src="/img/chain-table.png">}}
+
+On top of a regular table, a chain table introduces snapshot and delta branches to represent full and incremental
+data respectively. When writing, you specify the branch to write full or incremental data to. When reading, Paimon
+automatically chooses the appropriate strategy based on the read mode, such as full, incremental, or hybrid.
+
+To enable a chain table, you must set `chain-table.enabled` to true in the table options when creating the
+table, and the snapshot and delta branches need to be created as well. Consider an example via Spark SQL:
+
+```sql
+CREATE TABLE default.t (
+ `t1` string ,
+ `t2` string ,
+ `t3` string
+) PARTITIONED BY (`date` string)
+TBLPROPERTIES (
+ 'chain-table.enabled' = 'true',
+ -- props about primary key table
+ 'primary-key' = 'date,t1',
+ 'sequence.field' = 't2',
+ 'bucket-key' = 't1',
+ 'bucket' = '2',
+ -- props about partition
+ 'partition.timestamp-pattern' = '$date',
+ 'partition.timestamp-formatter' = 'yyyyMMdd'
+);
+
+CALL sys.create_branch('default.t', 'snapshot');
+
+CALL sys.create_branch('default.t', 'delta');
+
+ALTER TABLE default.t SET tblproperties
+ ('scan.fallback-snapshot-branch' = 'snapshot',
+ 'scan.fallback-delta-branch' = 'delta');
+
+ALTER TABLE `default`.`t$branch_snapshot` SET tblproperties
+ ('scan.fallback-snapshot-branch' = 'snapshot',
+ 'scan.fallback-delta-branch' = 'delta');
+
+ALTER TABLE `default`.`t$branch_delta` SET tblproperties
+ ('scan.fallback-snapshot-branch' = 'snapshot',
+ 'scan.fallback-delta-branch' = 'delta');
+```
+
+Notice that:
+- Chain table is only supported for primary key tables, which means you should define `bucket` and `bucket-key` for the table.
+- The schema of each branch of a chain table must be consistent.
+- Only Spark is supported for now; Flink will be supported later.
+- Chain compaction is not supported for now; it will be supported later.
+
+After creating a chain table, you can read and write data in the following ways.
+
+- Full Write: Write data to t$branch_snapshot.
+```sql
+insert overwrite `default`.`t$branch_snapshot` partition (date = '20250810')
+ values ('1', '1', '1');
+```
+
+- Incremental Write: Write data to t$branch_delta.
+```sql
+insert overwrite `default`.`t$branch_delta` partition (date = '20250811')
+ values ('2', '1', '1');
+```
+
+- Full Query: If the snapshot branch has the full partition, read it directly; otherwise, read in chain merge mode (a conceptual sketch of chain merge mode is shown after the result below).
+```sql
+select t1, t2, t3 from default.t where date = '20250811'
+```
+You will get the following result:
+```text
++---+----+-----+
+| t1| t2| t3|
++---+----+-----+
+| 1 | 1| 1 |
+| 2 | 1| 1 |
++---+----+-----+
+```
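+
+Conceptually, chain merge mode for `date = '20250811'` combines the nearest earlier snapshot partition with the
+delta partition and deduplicates by primary key, letting the sequence field (`t2`) decide which row wins. The
+query below is only an illustration of that result; Paimon performs this merge-on-read internally.
+```sql
+SELECT t1, t2, t3
+FROM (
+  SELECT t1, t2, t3,
+         ROW_NUMBER() OVER (PARTITION BY t1 ORDER BY t2 DESC) AS rn
+  FROM (
+    SELECT t1, t2, t3 FROM `default`.`t$branch_snapshot` WHERE date = '20250810'
+    UNION ALL
+    SELECT t1, t2, t3 FROM `default`.`t$branch_delta` WHERE date = '20250811'
+  ) unioned
+) ranked
+WHERE rn = 1;
+```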
+
+- Incremental Query: Read the incremental partition from t$branch_delta.
+```sql
+select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
+```
+You will get the following result:
+```text
++---+----+-----+
+| t1| t2| t3|
++---+----+-----+
+| 2 | 1| 1 |
++---+----+-----+
+```
+
+- Hybrid Query: Read both full and incremental data simultaneously.
+```sql
+select t1, t2, t3 from default.t where date = '20250811'
+union all
+select t1, t2, t3 from `default`.`t$branch_delta` where date = '20250811'
+```
+You will get the following result:
+```text
++---+----+-----+
+| t1| t2| t3|
++---+----+-----+
+| 1 | 1| 1 |
+| 2 | 1| 1 |
+| 2 | 1| 1 |
++---+----+-----+
+```
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 269625f718..027d798c06 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -98,6 +98,12 @@ under the License.
<td>MemorySize</td>
<td>Memory page size for caching.</td>
</tr>
+ <tr>
+ <td><h5>chain-table.enabled</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to enable chain table.</td>
+ </tr>
<tr>
<td><h5>changelog-file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
@@ -1061,6 +1067,18 @@ This config option does not affect the default
filesystem metastore.</td>
<td>String</td>
<td>When a batch job queries from a table, if a partition does not
exist in the current branch, the reader will try to get this partition from
this fallback branch.</td>
</tr>
+ <tr>
+ <td><h5>scan.fallback-delta-branch</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>When a batch job queries from a chain table, if a partition
does not exist in either main or snapshot branch, the reader will try to get
this partition from chain snapshot and delta branch together.</td>
+ </tr>
+ <tr>
+ <td><h5>scan.fallback-snapshot-branch</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>When a batch job queries from a chain table, if a partition
does not exist in the main branch, the reader will try to get this partition
from chain snapshot branch.</td>
+ </tr>
<tr>
<td><h5>scan.file-creation-time-millis</h5></td>
<td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/static/img/chain-table.png b/docs/static/img/chain-table.png
new file mode 100644
index 0000000000..cfd79ea095
Binary files /dev/null and b/docs/static/img/chain-table.png differ
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ae48355836..d5bd5789e8 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -226,6 +226,28 @@ public class CoreOptions implements Serializable {
public static final ConfigOption<String> BRANCH =
key("branch").stringType().defaultValue("main").withDescription("Specify branch
name.");
+ public static final ConfigOption<Boolean> CHAIN_TABLE_ENABLED =
+ key("chain-table.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable chain table.");
+
+ public static final ConfigOption<String> SCAN_FALLBACK_SNAPSHOT_BRANCH =
+ key("scan.fallback-snapshot-branch")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When a batch job queries from a chain table, if a
partition does not exist in the main branch, "
+ + "the reader will try to get this
partition from chain snapshot branch.");
+
+ public static final ConfigOption<String> SCAN_FALLBACK_DELTA_BRANCH =
+ key("scan.fallback-delta-branch")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When a batch job queries from a chain table, if a
partition does not exist in either main or snapshot branch, "
+ + "the reader will try to get this
partition from chain snapshot and delta branch together.");
+
public static final String FILE_FORMAT_ORC = "orc";
public static final String FILE_FORMAT_AVRO = "avro";
public static final String FILE_FORMAT_PARQUET = "parquet";
@@ -3254,6 +3276,18 @@ public class CoreOptions implements Serializable {
return options.get(LOOKUP_MERGE_RECORDS_THRESHOLD);
}
+ public boolean isChainTable() {
+ return options.get(CHAIN_TABLE_ENABLED);
+ }
+
+ public String scanFallbackSnapshotBranch() {
+ return options.get(SCAN_FALLBACK_SNAPSHOT_BRANCH);
+ }
+
+ public String scanFallbackDeltaBranch() {
+ return options.get(SCAN_FALLBACK_DELTA_BRANCH);
+ }
+
public boolean formatTableImplementationIsPaimon() {
return options.get(FORMAT_TABLE_IMPLEMENTATION) ==
FormatTableImplementation.PAIMON;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
index 0fa9ae73ef..2c4d419989 100644
---
a/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/ChainKeyValueFileReaderFactory.java
@@ -70,8 +70,19 @@ public class ChainKeyValueFileReaderFactory extends
KeyValueFileReaderFactory {
this.chainReadContext = chainReadContext;
CoreOptions options = new CoreOptions(schema.options());
this.currentBranch = options.branch();
+ String snapshotBranch = options.scanFallbackSnapshotBranch();
+ String deltaBranch = options.scanFallbackDeltaBranch();
+ SchemaManager snapshotSchemaManager =
+ snapshotBranch.equalsIgnoreCase(currentBranch)
+ ? schemaManager
+ : schemaManager.copyWithBranch(snapshotBranch);
+ SchemaManager deltaSchemaManager =
+ deltaBranch.equalsIgnoreCase(currentBranch)
+ ? schemaManager
+ : schemaManager.copyWithBranch(deltaBranch);
this.branchSchemaManagers = new HashMap<>();
- this.branchSchemaManagers.put(currentBranch, schemaManager);
+ this.branchSchemaManagers.put(snapshotBranch, snapshotSchemaManager);
+ this.branchSchemaManagers.put(deltaBranch, deltaSchemaManager);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 56176f2b83..abbc2e4912 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -21,6 +21,7 @@ package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
+import org.apache.paimon.TableType;
import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
@@ -39,6 +40,7 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import java.util.ArrayList;
@@ -241,6 +243,8 @@ public class SchemaValidation {
validateRowTracking(schema, options);
validateIncrementalClustering(schema, options);
+
+ validateChainTable(schema, options);
}
public static void validateFallbackBranch(SchemaManager schemaManager,
TableSchema schema) {
@@ -679,4 +683,34 @@ public class SchemaValidation {
PRIMARY_KEY.key());
}
}
+
+ public static void validateChainTable(TableSchema schema, CoreOptions
options) {
+ if (options.isChainTable()) {
+ boolean isPrimaryTbl = schema.primaryKeys() != null &&
!schema.primaryKeys().isEmpty();
+ boolean isPartitionTbl =
+ schema.partitionKeys() != null &&
!schema.partitionKeys().isEmpty();
+ ChangelogProducer changelogProducer = options.changelogProducer();
+ Preconditions.checkArgument(
+ options.type() == TableType.TABLE, "Chain table must be
table type.");
+ Preconditions.checkArgument(isPrimaryTbl, "Primary key is required
for chain table.");
+ Preconditions.checkArgument(isPartitionTbl, "Chain table must be
partition table.");
+ Preconditions.checkArgument(
+ options.bucket() > 0, "Bucket number must be greater than
0 for chain table.");
+ Preconditions.checkArgument(
+ options.sequenceField() != null, "Sequence field is
required for chain table.");
+ Preconditions.checkArgument(
+ options.mergeEngine() == MergeEngine.DEDUPLICATE,
+ "Merge engine must be deduplicate for chain table.");
+ Preconditions.checkArgument(
+ changelogProducer == ChangelogProducer.NONE
+ || changelogProducer == ChangelogProducer.INPUT,
+ "Changelog producer must be none or input for chain
table.");
+ Preconditions.checkArgument(
+ options.partitionTimestampPattern() != null,
+ "Partition timestamp pattern is required for chain
table.");
+ Preconditions.checkArgument(
+ options.partitionTimestampFormatter() != null,
+ "Partition timestamp formatter is required for chain
table.");
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
new file mode 100644
index 0000000000..aa583f7e6a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ChainGroupReadTable.java
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ChainSplit;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.DataTableScan;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Chain table which mainly reads from the snapshot branch. However, if the snapshot branch does not
+ * have a partition, it will fall back to chain read.
+ */
+public class ChainGroupReadTable extends FallbackReadFileStoreTable {
+
+ public ChainGroupReadTable(
+ AbstractFileStoreTable snapshotStoreTable, AbstractFileStoreTable
deltaStoreTable) {
+ super(snapshotStoreTable, deltaStoreTable);
+ Preconditions.checkArgument(snapshotStoreTable instanceof
PrimaryKeyFileStoreTable);
+ Preconditions.checkArgument(deltaStoreTable instanceof
PrimaryKeyFileStoreTable);
+ }
+
+ @Override
+ public DataTableScan newScan() {
+ super.validateSchema();
+ return new ChainTableBatchScan(
+ wrapped.newScan(),
+ fallback().newScan(),
+ ((AbstractFileStoreTable) wrapped).tableSchema,
+ this);
+ }
+
+ private DataTableScan newSnapshotScan() {
+ return wrapped.newScan();
+ }
+
+ private DataTableScan newDeltaScan() {
+ return fallback().newScan();
+ }
+
+ @Override
+ public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ return new ChainGroupReadTable(
+ (AbstractFileStoreTable) wrapped.copy(dynamicOptions),
+ (AbstractFileStoreTable)
fallback().copy(rewriteFallbackOptions(dynamicOptions)));
+ }
+
+ @Override
+ public FileStoreTable copy(TableSchema newTableSchema) {
+ return new ChainGroupReadTable(
+ (AbstractFileStoreTable) wrapped.copy(newTableSchema),
+ (AbstractFileStoreTable)
+ fallback()
+ .copy(
+ newTableSchema.copy(
+
rewriteFallbackOptions(newTableSchema.options()))));
+ }
+
+ @Override
+ public FileStoreTable copyWithoutTimeTravel(Map<String, String>
dynamicOptions) {
+ return new ChainGroupReadTable(
+ (AbstractFileStoreTable)
wrapped.copyWithoutTimeTravel(dynamicOptions),
+ (AbstractFileStoreTable)
+
fallback().copyWithoutTimeTravel(rewriteFallbackOptions(dynamicOptions)));
+ }
+
+ @Override
+ public FileStoreTable copyWithLatestSchema() {
+ return new ChainGroupReadTable(
+ (AbstractFileStoreTable) wrapped.copyWithLatestSchema(),
+ (AbstractFileStoreTable) fallback().copyWithLatestSchema());
+ }
+
+ @Override
+ public FileStoreTable switchToBranch(String branchName) {
+ return new ChainGroupReadTable(
+ (AbstractFileStoreTable) switchWrappedToBranch(branchName),
+ (AbstractFileStoreTable) fallback());
+ }
+
+ /** Scan implementation for {@link ChainGroupReadTable}. */
+ public static class ChainTableBatchScan extends FallbackReadScan {
+
+ private final RowDataToObjectArrayConverter partitionConverter;
+ private final InternalRowPartitionComputer partitionComputer;
+ private final TableSchema tableSchema;
+ private final CoreOptions options;
+ private final RecordComparator partitionComparator;
+ private final ChainGroupReadTable chainGroupReadTable;
+
+ public ChainTableBatchScan(
+ DataTableScan mainScan,
+ DataTableScan fallbackScan,
+ TableSchema tableSchema,
+ ChainGroupReadTable chainGroupReadTable) {
+ super(mainScan, fallbackScan);
+ this.tableSchema = tableSchema;
+ this.options = CoreOptions.fromMap(tableSchema.options());
+ this.chainGroupReadTable = chainGroupReadTable;
+ this.partitionConverter =
+ new
RowDataToObjectArrayConverter(tableSchema.logicalPartitionType());
+ this.partitionComputer =
+ new InternalRowPartitionComputer(
+ options.partitionDefaultName(),
+ tableSchema.logicalPartitionType(),
+ tableSchema.partitionKeys().toArray(new String[0]),
+ options.legacyPartitionName());
+ this.partitionComparator =
+ CodeGenUtils.newRecordComparator(
+
tableSchema.logicalPartitionType().getFieldTypes());
+ }
+
+ @Override
+ public Plan plan() {
+ List<Split> splits = new ArrayList<>();
+ Set<BinaryRow> completePartitions = new HashSet<>();
+ PredicateBuilder builder = new
PredicateBuilder(tableSchema.logicalPartitionType());
+ for (Split split : mainScan.plan().splits()) {
+ DataSplit dataSplit = (DataSplit) split;
+ HashMap<String, String> fileBucketPathMapping = new
HashMap<>();
+ HashMap<String, String> fileBranchMapping = new HashMap<>();
+ for (DataFileMeta file : dataSplit.dataFiles()) {
+ fileBucketPathMapping.put(file.fileName(), ((DataSplit)
split).bucketPath());
+ fileBranchMapping.put(file.fileName(),
options.scanFallbackSnapshotBranch());
+ }
+ splits.add(
+ new ChainSplit(
+ dataSplit.partition(),
+ dataSplit.dataFiles(),
+ fileBucketPathMapping,
+ fileBranchMapping));
+ completePartitions.add(dataSplit.partition());
+ }
+ List<BinaryRow> remainingPartitions =
+ fallbackScan.listPartitions().stream()
+ .filter(p -> !completePartitions.contains(p))
+ .collect(Collectors.toList());
+ if (!remainingPartitions.isEmpty()) {
+ fallbackScan.withPartitionFilter(remainingPartitions);
+ List<BinaryRow> deltaPartitions =
fallbackScan.listPartitions();
+ deltaPartitions =
+ deltaPartitions.stream()
+ .sorted((o1, o2) ->
partitionComparator.compare(o1, o2))
+ .collect(Collectors.toList());
+ BinaryRow maxPartition =
deltaPartitions.get(deltaPartitions.size() - 1);
+ Predicate snapshotPredicate =
+ ChainTableUtils.createTriangularPredicate(
+ maxPartition,
+ partitionConverter,
+ (Integer i, Object j) -> builder.equal(i, j),
+ (Integer i, Object j) -> builder.lessThan(i,
j));
+ mainScan.withPartitionFilter(snapshotPredicate);
+ List<BinaryRow> candidateSnapshotPartitions =
mainScan.listPartitions();
+ candidateSnapshotPartitions =
+ candidateSnapshotPartitions.stream()
+ .sorted((o1, o2) ->
partitionComparator.compare(o1, o2))
+ .collect(Collectors.toList());
+ Map<BinaryRow, BinaryRow> partitionMapping =
+ ChainTableUtils.findFirstLatestPartitions(
+ deltaPartitions, candidateSnapshotPartitions,
partitionComparator);
+ for (Map.Entry<BinaryRow, BinaryRow> partitionParis :
partitionMapping.entrySet()) {
+ DataTableScan snapshotScan =
chainGroupReadTable.newSnapshotScan();
+ DataTableScan deltaScan =
chainGroupReadTable.newDeltaScan();
+ if (partitionParis.getValue() == null) {
+ List<Predicate> predicates = new ArrayList<>();
+ predicates.add(
+ ChainTableUtils.createTriangularPredicate(
+ partitionParis.getKey(),
+ partitionConverter,
+ (Integer i, Object j) ->
builder.equal(i, j),
+ (Integer i, Object j) ->
builder.lessThan(i, j)));
+ predicates.add(
+ ChainTableUtils.createLinearPredicate(
+ partitionParis.getKey(),
+ partitionConverter,
+ (Integer i, Object j) ->
builder.equal(i, j)));
+
deltaScan.withPartitionFilter(PredicateBuilder.or(predicates));
+ } else {
+ List<BinaryRow> selectedDeltaPartitions =
+ ChainTableUtils.getDeltaPartitions(
+ partitionParis.getValue(),
+ partitionParis.getKey(),
+ tableSchema.partitionKeys(),
+ tableSchema.logicalPartitionType(),
+ options,
+ partitionComparator,
+ partitionComputer);
+ deltaScan.withPartitionFilter(selectedDeltaPartitions);
+ }
+ List<Split> subSplits = deltaScan.plan().splits();
+ Set<String> snapshotFileNames = new HashSet<>();
+ if (partitionParis.getValue() != null) {
+
snapshotScan.withPartitionFilter(Arrays.asList(partitionParis.getValue()));
+ List<Split> mainSubSplits =
snapshotScan.plan().splits();
+ snapshotFileNames =
+ mainSubSplits.stream()
+ .flatMap(
+ s ->
+ ((DataSplit) s)
+
.dataFiles().stream()
+ .map(
+
DataFileMeta
+
::fileName))
+ .collect(Collectors.toSet());
+ subSplits.addAll(mainSubSplits);
+ }
+ Map<Integer, List<DataSplit>> bucketSplits = new
LinkedHashMap<>();
+ for (Split split : subSplits) {
+ DataSplit dataSplit = (DataSplit) split;
+ Preconditions.checkArgument(
+ dataSplit.totalBuckets() == options.bucket(),
+ "Inconsistent bucket num " +
dataSplit.bucket());
+ bucketSplits
+ .computeIfAbsent(dataSplit.bucket(), k -> new
ArrayList<>())
+ .add(dataSplit);
+ }
+ for (Map.Entry<Integer, List<DataSplit>> entry :
bucketSplits.entrySet()) {
+ HashMap<String, String> fileBucketPathMapping = new
HashMap<>();
+ HashMap<String, String> fileBranchMapping = new
HashMap<>();
+ List<DataSplit> splitList = entry.getValue();
+ for (DataSplit dataSplit : splitList) {
+ for (DataFileMeta file : dataSplit.dataFiles()) {
+ fileBucketPathMapping.put(file.fileName(),
dataSplit.bucketPath());
+ String branch =
+
snapshotFileNames.contains(file.fileName())
+ ?
options.scanFallbackSnapshotBranch()
+ :
options.scanFallbackDeltaBranch();
+ fileBranchMapping.put(file.fileName(), branch);
+ }
+ }
+ ChainSplit split =
+ new ChainSplit(
+ partitionParis.getKey(),
+ entry.getValue().stream()
+ .flatMap(
+ datsSplit ->
datsSplit.dataFiles().stream())
+ .collect(Collectors.toList()),
+ fileBucketPathMapping,
+ fileBranchMapping);
+ splits.add(split);
+ }
+ }
+ }
+ return new DataFilePlan(splits);
+ }
+
+ @Override
+ public List<PartitionEntry> listPartitionEntries() {
+ return super.listPartitionEntries();
+ }
+ }
+
+ @Override
+ public InnerTableRead newRead() {
+ return new Read();
+ }
+
+ private class Read implements InnerTableRead {
+
+ private final InnerTableRead mainRead;
+ private final InnerTableRead fallbackRead;
+
+ private Read() {
+ this.mainRead = wrapped.newRead();
+ this.fallbackRead = fallback().newRead();
+ }
+
+ @Override
+ public InnerTableRead withFilter(Predicate predicate) {
+ mainRead.withFilter(predicate);
+ fallbackRead.withFilter(predicate);
+ return this;
+ }
+
+ @Override
+ public InnerTableRead withReadType(RowType readType) {
+ mainRead.withReadType(readType);
+ fallbackRead.withReadType(readType);
+ return this;
+ }
+
+ @Override
+ public InnerTableRead forceKeepDelete() {
+ mainRead.forceKeepDelete();
+ fallbackRead.forceKeepDelete();
+ return this;
+ }
+
+ @Override
+ public TableRead executeFilter() {
+ mainRead.executeFilter();
+ fallbackRead.executeFilter();
+ return this;
+ }
+
+ @Override
+ public TableRead withIOManager(IOManager ioManager) {
+ mainRead.withIOManager(ioManager);
+ fallbackRead.withIOManager(ioManager);
+ return this;
+ }
+
+ @Override
+ public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ Preconditions.checkArgument(split instanceof ChainSplit);
+ return fallbackRead.createReader(split);
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 11bc764a57..620638cb92 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -37,6 +37,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.TopN;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.source.ChainSplit;
import org.apache.paimon.table.source.DataFilePlan;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
@@ -82,7 +83,12 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
this.fallback = fallback;
Preconditions.checkArgument(!(wrapped instanceof
FallbackReadFileStoreTable));
- Preconditions.checkArgument(!(fallback instanceof
FallbackReadFileStoreTable));
+ Preconditions.checkArgument(isValidFallbackTable());
+ }
+
+ private boolean isValidFallbackTable() {
+ return fallback instanceof ChainGroupReadTable
+ || !(fallback instanceof FallbackReadFileStoreTable);
}
public FileStoreTable fallback() {
@@ -128,7 +134,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
fallback.setManifestCache(manifestCache);
}
- private FileStoreTable switchWrappedToBranch(String branchName) {
+ protected FileStoreTable switchWrappedToBranch(String branchName) {
Optional<TableSchema> optionalSchema =
wrapped.schemaManager().copyWithBranch(branchName).latest();
Preconditions.checkArgument(
@@ -146,7 +152,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
wrapped.catalogEnvironment());
}
- private Map<String, String> rewriteFallbackOptions(Map<String, String>
options) {
+ protected Map<String, String> rewriteFallbackOptions(Map<String, String>
options) {
Map<String, String> result = new HashMap<>(options);
// branch of fallback table should never change
@@ -186,7 +192,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
return new FallbackReadScan(wrapped.newScan(), fallback.newScan());
}
- private void validateSchema() {
+ protected void validateSchema() {
String mainBranch = wrapped.coreOptions().branch();
String fallbackBranch = fallback.coreOptions().branch();
RowType mainRowType = wrapped.schema().logicalRowType();
@@ -289,8 +295,8 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
/** Scan implementation for {@link FallbackReadFileStoreTable}. */
public static class FallbackReadScan implements DataTableScan {
- private final DataTableScan mainScan;
- private final DataTableScan fallbackScan;
+ protected final DataTableScan mainScan;
+ protected final DataTableScan fallbackScan;
public FallbackReadScan(DataTableScan mainScan, DataTableScan
fallbackScan) {
this.mainScan = mainScan;
@@ -383,7 +389,7 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
@Override
public TableScan.Plan plan() {
- List<DataSplit> splits = new ArrayList<>();
+ List<Split> splits = new ArrayList<>();
Set<BinaryRow> completePartitions = new HashSet<>();
for (Split split : mainScan.plan().splits()) {
DataSplit dataSplit = (DataSplit) split;
@@ -398,7 +404,12 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
if (!remainingPartitions.isEmpty()) {
fallbackScan.withPartitionFilter(remainingPartitions);
for (Split split : fallbackScan.plan().splits()) {
- splits.add(new FallbackDataSplit((DataSplit) split, true));
+ if (split instanceof DataSplit) {
+ splits.add(new FallbackDataSplit((DataSplit) split,
true));
+ } else {
+ Preconditions.checkArgument(split instanceof
ChainSplit);
+ splits.add(split);
+ }
}
}
return new DataFilePlan(splits);
@@ -472,6 +483,13 @@ public class FallbackReadFileStoreTable extends
DelegatedFileStoreTable {
@Override
public RecordReader<InternalRow> createReader(Split split) throws
IOException {
+ if (wrapped.coreOptions().isChainTable()) {
+ if (split instanceof ChainSplit) {
+ return fallbackRead.createReader(split);
+ } else {
+ return mainRead.createReader(split);
+ }
+ }
if (split instanceof FallbackDataSplit) {
FallbackDataSplit fallbackDataSplit = (FallbackDataSplit)
split;
if (fallbackDataSplit.isFallback) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 06fc9b3020..73ae6d2dd1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -26,6 +26,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.utils.ChainTableUtils;
import org.apache.paimon.utils.StringUtils;
import org.slf4j.Logger;
@@ -98,7 +99,9 @@ public class FileStoreTableFactory {
Options options = new Options(table.options());
String fallbackBranch = options.get(CoreOptions.SCAN_FALLBACK_BRANCH);
- if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
+ if (ChainTableUtils.isChainTable(options.toMap())) {
+ table = createChainTable(table, fileIO, tablePath, dynamicOptions,
catalogEnvironment);
+ } else if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
Options branchOptions = new Options(dynamicOptions.toMap());
branchOptions.set(CoreOptions.BRANCH, fallbackBranch);
Optional<TableSchema> schema =
@@ -130,6 +133,65 @@ public class FileStoreTableFactory {
return table;
}
+ public static FileStoreTable createChainTable(
+ FileStoreTable table,
+ FileIO fileIO,
+ Path tablePath,
+ Options dynamicOptions,
+ CatalogEnvironment catalogEnvironment) {
+ String scanFallbackSnapshotBranch =
+
table.options().get(CoreOptions.SCAN_FALLBACK_SNAPSHOT_BRANCH.key());
+ String scanFallbackDeltaBranch =
+
table.options().get(CoreOptions.SCAN_FALLBACK_DELTA_BRANCH.key());
+ String currentBranch =
table.schema().options().get(CoreOptions.BRANCH.key());
+ if (scanFallbackSnapshotBranch == null || scanFallbackDeltaBranch ==
null) {
+ return table;
+ }
+
+ boolean scanSnapshotBranch =
scanFallbackSnapshotBranch.equalsIgnoreCase(currentBranch);
+ boolean scanDeltaBranch =
scanFallbackDeltaBranch.equalsIgnoreCase(currentBranch);
+ LOG.info(
+ "Create chain table, tbl path {}, snapshotBranch {},
deltaBranch{}, currentBranch {} "
+ + "scanSnapshotBranch{} scanDeltaBranch {}.",
+ tablePath,
+ scanFallbackSnapshotBranch,
+ scanFallbackDeltaBranch,
+ currentBranch,
+ scanSnapshotBranch,
+ scanDeltaBranch);
+ if (scanSnapshotBranch || scanDeltaBranch) {
+ return table;
+ }
+
+ Options snapshotBranchOptions = new Options(dynamicOptions.toMap());
+ snapshotBranchOptions.set(CoreOptions.BRANCH,
scanFallbackSnapshotBranch);
+ Optional<TableSchema> snapshotSchema =
+ new SchemaManager(fileIO, tablePath,
scanFallbackSnapshotBranch).latest();
+ AbstractFileStoreTable snapshotTable =
+ (AbstractFileStoreTable)
+ createWithoutFallbackBranch(
+ fileIO,
+ tablePath,
+ snapshotSchema.get(),
+ snapshotBranchOptions,
+ catalogEnvironment);
+ Options deltaBranchOptions = new Options(dynamicOptions.toMap());
+ deltaBranchOptions.set(CoreOptions.BRANCH, scanFallbackDeltaBranch);
+ Optional<TableSchema> deltaSchema =
+ new SchemaManager(fileIO, tablePath,
scanFallbackDeltaBranch).latest();
+ AbstractFileStoreTable deltaTable =
+ (AbstractFileStoreTable)
+ createWithoutFallbackBranch(
+ fileIO,
+ tablePath,
+ deltaSchema.get(),
+ deltaBranchOptions,
+ catalogEnvironment);
+ FileStoreTable chainGroupFileStoreTable =
+ new ChainGroupReadTable(snapshotTable, deltaTable);
+ return new FallbackReadFileStoreTable(table, chainGroupFileStoreTable);
+ }
+
public static FileStoreTable createWithoutFallbackBranch(
FileIO fileIO,
Path tablePath,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 100c696f4b..dcadfad1ff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -148,6 +148,12 @@ abstract class AbstractDataTableScan implements
DataTableScan {
return this;
}
+ @Override
+ public InnerTableScan withPartitionFilter(Predicate predicate) {
+ snapshotReader.withPartitionFilter(predicate);
+ return this;
+ }
+
@Override
public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
snapshotReader.withLevelFilter(levelFilter);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index dccdfcef3d..bc3d0f275f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -66,6 +66,10 @@ public interface InnerTableScan extends TableScan {
return this;
}
+ default InnerTableScan withPartitionFilter(Predicate predicate) {
+ return this;
+ }
+
default InnerTableScan withRowRanges(List<Range> rowRanges) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
new file mode 100644
index 0000000000..5d3d84692e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.partition.PartitionTimeExtractor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.RowType;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Utils for chain table. */
+public class ChainTableUtils {
+
+ public static boolean isChainTable(Map<String, String> tblOptions) {
+ return CoreOptions.fromMap(tblOptions).isChainTable();
+ }
+
+ public static Map<BinaryRow, BinaryRow> findFirstLatestPartitions(
+ List<BinaryRow> sortedSourcePartitions,
+ List<BinaryRow> sortedTargetPartitions,
+ RecordComparator partitionComparator) {
+ Map<BinaryRow, BinaryRow> partitionMapping = new HashMap<>();
+ int targetIndex = 0;
+ for (BinaryRow sourceRow : sortedSourcePartitions) {
+ BinaryRow firstSmaller;
+ while (targetIndex < sortedTargetPartitions.size()
+ && partitionComparator.compare(
+ sortedTargetPartitions.get(targetIndex),
sourceRow)
+ < 0) {
+ targetIndex++;
+ }
+ firstSmaller = (targetIndex > 0) ?
sortedTargetPartitions.get(targetIndex - 1) : null;
+ partitionMapping.put(sourceRow, firstSmaller);
+ }
+ return partitionMapping;
+ }
+
+ public static List<BinaryRow> getDeltaPartitions(
+ BinaryRow beginPartition,
+ BinaryRow endPartition,
+ List<String> partitionColumns,
+ RowType partType,
+ CoreOptions options,
+ RecordComparator partitionComparator,
+ InternalRowPartitionComputer partitionComputer) {
+ InternalRowSerializer serializer = new InternalRowSerializer(partType);
+ List<BinaryRow> deltaPartitions = new ArrayList<>();
+ boolean isDailyPartition = partitionColumns.size() == 1;
+ List<String> startPartitionValues =
+ new
ArrayList<>(partitionComputer.generatePartValues(beginPartition).values());
+ List<String> endPartitionValues =
+ new
ArrayList<>(partitionComputer.generatePartValues(endPartition).values());
+ PartitionTimeExtractor timeExtractor =
+ new PartitionTimeExtractor(
+ options.partitionTimestampPattern(),
options.partitionTimestampFormatter());
+ LocalDateTime stratPartitionTime =
+ timeExtractor.extract(partitionColumns, startPartitionValues);
+ LocalDateTime candidateTime = stratPartitionTime;
+ LocalDateTime endPartitionTime =
+ timeExtractor.extract(partitionColumns, endPartitionValues);
+ while (candidateTime.compareTo(endPartitionTime) <= 0) {
+ if (isDailyPartition) {
+ if (candidateTime.compareTo(stratPartitionTime) > 0) {
+ deltaPartitions.add(
+ serializer
+ .toBinaryRow(
+
InternalRowPartitionComputer.convertSpecToInternalRow(
+ calPartValues(
+ candidateTime,
+ partitionColumns,
+
options.partitionTimestampPattern(),
+
options.partitionTimestampFormatter()),
+ partType,
+
options.partitionDefaultName()))
+ .copy());
+ }
+ } else {
+ for (int hour = 0; hour <= 23; hour++) {
+ candidateTime =
candidateTime.toLocalDate().atStartOfDay().plusHours(hour);
+ BinaryRow candidatePartition =
+ serializer
+ .toBinaryRow(
+
InternalRowPartitionComputer.convertSpecToInternalRow(
+ calPartValues(
+ candidateTime,
+ partitionColumns,
+
options.partitionTimestampPattern(),
+
options.partitionTimestampFormatter()),
+ partType,
+
options.partitionDefaultName()))
+ .copy();
+ if (partitionComparator.compare(candidatePartition,
beginPartition) > 0
+ && partitionComparator.compare(candidatePartition,
endPartition) <= 0) {
+ deltaPartitions.add(candidatePartition);
+ }
+ }
+ }
+ candidateTime =
candidateTime.toLocalDate().plusDays(1).atStartOfDay();
+ }
+ return deltaPartitions;
+ }
+
+ public static Predicate createTriangularPredicate(
+ BinaryRow binaryRow,
+ RowDataToObjectArrayConverter converter,
+ BiFunction<Integer, Object, Predicate> innerFunc,
+ BiFunction<Integer, Object, Predicate> outerFunc) {
+ List<Predicate> fieldPredicates = new ArrayList<>();
+ Object[] partitionObjects = converter.convert(binaryRow);
+ for (int i = 0; i < converter.getArity(); i++) {
+ List<Predicate> andConditions = new ArrayList<>();
+ for (int j = 0; j < i; j++) {
+ Object o = partitionObjects[j];
+ andConditions.add(innerFunc.apply(j, o));
+ }
+ Object currentValue = partitionObjects[i];
+ andConditions.add(outerFunc.apply(i, currentValue));
+ fieldPredicates.add(PredicateBuilder.and(andConditions));
+ }
+ return PredicateBuilder.or(fieldPredicates);
+ }
+
+ public static Predicate createLinearPredicate(
+ BinaryRow binaryRow,
+ RowDataToObjectArrayConverter converter,
+ BiFunction<Integer, Object, Predicate> func) {
+ List<Predicate> fieldPredicates = new ArrayList<>();
+ Object[] partitionObjects = converter.convert(binaryRow);
+ for (int i = 0; i < converter.getArity(); i++) {
+ fieldPredicates.add(func.apply(i, partitionObjects[i]));
+ }
+ return PredicateBuilder.and(fieldPredicates);
+ }
+
+ public static LinkedHashMap<String, String> calPartValues(
+ LocalDateTime dateTime,
+ List<String> partitionKeys,
+ String timestampPattern,
+ String timestampFormatter) {
+ DateTimeFormatter formatter =
DateTimeFormatter.ofPattern(timestampFormatter);
+ String formattedDateTime = dateTime.format(formatter);
+ Pattern keyPattern = Pattern.compile("\\$(\\w+)");
+ Matcher keyMatcher = keyPattern.matcher(timestampPattern);
+ List<String> keyOrder = new ArrayList<>();
+ StringBuilder regexBuilder = new StringBuilder();
+ int lastPosition = 0;
+ while (keyMatcher.find()) {
+ regexBuilder.append(
+ Pattern.quote(timestampPattern.substring(lastPosition,
keyMatcher.start())));
+ regexBuilder.append("(.+)");
+ keyOrder.add(keyMatcher.group(1));
+ lastPosition = keyMatcher.end();
+ }
+
regexBuilder.append(Pattern.quote(timestampPattern.substring(lastPosition)));
+
+ Matcher valueMatcher =
Pattern.compile(regexBuilder.toString()).matcher(formattedDateTime);
+ if (!valueMatcher.matches() || valueMatcher.groupCount() !=
keyOrder.size()) {
+ throw new IllegalArgumentException(
+ "Formatted datetime does not match timestamp pattern");
+ }
+
+ Map<String, String> keyValues = new HashMap<>();
+ for (int i = 0; i < keyOrder.size(); i++) {
+ keyValues.put(keyOrder.get(i), valueMatcher.group(i + 1));
+ }
+ List<String> values =
+ partitionKeys.stream()
+ .map(key -> keyValues.getOrDefault(key, ""))
+ .collect(Collectors.toList());
+ LinkedHashMap<String, String> res = new LinkedHashMap<>();
+ for (int i = 0; i < partitionKeys.size(); i++) {
+ res.put(partitionKeys.get(i), values.get(i));
+ }
+ return res;
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
new file mode 100644
index 0000000000..99ba22c451
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ChainTableUtilsTest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Test class for {@link org.apache.paimon.utils.ChainTableUtils}. */
+public class ChainTableUtilsTest {
+
+ public static final RecordComparator KEY_COMPARATOR =
+ (a, b) -> a.getString(0).compareTo(b.getString(0));
+
+ private static String partString(BinaryRow partition,
InternalRow.FieldGetter[] getters) {
+ StringBuilder builder = new StringBuilder();
+ for (InternalRow.FieldGetter getter : getters) {
+ builder.append(getter.getFieldOrNull(partition));
+ }
+ return builder.toString();
+ }
+
+ @Test
+ public void testFindFirstLatestPartitions() {
+ String[] snapshotPartitions = {"20251105", "20251101"};
+ String[] deltaPartitions = {"20251102", "20251106"};
+
+ RowType rowType = RowType.builder().field("dt",
DataTypes.STRING().notNull()).build();
+
+ BinaryRow snapshotPartition1 = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(snapshotPartition1);
+ writer.writeString(0, BinaryString.fromString(snapshotPartitions[0]));
+ InternalRow.FieldGetter[] fieldGetters =
+ InternalRowUtils.createFieldGetters(rowType.getFieldTypes());
+
+ BinaryRow snapshotPartition2 = new BinaryRow(1);
+ writer = new BinaryRowWriter(snapshotPartition2);
+ writer.writeString(0, BinaryString.fromString(snapshotPartitions[1]));
+ List<BinaryRow> sourcePartitions =
+ Stream.of(snapshotPartition1, snapshotPartition2)
+ .sorted(KEY_COMPARATOR)
+ .collect(Collectors.toList());
+
+ BinaryRow deltaPartition1 = new BinaryRow(1);
+ writer = new BinaryRowWriter(deltaPartition1);
+ writer.writeString(0, BinaryString.fromString(deltaPartitions[0]));
+
+ BinaryRow deltaPartition2 = new BinaryRow(1);
+ writer = new BinaryRowWriter(deltaPartition2);
+ writer.writeString(0, BinaryString.fromString(deltaPartitions[1]));
+ List<BinaryRow> targetPartitions =
+ Stream.of(deltaPartition1, deltaPartition2)
+ .sorted(KEY_COMPARATOR)
+ .collect(Collectors.toList());
+
+ Map<BinaryRow, BinaryRow> firstLatestPartitions =
+ ChainTableUtils.findFirstLatestPartitions(
+ targetPartitions, sourcePartitions, KEY_COMPARATOR);
+
+ BinaryRow key1 = firstLatestPartitions.get(deltaPartition1);
+ Assertions.assertEquals(partString(key1, fieldGetters),
snapshotPartitions[1]);
+
+ BinaryRow key2 = firstLatestPartitions.get(deltaPartition2);
+ Assertions.assertEquals(partString(key2, fieldGetters),
snapshotPartitions[0]);
+ }
+
+ @Test
+ public void testCreateTriangularPredicate() {
+ RowType rowType =
+ RowType.builder()
+ .field("dt", DataTypes.STRING().notNull())
+ .field("hour", DataTypes.STRING().notNull())
+ .build();
+ BinaryRow partitionValue = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(partitionValue);
+ writer.writeString(0, BinaryString.fromString("20250810"));
+ writer.writeString(1, BinaryString.fromString("23"));
+ RowDataToObjectArrayConverter partitionConverter =
+ new RowDataToObjectArrayConverter(rowType);
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ ChainTableUtils.createTriangularPredicate(
+ partitionValue,
+ partitionConverter,
+ (Integer i, Object j) -> builder.equal(i, j),
+ (Integer i, Object j) -> builder.lessThan(i, j));
+ Assertions.assertTrue(!predicate.test(partitionValue));
+ List<Predicate> predicates = new ArrayList<>();
+ predicates.add(builder.lessThan(0, partitionValue.getString(0)));
+ List<Predicate> subPredicates = new ArrayList<>();
+ subPredicates.add(builder.equal(0, partitionValue.getString(0)));
+ subPredicates.add(builder.lessThan(1, partitionValue.getString(1)));
+ predicates.add(PredicateBuilder.and(subPredicates));
+ Predicate expected = PredicateBuilder.or(predicates);
+ Assertions.assertTrue(predicate.equals(expected));
+ }
+
+ @Test
+ public void testCreateLinearPredicate() {
+ RowType rowType =
+ RowType.builder()
+ .field("dt", DataTypes.STRING().notNull())
+ .field("hour", DataTypes.STRING().notNull())
+ .build();
+ BinaryRow partitionValue = new BinaryRow(2);
+ BinaryRowWriter writer = new BinaryRowWriter(partitionValue);
+ writer.writeString(0, BinaryString.fromString("20250810"));
+ writer.writeString(1, BinaryString.fromString("23"));
+ RowDataToObjectArrayConverter partitionConverter =
+ new RowDataToObjectArrayConverter(rowType);
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ ChainTableUtils.createLinearPredicate(
+ partitionValue,
+ partitionConverter,
+ (Integer i, Object j) -> builder.equal(i, j));
+ Assertions.assertTrue(predicate.test(partitionValue));
+ List<Predicate> predicates = new ArrayList<>();
+ predicates.add(builder.equal(0, partitionValue.getString(0)));
+ predicates.add(builder.equal(1, partitionValue.getString(1)));
+ Predicate expected = PredicateBuilder.and(predicates);
+ Assertions.assertTrue(predicate.equals(expected));
+ }
+
+ @Test
+ public void testGeneratePartitionValues() {
+ LinkedHashMap<String, String> partitionValues =
+ ChainTableUtils.calPartValues(
+ LocalDateTime.of(2023, 1, 1, 12, 0, 0),
+ Arrays.asList("dt", "hour"),
+ "$dt $hour:00:00",
+ "yyyyMMdd HH:mm:ss");
+ assertEquals(
+ new LinkedHashMap<String, String>() {
+ {
+ put("dt", "20230101");
+ put("hour", "12");
+ }
+ },
+ partitionValues);
+
+ partitionValues =
+ ChainTableUtils.calPartValues(
+ LocalDateTime.of(2023, 1, 1, 0, 0, 0),
+ Arrays.asList("dt"),
+ "$dt",
+ "yyyyMMdd");
+ assertEquals(
+ new LinkedHashMap<String, String>() {
+ {
+ put("dt", "20230101");
+ }
+ },
+ partitionValues);
+ }
+}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 583522822b..aaaaec1854 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -174,4 +174,428 @@ public class SparkCatalogWithHiveTest {
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
.master("local[2]");
}
+
+ @Test
+ public void testChainTable(@TempDir java.nio.file.Path tempDir) throws
IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir",
warehousePath.toString())
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config("hive.metastore.uris", "thrift://localhost:" +
PORT)
+ .config("spark.sql.catalog.spark_catalog",
SparkCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.metastore",
"hive")
+ .config(
+
"spark.sql.catalog.spark_catalog.hive.metastore.uris",
+ "thrift://localhost:" + PORT)
+
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+ .config(
+ "spark.sql.catalog.spark_catalog.warehouse",
+ warehousePath.toString())
+ .config(
+ "spark.sql.extensions",
+
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .master("local[2]");
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ /** Create table */
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS \n"
+ + " `my_db1`.`chain_test` (\n"
+ + " `t1` BIGINT COMMENT 't1',\n"
+ + " `t2` BIGINT COMMENT 't2',\n"
+ + " `t3` STRING COMMENT 't3'\n"
+ + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+ + "WITH\n"
+ + " SERDEPROPERTIES ('serialization.format' = '1')
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat'
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES
(\n"
+ + " 'bucket-key' = 't1',\n"
+ + " 'primary-key' = 'dt,t1',\n"
+ + " 'partition.timestamp-pattern' = '$dt',\n"
+ + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+ + " 'chain-table.enabled' = 'true',\n"
+ + " 'bucket' = '2',\n"
+ + " 'merge-engine' = 'deduplicate', \n"
+ + " 'sequence.field' = 't2'\n"
+ + " )");
+
+ /** Create branch */
+ spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
+ spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
+
+ /** Set branch */
+ spark.sql(
+ "ALTER TABLE my_db1.chain_test SET tblproperties ("
+ + "'scan.fallback-snapshot-branch' = 'snapshot', "
+ + "'scan.fallback-delta-branch' = 'delta')");
+ spark.sql(
+ "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET
tblproperties ("
+ + "'scan.fallback-snapshot-branch' = 'snapshot',"
+ + "'scan.fallback-delta-branch' = 'delta')");
+ spark.sql(
+ "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET
tblproperties ("
+ + "'scan.fallback-snapshot-branch' = 'snapshot',"
+ + "'scan.fallback-delta-branch' = 'delta')");
+ spark.close();
+ spark = builder.getOrCreate();
+
+ /** Write main branch */
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');");
+
+ /** Write delta branch */
+ spark.sql("set spark.paimon.branch=delta;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250809') values (1, 1, '1'),(2, 1, '1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1' ),(3, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811') values (2, 2, '1-1' ),(4, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250813') values (5, 1, '1' ),(6, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (5, 2, '1-1' ),(6, 2, '1-1' );");
+
+ /** Write snapshot branch */
+ spark.sql("set spark.paimon.branch=snapshot;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
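+
+ // Data layout after the writes above: the main branch only has dt=20250810, the
+ // delta branch has dt=20250809 through 20250814, and the snapshot branch has full
+ // copies for 20250810, 20250812 and 20250814; the reads below exercise each
+ // combination of those partitions.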
+
+ spark.close();
+ spark = builder.getOrCreate();
+ /** Main read */
+ assertThat(
+ spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250810'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1,20250810]", "[2,1,1,20250810]");
+
+ /** Snapshot read */
+ assertThat(
+ spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250814'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250814]",
+ "[2,2,1-1,20250814]",
+ "[3,2,1-1,20250814]",
+ "[4,2,1-1,20250814]",
+ "[5,1,1,20250814]",
+ "[6,1,1,20250814]");
+
+ /** Chain read */
+ /** 1. non pre snapshot */
+ assertThat(
+ spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250809'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1,20250809]", "[2,1,1,20250809]");
+ /** 2. has pre snapshot */
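+ // Expected result sketch: dt=20250811 has no snapshot partition of its own, so the
+ // chain read presumably merges the latest earlier snapshot partition (20250810:
+ // keys 1, 2, 3) with the delta partition 20250811 (keys 2, 4), deduplicating on the
+ // primary key with sequence.field t2.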
+ assertThat(
+ spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250811]",
+ "[2,2,1-1,20250811]",
+ "[3,1,1,20250811]",
+ "[4,1,1,20250811]");
+
+ /** Multi partition Read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test` where dt in ('20250810', '20250811', '20250812');")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,1,1,20250810]",
+ "[2,1,1,20250810]",
+ "[1,2,1-1,20250811]",
+ "[2,2,1-1,20250811]",
+ "[3,1,1,20250811]",
+ "[4,1,1,20250811]",
+ "[1,2,1-1,20250812]",
+ "[2,2,1-1,20250812]",
+ "[3,2,1-1,20250812]",
+ "[4,2,1-1,20250812]");
+
+ /** Incremental read */
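+ // Querying `chain_test$branch_delta` reads the delta branch directly, so only the
+ // incremental rows written to that partition come back, with no chain merge
+ // against an earlier snapshot.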
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[2,2,1-1,20250811]", "[4,1,1,20250811]");
+
+ /** Multi partition incremental read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt in ('20250810', '20250811', '20250812');")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250810]",
+ "[3,1,1,20250810]",
+ "[2,2,1-1,20250811]",
+ "[4,1,1,20250811]",
+ "[3,2,1-1,20250812]",
+ "[4,2,1-1,20250812]");
+
+ /** Hybrid read */
+ assertThat(
+ spark
+ .sql(
+ "select * from `my_db1`.`chain_test` where dt = '20250811'\n"
+ + "union all\n"
+ + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250811'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250811]",
+ "[2,2,1-1,20250811]",
+ "[3,1,1,20250811]",
+ "[4,1,1,20250811]",
+ "[2,2,1-1,20250811]",
+ "[4,1,1,20250811]");
+
+ spark.close();
+ spark = builder.getOrCreate();
+
+ /** Drop table */
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+
+ spark.close();
+ }
+
+ @Test
+ public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+ .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+ "thrift://localhost:" + PORT)
+ .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
+ .config(
+ "spark.sql.catalog.spark_catalog.warehouse",
+ warehousePath.toString())
+ .config(
+ "spark.sql.extensions",
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .master("local[2]");
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ /** Create table */
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS \n"
+ + " `my_db1`.`chain_test` (\n"
+ + " `t1` BIGINT COMMENT 't1',\n"
+ + " `t2` BIGINT COMMENT 't2',\n"
+ + " `t3` STRING COMMENT 't3'\n"
+ + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+ + "WITH\n"
+ + " SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
+ + " 'bucket-key' = 't1',\n"
+ + " 'primary-key' = 'dt,hour,t1',\n"
+ + " 'partition.timestamp-pattern' = '$dt $hour:00:00',\n"
+ + " 'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n"
+ + " 'chain-table.enabled' = 'true',\n"
+ + " 'bucket' = '2',\n"
+ + " 'merge-engine' = 'deduplicate', \n"
+ + " 'sequence.field' = 't2'\n"
+ + " )");
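+
+ // Same setup as testChainTable except for the extra `hour` partition column; the
+ // 'partition.timestamp-pattern' of '$dt $hour:00:00' with formatter
+ // 'yyyyMMdd HH:mm:ss' presumably tells the chain logic how to order hourly partitions.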
+
+ /** Create branch */
+ spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
+ spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
+
+ /** Set branch */
+ spark.sql(
+ "ALTER TABLE my_db1.chain_test SET tblproperties ("
+ + "'scan.fallback-snapshot-branch' = 'snapshot', "
+ + "'scan.fallback-delta-branch' = 'delta')");
+ spark.sql(
+ "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
+ + "'scan.fallback-snapshot-branch' = 'snapshot',"
+ + "'scan.fallback-delta-branch' = 'delta')");
+ spark.sql(
+ "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
+ + "'scan.fallback-snapshot-branch' = 'snapshot',"
+ + "'scan.fallback-delta-branch' = 'delta')");
+ spark.close();
+ spark = builder.getOrCreate();
+
+ /** Write main branch */
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');");
+
+ /** Write delta branch */
+ spark.sql("set spark.paimon.branch=delta;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' );");
+
+ /** Write snapshot branch */
+ spark.sql("set spark.paimon.branch=snapshot;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
+
+ spark.close();
+ spark = builder.getOrCreate();
+ /** Main read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '22'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1,20250810,22]", "[2,1,1,20250810,22]");
+
+ /** Snapshot read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and hour = '02'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250811,02]",
+ "[2,2,1-1,20250811,02]",
+ "[3,2,1-1,20250811,02]",
+ "[4,2,1-1,20250811,02]",
+ "[5,1,1,20250811,02]",
+ "[6,1,1,20250811,02]");
+
+ /** Chain read */
+ /** 1. non pre snapshot */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '21'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[1,1,1,20250810,21]", "[2,1,1,20250810,21]");
+ /** 2. has pre snapshot */
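+ // Mirrors the daily case at hour granularity: the expected rows for hour '23'
+ // correspond to the snapshot partition (dt=20250810, hour=22) merged with the
+ // delta partition for hour 23.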
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '23'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250810,23]",
+ "[2,2,1-1,20250810,23]",
+ "[3,1,1,20250810,23]",
+ "[4,1,1,20250810,23]");
+
+ /** Multi partition Read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour in ('22', '23');")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,1,1,20250810,22]",
+ "[2,1,1,20250810,22]",
+ "[1,2,1-1,20250810,23]",
+ "[2,2,1-1,20250810,23]",
+ "[3,1,1,20250810,23]",
+ "[4,1,1,20250810,23]");
+
+ /** Incremental read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("[2,2,1-1,20250810,23]", "[4,1,1,20250810,23]");
+
+ /** Multi partition incremental read */
+ assertThat(
+ spark
+ .sql(
+ "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour in ('22', '23');")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250810,22]",
+ "[3,1,1,20250810,22]",
+ "[2,2,1-1,20250810,23]",
+ "[4,1,1,20250810,23]");
+
+ /** Hybrid read */
+ assertThat(
+ spark
+ .sql(
+ "select * from `my_db1`.`chain_test` where dt = '20250810' and hour = '23'\n"
+ + "union all\n"
+ + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'")
+ .collectAsList().stream()
+ .map(Row::toString)
+ .collect(Collectors.toList()))
+ .containsExactlyInAnyOrder(
+ "[1,2,1-1,20250810,23]",
+ "[2,2,1-1,20250810,23]",
+ "[3,1,1,20250810,23]",
+ "[4,1,1,20250810,23]",
+ "[2,2,1-1,20250810,23]",
+ "[4,1,1,20250810,23]");
+
+ spark.close();
+ spark = builder.getOrCreate();
+
+ /** Drop table */
+ spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
+
+ spark.close();
+ }
}