This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 84a552b5eb [core] Support re-overwrite by spark for chain table (#7038)
84a552b5eb is described below

commit 84a552b5eb41584cd479ca99b6c4b6a7794c3d9a
Author: Stefanietry <[email protected]>
AuthorDate: Mon Jan 19 16:35:24 2026 +0800

    [core] Support re-overwrite by spark for chain table (#7038)
---
 docs/static/img/chain-table.png                    | Bin 172663 -> 172923 bytes
 .../java/org/apache/paimon/AbstractFileStore.java  |   5 +
 .../ChainTableOverwriteCommitCallback.java         | 135 +++++++
 .../org/apache/paimon/utils/ChainTableUtils.java   |   5 +
 .../paimon/spark/SparkCatalogWithHiveTest.java     | 424 ---------------------
 ...ithHiveTest.java => SparkChainTableITCase.java} | 162 ++------
 6 files changed, 180 insertions(+), 551 deletions(-)

diff --git a/docs/static/img/chain-table.png b/docs/static/img/chain-table.png
index 19fbc06685..c55e9e5e34 100644
Binary files a/docs/static/img/chain-table.png and b/docs/static/img/chain-table.png differ
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4f8bfb68e9..edf7e16294 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
 import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.ChangelogDeletion;
 import org.apache.paimon.operation.FileStoreCommitImpl;
@@ -418,6 +419,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
             callbacks.add(new IcebergCommitCallback(table, commitUser));
         }
 
+        if (options.isChainTable()) {
+            callbacks.add(new ChainTableOverwriteCommitCallback(table));
+        }
+
         callbacks.addAll(CallbackUtils.loadCommitCallbacks(options, table));
         return callbacks;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
new file mode 100644
index 0000000000..76defb1ed0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A {@link CommitCallback} implementation to maintain chain table snapshot branch for overwrite
+ * commits.
+ *
+ * <p>When the following conditions are met, this callback will truncate the corresponding
+ * partitions on the snapshot branch:
+ *
+ * <ul>
+ *   <li>The committed snapshot kind is {@link CommitKind#OVERWRITE};
+ *   <li>The table is a chain table and current branch is delta branch.
+ * </ul>
+ *
+ * <p>This callback is designed to be idempotent. It may be invoked multiple times for the same
+ * logical commit, but truncating the same partitions on the snapshot branch repeatedly is safe.
+ */
+public class ChainTableOverwriteCommitCallback implements CommitCallback {
+
+    private transient FileStoreTable table;
+    private transient CoreOptions coreOptions;
+
+    public ChainTableOverwriteCommitCallback(FileStoreTable table) {
+        this.table = table;
+        this.coreOptions = table.coreOptions();
+    }
+
+    @Override
+    public void call(
+            List<SimpleFileEntry> baseFiles,
+            List<ManifestEntry> deltaFiles,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
+
+        if (!ChainTableUtils.isScanFallbackDeltaBranch(coreOptions)) {
+            return;
+        }
+
+        if (snapshot.commitKind() != CommitKind.OVERWRITE) {
+            return;
+        }
+
+        // Find the underlying table for writing snapshot branch.
+        FileStoreTable candidateTable = table;
+        if (table instanceof FallbackReadFileStoreTable) {
+            candidateTable =
+                    ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+                            .wrapped();
+        }
+
+        FileStoreTable snapshotTable =
+                candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());
+
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        coreOptions.partitionDefaultName(),
+                        table.schema().logicalPartitionType(),
+                        table.schema().partitionKeys().toArray(new String[0]),
+                        coreOptions.legacyPartitionName());
+
+        List<BinaryRow> overwritePartitions =
+                deltaFiles.stream()
+                        .map(ManifestEntry::partition)
+                        .distinct()
+                        .collect(Collectors.toList());
+
+        if (overwritePartitions.isEmpty()) {
+            return;
+        }
+
+        List<Map<String, String>> candidatePartitions =
+                overwritePartitions.stream()
+                        .map(partitionComputer::generatePartValues)
+                        .collect(Collectors.toList());
+
+        try (BatchTableCommit commit = snapshotTable.newBatchWriteBuilder().newCommit()) {
+            commit.truncatePartitions(candidatePartitions);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    String.format(
+                            "Failed to truncate partitions in snapshot table: %s.",
+                            candidatePartitions),
+                    e);
+        }
+    }
+
+    @Override
+    public void retry(ManifestCommittable committable) {
+        // No-op. Truncating the same partitions again is safe, but we prefer to only rely on the
+        // successful commit callback.
+    }
+
+    @Override
+    public void close() throws Exception {
+        // no resources to close
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index 9148f26ec6..fd4cdff4d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -204,4 +204,9 @@ public class ChainTableUtils {
         }
         return res;
     }
