This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 84a552b5eb [core] Support re-overwrite by spark for chain table (#7038)
84a552b5eb is described below
commit 84a552b5eb41584cd479ca99b6c4b6a7794c3d9a
Author: Stefanietry <[email protected]>
AuthorDate: Mon Jan 19 16:35:24 2026 +0800
[core] Support re-overwrite by spark for chain table (#7038)
---
docs/static/img/chain-table.png | Bin 172663 -> 172923 bytes
.../java/org/apache/paimon/AbstractFileStore.java | 5 +
.../ChainTableOverwriteCommitCallback.java | 135 +++++++
.../org/apache/paimon/utils/ChainTableUtils.java | 5 +
.../paimon/spark/SparkCatalogWithHiveTest.java | 424 ---------------------
...ithHiveTest.java => SparkChainTableITCase.java} | 162 ++------
6 files changed, 180 insertions(+), 551 deletions(-)
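For context, the scenario exercised by the new SparkChainTableITCase in this patch is: an overwrite committed on the delta branch of a chain table also truncates the matching partitions on the snapshot branch, via the new ChainTableOverwriteCommitCallback. A minimal sketch, assuming a SparkSession `spark` already configured against the Hive catalog and a chain table `my_db1.chain_test` set up with 'snapshot'/'delta' branches wired through scan.fallback-snapshot-branch / scan.fallback-delta-branch, as in the ITCase:

    // Re-overwrite a partition on the delta branch ...
    spark.sql("set spark.paimon.branch=delta");
    spark.sql("insert overwrite table my_db1.chain_test partition (dt = '20250814') values (6, 2, '1')");
    // ... the commit callback then truncates dt=20250814 on the snapshot branch,
    // so stale snapshot data no longer shadows the re-overwritten delta data.
    spark.sql("SELECT * FROM my_db1.`chain_test$branch_snapshot` WHERE dt = '20250814'").show(); // empty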
diff --git a/docs/static/img/chain-table.png b/docs/static/img/chain-table.png
index 19fbc06685..c55e9e5e34 100644
Binary files a/docs/static/img/chain-table.png and b/docs/static/img/chain-table.png differ
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4f8bfb68e9..edf7e16294 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.ChangelogDeletion;
import org.apache.paimon.operation.FileStoreCommitImpl;
@@ -418,6 +419,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
callbacks.add(new IcebergCommitCallback(table, commitUser));
}
+ if (options.isChainTable()) {
+ callbacks.add(new ChainTableOverwriteCommitCallback(table));
+ }
+
callbacks.addAll(CallbackUtils.loadCommitCallbacks(options, table));
return callbacks;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
new file mode 100644
index 0000000000..76defb1ed0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link CommitCallback} implementation to maintain chain table snapshot branch for overwrite
+ * commits.
+ *
+ * <p>When the following conditions are met, this callback will truncate the corresponding
+ * partitions on the snapshot branch:
+ *
+ * <ul>
+ * <li>The committed snapshot kind is {@link CommitKind#OVERWRITE};
+ * <li>The table is a chain table and current branch is delta branch.
+ * </ul>
+ *
+ * <p>This callback is designed to be idempotent. It may be invoked multiple times for the same
+ * logical commit, but truncating the same partitions on the snapshot branch repeatedly is safe.
+ */
+public class ChainTableOverwriteCommitCallback implements CommitCallback {
+
+ private transient FileStoreTable table;
+ private transient CoreOptions coreOptions;
+
+ public ChainTableOverwriteCommitCallback(FileStoreTable table) {
+ this.table = table;
+ this.coreOptions = table.coreOptions();
+ }
+
+ @Override
+ public void call(
+ List<SimpleFileEntry> baseFiles,
+ List<ManifestEntry> deltaFiles,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot) {
+
+ if (!ChainTableUtils.isScanFallbackDeltaBranch(coreOptions)) {
+ return;
+ }
+
+ if (snapshot.commitKind() != CommitKind.OVERWRITE) {
+ return;
+ }
+
+ // Find the underlying table for writing snapshot branch.
+ FileStoreTable candidateTable = table;
+ if (table instanceof FallbackReadFileStoreTable) {
+ candidateTable =
+                    ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+ .wrapped();
+ }
+
+ FileStoreTable snapshotTable =
+                candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());
+
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ coreOptions.partitionDefaultName(),
+ table.schema().logicalPartitionType(),
+ table.schema().partitionKeys().toArray(new String[0]),
+ coreOptions.legacyPartitionName());
+
+ List<BinaryRow> overwritePartitions =
+ deltaFiles.stream()
+ .map(ManifestEntry::partition)
+ .distinct()
+ .collect(Collectors.toList());
+
+ if (overwritePartitions.isEmpty()) {
+ return;
+ }
+
+ List<Map<String, String>> candidatePartitions =
+ overwritePartitions.stream()
+ .map(partitionComputer::generatePartValues)
+ .collect(Collectors.toList());
+
+        try (BatchTableCommit commit = snapshotTable.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(candidatePartitions);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ String.format(
+                            "Failed to truncate partitions in snapshot table: %s.",
+ candidatePartitions),
+ e);
+ }
+ }
+
+ @Override
+ public void retry(ManifestCommittable committable) {
+        // No-op. Truncating the same partitions again is safe, but we prefer to only rely on the
+ // successful commit callback.
+ }
+
+ @Override
+ public void close() throws Exception {
+ // no resources to close
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index 9148f26ec6..fd4cdff4d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -204,4 +204,9 @@ public class ChainTableUtils {
}
return res;
}
+
+ public static boolean isScanFallbackDeltaBranch(CoreOptions options) {
+ return options.isChainTable()
+                && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index aaaaec1854..583522822b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -174,428 +174,4 @@ public class SparkCatalogWithHiveTest {
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
.master("local[2]");
}
-
- @Test
-    public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
- Path warehousePath = new Path("file:" + tempDir.toString());
- SparkSession.Builder builder =
- SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
- .master("local[2]");
- SparkSession spark = builder.getOrCreate();
- spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
- spark.sql("USE spark_catalog.my_db1");
-
- /** Create table */
- spark.sql(
- "CREATE TABLE IF NOT EXISTS \n"
- + " `my_db1`.`chain_test` (\n"
- + " `t1` BIGINT COMMENT 't1',\n"
- + " `t2` BIGINT COMMENT 't2',\n"
- + " `t3` STRING COMMENT 't3'\n"
-                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
- + "WITH\n"
-                        + "  SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
- + " 'bucket-key' = 't1',\n"
- + " 'primary-key' = 'dt,t1',\n"
- + " 'partition.timestamp-pattern' = '$dt',\n"
- + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
- + " 'chain-table.enabled' = 'true',\n"
- + " 'bucket' = '2',\n"
- + " 'merge-engine' = 'deduplicate', \n"
- + " 'sequence.field' = 't2'\n"
- + " )");
-
- /** Create branch */
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
- /** Set branch */
- spark.sql(
- "ALTER TABLE my_db1.chain_test SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot', "
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.close();
- spark = builder.getOrCreate();
-
- /** Write main branch */
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 1, '1'),(2, 1, '1');");
-
- /** Write delta branch */
- spark.sql("set spark.paimon.branch=delta;");
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250809') values (1, 1, '1'),(2, 1, '1');");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1' ),(3, 1, '1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811') values (2, 2, '1-1' ),(4, 1, '1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250813') values (5, 1, '1' ),(6, 1, '1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (5, 2, '1-1' ),(6, 2, '1-1' );");
-
- /** Write snapshot branch */
- spark.sql("set spark.paimon.branch=snapshot;");
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
-
- spark.close();
- spark = builder.getOrCreate();
- /** Main read */
- assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250810'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250810]", "[2,1,1,20250810]");
-
- /** Snapshot read */
- assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250814'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250814]",
- "[2,2,1-1,20250814]",
- "[3,2,1-1,20250814]",
- "[4,2,1-1,20250814]",
- "[5,1,1,20250814]",
- "[6,1,1,20250814]");
-
- /** Chain read */
- /** 1. non pre snapshot */
- assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250809'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250809]", "[2,1,1,20250809]");
- /** 2. has pre snapshot */
- assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where dt = '20250811'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250811]",
- "[2,2,1-1,20250811]",
- "[3,1,1,20250811]",
- "[4,1,1,20250811]");
-
- /** Multi partition Read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` where dt in ('20250810', '20250811', '20250812');")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,1,1,20250810]",
- "[2,1,1,20250810]",
- "[1,2,1-1,20250811]",
- "[2,2,1-1,20250811]",
- "[3,1,1,20250811]",
- "[4,1,1,20250811]",
- "[1,2,1-1,20250812]",
- "[2,2,1-1,20250812]",
- "[3,2,1-1,20250812]",
- "[4,2,1-1,20250812]");
-
- /** Incremental read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[2,2,1-1,20250811]", "[4,1,1,20250811]");
-
- /** Multi partition incremental read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt in ('20250810', '20250811', '20250812');")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250810]",
- "[3,1,1,20250810]",
- "[2,2,1-1,20250811]",
- "[4,1,1,20250811]",
- "[3,2,1-1,20250812]",
- "[4,2,1-1,20250812]");
-
- /** Hybrid read */
- assertThat(
- spark
- .sql(
-                                        "select * from `my_db1`.`chain_test` where dt = '20250811'\n"
- + "union all\n"
-                                                + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250811'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250811]",
- "[2,2,1-1,20250811]",
- "[3,1,1,20250811]",
- "[4,1,1,20250811]",
- "[2,2,1-1,20250811]",
- "[4,1,1,20250811]");
-
- spark.close();
- spark = builder.getOrCreate();
-
- /** Drop table */
- spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-
- spark.close();
- }
-
- @Test
-    public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
- Path warehousePath = new Path("file:" + tempDir.toString());
- SparkSession.Builder builder =
- SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        .config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
- .master("local[2]");
- SparkSession spark = builder.getOrCreate();
- spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
- spark.sql("USE spark_catalog.my_db1");
-
- /** Create table */
- spark.sql(
- "CREATE TABLE IF NOT EXISTS \n"
- + " `my_db1`.`chain_test` (\n"
- + " `t1` BIGINT COMMENT 't1',\n"
- + " `t2` BIGINT COMMENT 't2',\n"
- + " `t3` STRING COMMENT 't3'\n"
-                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt', `hour` STRING COMMENT 'hour') ROW FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
- + "WITH\n"
-                        + "  SERDEPROPERTIES ('serialization.format' = '1') STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES (\n"
- + " 'bucket-key' = 't1',\n"
- + " 'primary-key' = 'dt,hour,t1',\n"
-                        + "  'partition.timestamp-pattern' = '$dt $hour:00:00',\n"
-                        + "  'partition.timestamp-formatter' = 'yyyyMMdd HH:mm:ss',\n"
- + " 'chain-table.enabled' = 'true',\n"
- + " 'bucket' = '2',\n"
- + " 'merge-engine' = 'deduplicate', \n"
- + " 'sequence.field' = 't2'\n"
- + " )");
-
- /** Create branch */
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
- spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
- /** Set branch */
- spark.sql(
- "ALTER TABLE my_db1.chain_test SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot', "
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET tblproperties ("
- + "'scan.fallback-snapshot-branch' = 'snapshot',"
- + "'scan.fallback-delta-branch' = 'delta')");
- spark.close();
- spark = builder.getOrCreate();
-
- /** Write main branch */
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');");
-
- /** Write delta branch */
- spark.sql("set spark.paimon.branch=delta;");
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '21') values (1, 1, '1'),(2, 1, '1');");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' );");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' );");
-
- /** Write snapshot branch */
- spark.sql("set spark.paimon.branch=snapshot;");
- spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250810', hour = '22') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
-        spark.sql(
-                "insert overwrite table `my_db1`.`chain_test` partition (dt = '20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
-
- spark.close();
- spark = builder.getOrCreate();
- /** Main read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '22'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250810,22]", "[2,1,1,20250810,22]");
-
- /** Snapshot read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250811' and hour = '02'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250811,02]",
- "[2,2,1-1,20250811,02]",
- "[3,2,1-1,20250811,02]",
- "[4,2,1-1,20250811,02]",
- "[5,1,1,20250811,02]",
- "[6,1,1,20250811,02]");
-
- /** Chain read */
- /** 1. non pre snapshot */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '21'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250810,21]", "[2,1,1,20250810,21]");
- /** 2. has pre snapshot */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour = '23'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250810,23]",
- "[2,2,1-1,20250810,23]",
- "[3,1,1,20250810,23]",
- "[4,1,1,20250810,23]");
-
- /** Multi partition Read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` where dt = '20250810' and hour in ('22', '23');")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,1,1,20250810,22]",
- "[2,1,1,20250810,22]",
- "[1,2,1-1,20250810,23]",
- "[2,2,1-1,20250810,23]",
- "[3,1,1,20250810,23]",
- "[4,1,1,20250810,23]");
-
- /** Incremental read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[2,2,1-1,20250810,23]", "[4,1,1,20250810,23]");
-
- /** Multi partition incremental read */
- assertThat(
- spark
- .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour in ('22', '23');")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250810,22]",
- "[3,1,1,20250810,22]",
- "[2,2,1-1,20250810,23]",
- "[4,1,1,20250810,23]");
-
- /** Hybrid read */
- assertThat(
- spark
- .sql(
-                                        "select * from `my_db1`.`chain_test` where dt = '20250810' and hour = '23'\n"
- + "union all\n"
-                                                + "select * from `my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'")
- .collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder(
- "[1,2,1-1,20250810,23]",
- "[2,2,1-1,20250810,23]",
- "[3,1,1,20250810,23]",
- "[4,1,1,20250810,23]",
- "[2,2,1-1,20250810,23]",
- "[4,1,1,20250810,23]");
-
- spark.close();
- spark = builder.getOrCreate();
-
- /** Drop table */
- spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-
- spark.close();
- }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
similarity index 79%
copy from paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
copy to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index aaaaec1854..b5a28fe500 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -19,10 +19,9 @@
package org.apache.paimon.spark;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.TestHiveMetastore;
-import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
@@ -30,20 +29,16 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Base tests for spark read. */
-public class SparkCatalogWithHiveTest {
+public class SparkChainTableITCase {
private static TestHiveMetastore testHiveMetastore;
- private static final int PORT = 9087;
- @TempDir java.nio.file.Path tempDir;
+ private static final int PORT = 9091;
@BeforeAll
public static void startMetastore() {
@@ -56,125 +51,6 @@ public class SparkCatalogWithHiveTest {
testHiveMetastore.stop();
}
- @Test
- public void testCreateFormatTable() throws IOException {
- SparkSession spark = createSessionBuilder().getOrCreate();
- {
- spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
- spark.sql("USE spark_catalog.my_db1");
-
- // test orc table
-
-            spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c STRING) USING orc");
-
- assertThat(
- spark.sql("SHOW TABLES").collectAsList().stream()
- .map(s -> s.get(1))
- .map(Object::toString))
- .containsExactlyInAnyOrder("table_orc");
-
- assertThat(
-                            spark.sql("EXPLAIN EXTENDED SELECT * from table_orc").collectAsList()
- .stream()
- .map(s -> s.get(0))
- .map(Object::toString)
-                            .filter(s -> s.contains("PaimonFormatTableScan"))
- .count())
- .isGreaterThan(0);
-
-            // todo: There are some bugs with Spark CSV table's options. In Spark 3.x, both reading
-            // and
-            // writing using the default delimiter value ',' even if we specific it. In Spark 4.x,
-            // reading is correct, but writing is still incorrect, just skip setting it for now.
- // test csv table
-
- spark.sql(
-                    "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c STRING) USING csv OPTIONS ('csv.field-delimiter' ',')");
-            spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, '2')").collect();
-            String r = spark.sql("DESCRIBE FORMATTED table_csv").collectAsList().toString();
- assertThat(r).contains("sep=,");
- assertThat(
-                            spark.sql("SELECT * FROM table_csv").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
- // test json table
-
- spark.sql(
-                    "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c STRING) USING json ");
- spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
- assertThat(
-                            spark.sql("SELECT * FROM table_json").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
- }
- spark.stop();
- }
-
- @Test
- public void testSpecifyHiveConfDirInGenericCatalog() throws IOException {
- try (SparkSession spark =
- createSessionBuilder()
-                        .config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
- .config(
- "spark.sql.catalog.spark_catalog",
- SparkGenericCatalog.class.getName())
- .getOrCreate()) {
- assertThatThrownBy(() -> spark.sql("CREATE DATABASE my_db"))
- .rootCause()
- .isInstanceOf(FileNotFoundException.class)
- .hasMessageContaining("nonExistentPath");
- }
- }
-
- @Test
- public void testCreateExternalTable() throws IOException {
- try (SparkSession spark = createSessionBuilder().getOrCreate()) {
-            String warehousePath = spark.sparkContext().conf().get("spark.sql.warehouse.dir");
- spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
- spark.sql("USE spark_catalog.test_db");
-
- // create hive external table
-            spark.sql("CREATE EXTERNAL TABLE external_table (a INT, bb INT, c STRING)");
-
- // drop hive external table
- spark.sql("DROP TABLE external_table");
-
- // file system table exists
- assertThatCode(
- () ->
- FileStoreTableFactory.create(
- LocalFileIO.create(),
- new Path(
- warehousePath,
- String.format(
- "%s.db/%s",
-                                                            "test_db", "external_table"))))
- .doesNotThrowAnyException();
- }
- }
-
- private SparkSession.Builder createSessionBuilder() {
- Path warehousePath = new Path("file:" + tempDir.toString());
- return SparkSession.builder()
- .config("spark.sql.warehouse.dir", warehousePath.toString())
- // with hive metastore
- .config("spark.sql.catalogImplementation", "hive")
- .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
- .config("spark.sql.catalog.spark_catalog.metastore", "hive")
- .config(
- "spark.sql.catalog.spark_catalog.hive.metastore.uris",
- "thrift://localhost:" + PORT)
-                .config("spark.sql.catalog.spark_catalog.warehouse", warehousePath.toString())
- .config(
- "spark.sql.extensions",
-                        "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
- .master("local[2]");
- }
-
@Test
     public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
Path warehousePath = new Path("file:" + tempDir.toString());
@@ -379,7 +255,23 @@ public class SparkCatalogWithHiveTest {
spark.close();
spark = builder.getOrCreate();
+ spark.sql("set spark.paimon.branch=delta;");
+ spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` values (5, 2, '1', '20250813'),(6, 2, '1', '20250814');");
+
+ spark.close();
+ spark = builder.getOrCreate();
+ Dataset<Row> df =
+ spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_snapshot` where dt = '20250814'");
+ assertThat(df.count()).isEqualTo(0);
+ df =
+ spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250814'");
+ assertThat(df.count()).isEqualTo(1);
+ spark.close();
+ spark = builder.getOrCreate();
/** Drop table */
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
@@ -592,7 +484,23 @@ public class SparkCatalogWithHiveTest {
spark.close();
spark = builder.getOrCreate();
+ spark.sql("set spark.paimon.branch=delta;");
+ spark.sql(
+                "insert overwrite table `my_db1`.`chain_test` values (6, 2, '1', '20250811', '02');");
+
+ spark.close();
+ spark = builder.getOrCreate();
+ Dataset<Row> df =
+ spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_snapshot` where dt = '20250811' and hour = '02'");
+ assertThat(df.count()).isEqualTo(0);
+ df =
+ spark.sql(
+                        "SELECT t1,t2,t3 FROM `my_db1`.`chain_test$branch_delta` where dt = '20250811' and hour = '02'");
+ assertThat(df.count()).isEqualTo(1);
+ spark.close();
+ spark = builder.getOrCreate();
/** Drop table */
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");