+
+    public static boolean isScanFallbackDeltaBranch(CoreOptions options) {
+        return options.isChainTable()
+                && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index aaaaec1854..583522822b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -174,428 +174,4 @@ public class SparkCatalogWithHiveTest {
                         
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
                 .master("local[2]");
     }
-
-    @Test
-    public void testChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
-        Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession.Builder builder =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + 
PORT)
-                        .config("spark.sql.catalog.spark_catalog", 
SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", 
"hive")
-                        .config(
-                                
"spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                        .master("local[2]");
-        SparkSession spark = builder.getOrCreate();
-        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-        spark.sql("USE spark_catalog.my_db1");
-
-        /** Create table */
-        spark.sql(
-                "CREATE TABLE IF NOT EXISTS \n"
-                        + "  `my_db1`.`chain_test` (\n"
-                        + "    `t1` BIGINT COMMENT 't1',\n"
-                        + "    `t2` BIGINT COMMENT 't2',\n"
-                        + "    `t3` STRING COMMENT 't3'\n"
-                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW 
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
-                        + "WITH\n"
-                        + "  SERDEPROPERTIES ('serialization.format' = '1') 
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' 
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES 
(\n"
-                        + "    'bucket-key' = 't1',\n"
-                        + "    'primary-key' = 'dt,t1',\n"
-                        + "    'partition.timestamp-pattern' = '$dt',\n"
-                        + "    'partition.timestamp-formatter' = 'yyyyMMdd',\n"
-                        + "    'chain-table.enabled' = 'true',\n"
-                        + "    'bucket' = '2',\n"
-                        + "    'merge-engine' = 'deduplicate', \n"
-                        + "    'sequence.field' = 't2'\n"
-                        + "  )");
-
-        /** Create branch */
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
-        /** Set branch */
-        spark.sql(
-                "ALTER TABLE my_db1.chain_test SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET 
tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET 
tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.close();
-        spark = builder.getOrCreate();
-
-        /** Write main branch */
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810') values (1, 1, '1'),(2, 1, '1');");
-
-        /** Write delta branch */
-        spark.sql("set spark.paimon.branch=delta;");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250809') values (1, 1, '1'),(2, 1, '1');");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810') values (1, 2, '1-1' ),(3, 1, '1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811') values (2, 2, '1-1' ),(4, 1, '1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250812') values (3, 2, '1-1' ),(4, 2, '1-1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250813') values (5, 1, '1' ),(6, 1, '1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250814') values (5, 2, '1-1' ),(6, 2, '1-1' );");
-
-        /** Write snapshot branch */
-        spark.sql("set spark.paimon.branch=snapshot;");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test`  partition (dt 
= '20250810')  values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), 
(5, 1, '1' ), (6, 1, '1');");
-
-        spark.close();
-        spark = builder.getOrCreate();
-        /** Main read */
-        assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where 
dt = '20250810'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250810]", 
"[2,1,1,20250810]");
-
-        /** Snapshot read */
-        assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where 
dt = '20250814'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250814]",
-                        "[2,2,1-1,20250814]",
-                        "[3,2,1-1,20250814]",
-                        "[4,2,1-1,20250814]",
-                        "[5,1,1,20250814]",
-                        "[6,1,1,20250814]");
-
-        /** Chain read */
-        /** 1. non pre snapshot */
-        assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where 
dt = '20250809'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250809]", 
"[2,1,1,20250809]");
-        /** 2. has pre snapshot */
-        assertThat(
-                        spark.sql("SELECT * FROM `my_db1`.`chain_test` where 
dt = '20250811'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250811]",
-                        "[2,2,1-1,20250811]",
-                        "[3,1,1,20250811]",
-                        "[4,1,1,20250811]");
-
-        /** Multi partition Read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` 
where dt in ('20250810', '20250811', '20250812');")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,1,1,20250810]",
-                        "[2,1,1,20250810]",
-                        "[1,2,1-1,20250811]",
-                        "[2,2,1-1,20250811]",
-                        "[3,1,1,20250811]",
-                        "[4,1,1,20250811]",
-                        "[1,2,1-1,20250812]",
-                        "[2,2,1-1,20250812]",
-                        "[3,2,1-1,20250812]",
-                        "[4,2,1-1,20250812]");
-
-        /** Incremental read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250811'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[2,2,1-1,20250811]", 
"[4,1,1,20250811]");
-
-        /** Multi partition incremental read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM 
`my_db1`.`chain_test$branch_delta` where dt in ('20250810', '20250811', 
'20250812');")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250810]",
-                        "[3,1,1,20250810]",
-                        "[2,2,1-1,20250811]",
-                        "[4,1,1,20250811]",
-                        "[3,2,1-1,20250812]",
-                        "[4,2,1-1,20250812]");
-
-        /** Hybrid read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "select * from  `my_db1`.`chain_test` 
where dt = '20250811'\n"
-                                                + "union all\n"
-                                                + "select * from  
`my_db1`.`chain_test$branch_delta`  where dt = '20250811'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250811]",
-                        "[2,2,1-1,20250811]",
-                        "[3,1,1,20250811]",
-                        "[4,1,1,20250811]",
-                        "[2,2,1-1,20250811]",
-                        "[4,1,1,20250811]");
-
-        spark.close();
-        spark = builder.getOrCreate();
-
-        /** Drop table */
-        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-
-        spark.close();
-    }
-
-    @Test
-    public void testHourlyChainTable(@TempDir java.nio.file.Path tempDir) throws IOException {
-        Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession.Builder builder =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config("hive.metastore.uris", "thrift://localhost:" + 
PORT)
-                        .config("spark.sql.catalog.spark_catalog", 
SparkCatalog.class.getName())
-                        .config("spark.sql.catalog.spark_catalog.metastore", 
"hive")
-                        .config(
-                                
"spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                                "thrift://localhost:" + PORT)
-                        
.config("spark.sql.catalog.spark_catalog.format-table.enabled", "true")
-                        .config(
-                                "spark.sql.catalog.spark_catalog.warehouse",
-                                warehousePath.toString())
-                        .config(
-                                "spark.sql.extensions",
-                                
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                        .master("local[2]");
-        SparkSession spark = builder.getOrCreate();
-        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-        spark.sql("USE spark_catalog.my_db1");
-
-        /** Create table */
-        spark.sql(
-                "CREATE TABLE IF NOT EXISTS \n"
-                        + "  `my_db1`.`chain_test` (\n"
-                        + "    `t1` BIGINT COMMENT 't1',\n"
-                        + "    `t2` BIGINT COMMENT 't2',\n"
-                        + "    `t3` STRING COMMENT 't3'\n"
-                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt', 
`hour` STRING COMMENT 'hour') ROW FORMAT SERDE 
'org.apache.paimon.hive.PaimonSerDe'\n"
-                        + "WITH\n"
-                        + "  SERDEPROPERTIES ('serialization.format' = '1') 
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' 
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES 
(\n"
-                        + "    'bucket-key' = 't1',\n"
-                        + "    'primary-key' = 'dt,hour,t1',\n"
-                        + "    'partition.timestamp-pattern' = '$dt 
$hour:00:00',\n"
-                        + "    'partition.timestamp-formatter' = 'yyyyMMdd 
HH:mm:ss',\n"
-                        + "    'chain-table.enabled' = 'true',\n"
-                        + "    'bucket' = '2',\n"
-                        + "    'merge-engine' = 'deduplicate', \n"
-                        + "    'sequence.field' = 't2'\n"
-                        + "  )");
-
-        /** Create branch */
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'snapshot');");
-        spark.sql("CALL sys.create_branch('my_db1.chain_test', 'delta')");
-
-        /** Set branch */
-        spark.sql(
-                "ALTER TABLE my_db1.chain_test SET tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot', "
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_snapshot` SET 
tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.sql(
-                "ALTER TABLE `my_db1`.`chain_test$branch_delta` SET 
tblproperties ("
-                        + "'scan.fallback-snapshot-branch' = 'snapshot',"
-                        + "'scan.fallback-delta-branch' = 'delta')");
-        spark.close();
-        spark = builder.getOrCreate();
-
-        /** Write main branch */
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');");
-
-        /** Write delta branch */
-        spark.sql("set spark.paimon.branch=delta;");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810', hour = '21') values (1, 1, '1'),(2, 1, '1');");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810', hour = '22') values (1, 2, '1-1' ),(3, 1, '1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810', hour = '23') values (2, 2, '1-1' ),(4, 1, '1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '00') values (3, 2, '1-1' ),(4, 2, '1-1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '01') values (5, 1, '1' ),(6, 1, '1' );");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '02') values (5, 2, '1-1' ),(6, 2, '1-1' );");
-
-        /** Write snapshot branch */
-        spark.sql("set spark.paimon.branch=snapshot;");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test`  partition (dt 
= '20250810', hour = '22')  values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 
2, '1-1');");
-        spark.sql(
-                "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 
2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
-
-        spark.close();
-        spark = builder.getOrCreate();
-        /** Main read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` 
where dt = '20250810' and hour = '22'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250810,22]", 
"[2,1,1,20250810,22]");
-
-        /** Snapshot read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` 
where dt = '20250811' and hour = '02'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250811,02]",
-                        "[2,2,1-1,20250811,02]",
-                        "[3,2,1-1,20250811,02]",
-                        "[4,2,1-1,20250811,02]",
-                        "[5,1,1,20250811,02]",
-                        "[6,1,1,20250811,02]");
-
-        /** Chain read */
-        /** 1. non pre snapshot */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` 
where dt = '20250810' and hour = '21'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1,20250810,21]", 
"[2,1,1,20250810,21]");
-        /** 2. has pre snapshot */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` 
where dt = '20250810' and  hour = '23'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250810,23]",
-                        "[2,2,1-1,20250810,23]",
-                        "[3,1,1,20250810,23]",
-                        "[4,1,1,20250810,23]");
-
-        /** Multi partition Read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM `my_db1`.`chain_test` 
where dt = '20250810' and hour in ('22', '23');")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,1,1,20250810,22]",
-                        "[2,1,1,20250810,22]",
-                        "[1,2,1-1,20250810,23]",
-                        "[2,2,1-1,20250810,23]",
-                        "[3,1,1,20250810,23]",
-                        "[4,1,1,20250810,23]");
-
-        /** Incremental read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour = '23'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[2,2,1-1,20250810,23]", 
"[4,1,1,20250810,23]");
-
-        /** Multi partition incremental read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "SELECT * FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250810' and hour in ('22', 
'23');")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250810,22]",
-                        "[3,1,1,20250810,22]",
-                        "[2,2,1-1,20250810,23]",
-                        "[4,1,1,20250810,23]");
-
-        /** Hybrid read */
-        assertThat(
-                        spark
-                                .sql(
-                                        "select * from  `my_db1`.`chain_test` 
where dt = '20250810' and hour = '23'\n"
-                                                + "union all\n"
-                                                + "select * from  
`my_db1`.`chain_test$branch_delta`  where dt = '20250810' and hour = '23'")
-                                .collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder(
-                        "[1,2,1-1,20250810,23]",
-                        "[2,2,1-1,20250810,23]",
-                        "[3,1,1,20250810,23]",
-                        "[4,1,1,20250810,23]",
-                        "[2,2,1-1,20250810,23]",
-                        "[4,1,1,20250810,23]");
-
-        spark.close();
-        spark = builder.getOrCreate();
-
-        /** Drop table */
-        spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-
-        spark.close();
-    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
similarity index 79%
copy from paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
copy to paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index aaaaec1854..b5a28fe500 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -19,10 +19,9 @@
 package org.apache.paimon.spark;
 
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.hive.TestHiveMetastore;
-import org.apache.paimon.table.FileStoreTableFactory;
 
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
@@ -30,20 +29,16 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. */
-public class SparkCatalogWithHiveTest {
+public class SparkChainTableITCase {
 
     private static TestHiveMetastore testHiveMetastore;
-    private static final int PORT = 9087;
-    @TempDir java.nio.file.Path tempDir;
+    private static final int PORT = 9091;
 
     @BeforeAll
     public static void startMetastore() {
@@ -56,125 +51,6 @@ public class SparkCatalogWithHiveTest {
         testHiveMetastore.stop();
     }
 
-    @Test
-    public void testCreateFormatTable() throws IOException {
-        SparkSession spark = createSessionBuilder().getOrCreate();
-        {
-            spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-            spark.sql("USE spark_catalog.my_db1");
-
-            // test orc table
-
-            spark.sql("CREATE TABLE IF NOT EXISTS table_orc (a INT, bb INT, c 
STRING) USING orc");
-
-            assertThat(
-                            spark.sql("SHOW TABLES").collectAsList().stream()
-                                    .map(s -> s.get(1))
-                                    .map(Object::toString))
-                    .containsExactlyInAnyOrder("table_orc");
-
-            assertThat(
-                            spark.sql("EXPLAIN EXTENDED SELECT * from 
table_orc").collectAsList()
-                                    .stream()
-                                    .map(s -> s.get(0))
-                                    .map(Object::toString)
-                                    .filter(s -> 
s.contains("PaimonFormatTableScan"))
-                                    .count())
-                    .isGreaterThan(0);
-
-            // todo: There are some bugs with Spark CSV table's options. In 
Spark 3.x, both reading
-            // and
-            // writing using the default delimiter value ',' even if we 
specific it. In Spark 4.x,
-            // reading is correct, but writing is still incorrect, just skip 
setting it for now.
-            // test csv table
-
-            spark.sql(
-                    "CREATE TABLE IF NOT EXISTS table_csv (a INT, bb INT, c 
STRING) USING csv OPTIONS ('csv.field-delimiter' ',')");
-            spark.sql("INSERT INTO table_csv VALUES (1, 1, '1'), (2, 2, 
'2')").collect();
-            String r = spark.sql("DESCRIBE FORMATTED 
table_csv").collectAsList().toString();
-            assertThat(r).contains("sep=,");
-            assertThat(
-                            spark.sql("SELECT * FROM 
table_csv").collectAsList().stream()
-                                    .map(Row::toString)
-                                    .collect(Collectors.toList()))
-                    .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
-            // test json table
-
-            spark.sql(
-                    "CREATE TABLE IF NOT EXISTS table_json (a INT, bb INT, c 
STRING) USING json ");
-            spark.sql("INSERT INTO table_json VALUES(1, 1, '1'), (2, 2, '2')");
-            assertThat(
-                            spark.sql("SELECT * FROM 
table_json").collectAsList().stream()
-                                    .map(Row::toString)
-                                    .collect(Collectors.toList()))
-                    .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-        }
-        spark.stop();
-    }
-
-    @Test
-    public void testSpecifyHiveConfDirInGenericCatalog() throws IOException {
-        try (SparkSession spark =
-                createSessionBuilder()
-                        
.config("spark.sql.catalog.spark_catalog.hive-conf-dir", "nonExistentPath")
-                        .config(
-                                "spark.sql.catalog.spark_catalog",
-                                SparkGenericCatalog.class.getName())
-                        .getOrCreate()) {
-            assertThatThrownBy(() -> spark.sql("CREATE DATABASE my_db"))
-                    .rootCause()
-                    .isInstanceOf(FileNotFoundException.class)
-                    .hasMessageContaining("nonExistentPath");
-        }
-    }
-
-    @Test
-    public void testCreateExternalTable() throws IOException {
-        try (SparkSession spark = createSessionBuilder().getOrCreate()) {
-            String warehousePath = 
spark.sparkContext().conf().get("spark.sql.warehouse.dir");
-            spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
-            spark.sql("USE spark_catalog.test_db");
-
-            // create hive external table
-            spark.sql("CREATE EXTERNAL TABLE external_table (a INT, bb INT, c 
STRING)");
-
-            // drop hive external table
-            spark.sql("DROP TABLE external_table");
-
-            // file system table exists
-            assertThatCode(
-                            () ->
-                                    FileStoreTableFactory.create(
-                                            LocalFileIO.create(),
-                                            new Path(
-                                                    warehousePath,
-                                                    String.format(
-                                                            "%s.db/%s",
-                                                            "test_db", 
"external_table"))))
-                    .doesNotThrowAnyException();
-        }
-    }
-
-    private SparkSession.Builder createSessionBuilder() {
-        Path warehousePath = new Path("file:" + tempDir.toString());
-        return SparkSession.builder()
-                .config("spark.sql.warehouse.dir", warehousePath.toString())
-                // with hive metastore
-                .config("spark.sql.catalogImplementation", "hive")
-                .config("hive.metastore.uris", "thrift://localhost:" + PORT)
-                .config("spark.sql.catalog.spark_catalog", 
SparkCatalog.class.getName())
-                .config("spark.sql.catalog.spark_catalog.metastore", "hive")
-                .config(
-                        "spark.sql.catalog.spark_catalog.hive.metastore.uris",
-                        "thrift://localhost:" + PORT)
-                .config("spark.sql.catalog.spark_catalog.warehouse", 
warehousePath.toString())
-                .config(
-                        "spark.sql.extensions",
-                        
"org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
-                .master("local[2]");
-    }
-
     @Test
     public void testChainTable(@TempDir java.nio.file.Path tempDir) throws 
IOException {
         Path warehousePath = new Path("file:" + tempDir.toString());
@@ -379,7 +255,23 @@ public class SparkCatalogWithHiveTest {
 
         spark.close();
         spark = builder.getOrCreate();
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` values (5, 2, 
'1', '20250813'),(6, 2, '1', '20250814');");
+
+        spark.close();
+        spark = builder.getOrCreate();
+        Dataset<Row> df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM 
`my_db1`.`chain_test$branch_snapshot` where dt = '20250814'");
+        assertThat(df.count()).isEqualTo(0);
+        df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250814'");
+        assertThat(df.count()).isEqualTo(1);
 
+        spark.close();
+        spark = builder.getOrCreate();
         /** Drop table */
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
 
@@ -592,7 +484,23 @@ public class SparkCatalogWithHiveTest {
 
         spark.close();
         spark = builder.getOrCreate();
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test` values (6, 2, 
'1', '20250811', '02');");
+
+        spark.close();
+        spark = builder.getOrCreate();
+        Dataset<Row> df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM 
`my_db1`.`chain_test$branch_snapshot` where dt = '20250811' and hour = '02'");
+        assertThat(df.count()).isEqualTo(0);
+        df =
+                spark.sql(
+                        "SELECT t1,t2,t3 FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250811' and hour = '02'");
+        assertThat(df.count()).isEqualTo(1);
 
+        spark.close();
+        spark = builder.getOrCreate();
         /** Drop table */
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
 


Reply via email to