This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 974f577cd1 [core] Add pre check when drop partition for chain table (#7109)
974f577cd1 is described below

commit 974f577cd1dfc721f27359c86f227903acd38e98
Author: Stefanietry <[email protected]>
AuthorDate: Thu Feb 5 13:40:30 2026 +0800

    [core] Add pre check when drop partition for chain table (#7109)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  14 +-
 .../metastore/ChainTableCommitPreCallback.java     | 207 +++++++++++++++++++++
 .../ChainTableOverwriteCommitCallback.java         |  10 +-
 .../paimon/operation/FileStoreCommitImpl.java      |  15 +-
 .../paimon/table/sink/CommitPreCallback.java       |  51 +++++
 .../org/apache/paimon/utils/ChainTableUtils.java   |  16 ++
 .../apache/paimon/spark/SparkChainTableITCase.java |  94 +++++++++-
 7 files changed, 385 insertions(+), 22 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 70108a1920..a92bde3d64 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.metastore.AddPartitionCommitCallback;
 import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.ChainTableCommitPreCallback;
 import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
 import org.apache.paimon.metastore.TagPreviewCommitCallback;
 import org.apache.paimon.operation.ChangelogDeletion;
@@ -59,6 +60,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.PartitionHandler;
 import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.CommitPreCallback;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.SuccessFileTagCallback;
 import org.apache.paimon.tag.TagAutoManager;
@@ -333,7 +335,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.commitDiscardDuplicateFiles(),
                 conflictDetectFactory,
                 strictModeChecker,
-                rollback);
+                rollback,
+                createCommitPreCallbacks(commitUser, table));
     }
 
     @Override
@@ -431,6 +434,15 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
         return callbacks;
     }
 
+    private List<CommitPreCallback> createCommitPreCallbacks(
+            String commitUser, FileStoreTable table) {
+        List<CommitPreCallback> callbacks = new ArrayList<>();
+        if (options.isChainTable()) {
+            callbacks.add(new ChainTableCommitPreCallback(table));
+        }
+        return callbacks;
+    }
+
     @Override
     @Nullable
     public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
new file mode 100644
index 0000000000..0b26cb6375
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.commit.ManifestEntryChanges;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitPreCallback;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CommitPreCallback} implementation for chain tables.
+ *
+ * <p>This callback performs a pre-check before dropping partitions on the snapshot branch of a
+ * chain table. It verifies that a snapshot partition being dropped is either followed by no delta
+ * partitions in the chain interval or has a previous snapshot partition that can serve as its
+ * predecessor.
+ *
+ * <p>The callback is only executed when all of the following conditions are met:
+ *
+ * <ul>
+ *   <li>The table is configured as a chain table and the current branch is the snapshot branch (see
+ *       {@link ChainTableUtils#isScanFallbackSnapshotBranch(CoreOptions)}).
+ *   <li>The committed snapshot kind is {@link CommitKind#OVERWRITE}.
+ *   <li>All table and index manifest entries in the commit are {@link FileKind#DELETE deletes}.
+ * </ul>
+ *
+ * <p>If the validation fails for any of the affected partitions, a {@link RuntimeException} is
+ * thrown and the commit is aborted.
+ *
+ * <p>This implementation keeps only references to the table and its options and does not maintain
+ * mutable state between invocations.
+ */
+public class ChainTableCommitPreCallback implements CommitPreCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChainTableCommitPreCallback.class);
+
+    private transient FileStoreTable table;
+    private transient CoreOptions coreOptions;
+
+    public ChainTableCommitPreCallback(FileStoreTable table) {
+        this.table = table;
+        this.coreOptions = table.coreOptions();
+    }
+
+    @Override
+    public void call(
+            List<SimpleFileEntry> baseFiles,
+            List<ManifestEntry> deltaFiles,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
+        if (!ChainTableUtils.isScanFallbackSnapshotBranch(coreOptions)) {
+            return;
+        }
+        if (snapshot.commitKind() != CommitKind.OVERWRITE) {
+            return;
+        }
+        if (!isPureDeleteCommit(deltaFiles, indexFiles)) {
+            return;
+        }
+        List<BinaryRow> changedPartitions =
+                ManifestEntryChanges.changedPartitions(deltaFiles, indexFiles);
+        FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
+        FileStoreTable deltaTable =
+                candidateTable.switchToBranch(coreOptions.scanFallbackDeltaBranch());
+        RowType partitionType = table.schema().logicalPartitionType();
+        RowDataToObjectArrayConverter partitionConverter =
+                new RowDataToObjectArrayConverter(partitionType);
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        coreOptions.partitionDefaultName(),
+                        partitionType,
+                        table.schema().partitionKeys().toArray(new String[0]),
+                        coreOptions.legacyPartitionName());
+        RecordComparator partitionComparator =
+                CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+        List<BinaryRow> snapshotPartitions =
+                table.newSnapshotReader().partitionEntries().stream()
+                        .map(PartitionEntry::partition)
+                        .sorted(partitionComparator)
+                        .collect(Collectors.toList());
+        SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
+        PredicateBuilder builder = new PredicateBuilder(partitionType);
+        for (BinaryRow partition : changedPartitions) {
+            Optional<BinaryRow> preSnapshotPartition =
+                    findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Optional<BinaryRow> nextSnapshotPartition =
+                    findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Predicate deltaFollowingPredicate =
+                    ChainTableUtils.createTriangularPredicate(
+                            partition, partitionConverter, builder::equal, builder::greaterThan);
+            List<BinaryRow> deltaFollowingPartitions =
+                    deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
+                            .partitionEntries().stream()
+                            .map(PartitionEntry::partition)
+                            .filter(
+                                    deltaPartition ->
+                                            isBeforeNextSnapshotPartition(
+                                                    deltaPartition,
+                                                    nextSnapshotPartition,
+                                                    partitionComparator))
+                            .collect(Collectors.toList());
+            boolean canDrop =
+                    deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
+            LOG.info(
+                    "Drop partition, partition={}, canDrop={}, preSnapshotPartition={}, nextSnapshotPartition={}",
+                    partitionComputer.generatePartValues(partition),
+                    canDrop,
+                    generatePartitionValues(preSnapshotPartition, partitionComputer),
+                    generatePartitionValues(nextSnapshotPartition, partitionComputer));
+            if (!canDrop) {
+                throw new RuntimeException("Snapshot partition cannot be dropped.");
+            }
+        }
+    }
+
+    private boolean isPureDeleteCommit(
+            List<ManifestEntry> deltaFiles, List<IndexManifestEntry> indexFiles) {
+        return deltaFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE)
+                && indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE);
+    }
+
+    private Optional<BinaryRow> findPreSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        BinaryRow pre = null;
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) < 0) {
+                pre = snapshotPartition;
+            } else {
+                break;
+            }
+        }
+        return Optional.ofNullable(pre);
+    }
+
+    private Optional<BinaryRow> findNextSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) > 0) {
+                return Optional.of(snapshotPartition);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private boolean isBeforeNextSnapshotPartition(
+            BinaryRow partition,
+            Optional<BinaryRow> nextSnapshotPartition,
+            RecordComparator partitionComparator) {
+        return !nextSnapshotPartition.isPresent()
+                || partitionComparator.compare(partition, nextSnapshotPartition.get()) < 0;
+    }
+
+    private String generatePartitionValues(
+            Optional<BinaryRow> partition, InternalRowPartitionComputer partitionComputer) {
+        if (!partition.isPresent()) {
+            return "<none>";
+        }
+        return partitionComputer.generatePartValues(partition.get()).toString();
+    }
+
+    @Override
+    public void close() throws Exception {}
+}
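
The predecessor/successor search above relies on the snapshot partitions being
sorted by the partition comparator. A standalone sketch of the same lookup logic
over sorted strings (names here are illustrative; the committed code works on
BinaryRow with a generated RecordComparator):

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Illustrative sketch of the neighbor lookup used by the pre-check. */
public class NeighborLookupSketch {

    /** Greatest partition strictly smaller than the target, if any. */
    static Optional<String> findPre(List<String> sorted, String target) {
        String pre = null;
        for (String p : sorted) {
            if (p.compareTo(target) < 0) {
                pre = p; // still smaller, keep advancing
            } else {
                break; // sorted input: nothing smaller can follow
            }
        }
        return Optional.ofNullable(pre);
    }

    /** Smallest partition strictly greater than the target, if any. */
    static Optional<String> findNext(List<String> sorted, String target) {
        for (String p : sorted) {
            if (p.compareTo(target) > 0) {
                return Optional.of(p);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> snapshots = Arrays.asList("20260101", "20260103", "20260105");
        // Dropping 20260101: no predecessor, so any delta partition left in
        // (20260101, 20260103) makes the drop invalid.
        System.out.println(findPre(snapshots, "20260101"));  // Optional.empty
        System.out.println(findNext(snapshots, "20260101")); // Optional[20260103]
        // Dropping 20260105: nothing follows it, so the drop is allowed.
        System.out.println(findNext(snapshots, "20260105")); // Optional.empty
    }
}

This mirrors the scenario in the new Spark test below: dropping dt=20260105
succeeds because no delta partition follows it, while dropping dt=20260101 fails
because delta partition dt=20260102 still depends on it and no earlier snapshot
partition exists.
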
diff --git a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
index 76defb1ed0..08a8ea82d1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
@@ -26,8 +26,6 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.table.ChainGroupReadTable;
-import org.apache.paimon.table.FallbackReadFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitCallback;
@@ -78,13 +76,7 @@ public class ChainTableOverwriteCommitCallback implements CommitCallback {
             return;
         }
 
-        // Find the underlying table for writing snapshot branch.
-        FileStoreTable candidateTable = table;
-        if (table instanceof FallbackReadFileStoreTable) {
-            candidateTable =
-                    ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
-                            .wrapped();
-        }
+        FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
 
         FileStoreTable snapshotTable =
                 candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 41ba1412c4..32a184ea4d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -66,6 +66,7 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.CommitPreCallback;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.FileStorePathFactory;
@@ -166,6 +167,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private CommitMetrics commitMetrics;
     private boolean appendCommitCheckConflict = false;
 
+    private final List<CommitPreCallback> commitPreCallbacks;
+
     public FileStoreCommitImpl(
             SnapshotCommit snapshotCommit,
             FileIO fileIO,
@@ -199,7 +202,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             boolean discardDuplicateFiles,
             ConflictDetection.Factory conflictDetectFactory,
             @Nullable StrictModeChecker strictModeChecker,
-            @Nullable CommitRollback rollback) {
+            @Nullable CommitRollback rollback,
+            List<CommitPreCallback> commitPreCallbacks) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -241,6 +245,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.strictModeChecker = strictModeChecker;
         this.conflictDetection = conflictDetectFactory.create(scanner);
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
+        this.commitPreCallbacks = commitPreCallbacks;
     }
 
     @Override
@@ -1005,6 +1010,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
 
         boolean success;
+        final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
+        final List<ManifestEntry> finalDeltaFiles = deltaFiles;
+        commitPreCallbacks.forEach(
+                callback ->
+                        callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
         try {
             success = commitSnapshotImpl(newSnapshot, deltaStatistics);
         } catch (Exception e) {
@@ -1041,8 +1051,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         if (strictModeChecker != null) {
             strictModeChecker.update(newSnapshotId);
         }
-        final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
-        final List<ManifestEntry> finalDeltaFiles = deltaFiles;
         commitCallbacks.forEach(
                 callback ->
                         callback.call(finalBaseFiles, finalDeltaFiles, indexFiles, newSnapshot));
@@ -1185,6 +1193,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
     @Override
     public void close() {
+        IOUtils.closeAllQuietly(commitPreCallbacks);
         IOUtils.closeAllQuietly(commitCallbacks);
         IOUtils.closeQuietly(snapshotCommit);
     }
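
Note the ordering the hunks above establish: the finalBaseFiles/finalDeltaFiles
bindings now precede commitSnapshotImpl, so the new pre-callbacks run before the
snapshot is written, while the existing commitCallbacks still fire only after a
successful commit. A toy model of that control flow (types and names below are
illustrative, not Paimon API):

import java.util.Collections;
import java.util.List;

/** Toy model of the pre/post callback ordering around a commit. */
public class CommitOrderingSketch {

    interface PreCallback { void call(); }   // may veto by throwing
    interface PostCallback { void call(); }  // notified after success

    static void commitOnce(List<PreCallback> pre, List<PostCallback> post) {
        pre.forEach(PreCallback::call);            // 1. pre-checks, may throw
        System.out.println("snapshot committed");  // 2. stands in for commitSnapshotImpl
        post.forEach(PostCallback::call);          // 3. post-commit notifications
    }

    public static void main(String[] args) {
        PreCallback veto = () -> { throw new RuntimeException("pre-check failed"); };
        PostCallback notify = () -> System.out.println("post callback ran");
        try {
            commitOnce(Collections.singletonList(veto), Collections.singletonList(notify));
        } catch (RuntimeException e) {
            // Neither "snapshot committed" nor "post callback ran" was printed:
            // a throwing pre-callback aborts the commit entirely.
            System.out.println("commit aborted: " + e.getMessage());
        }
    }
}
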
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
new file mode 100644
index 0000000000..d9d3d7b649
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+
+import java.util.List;
+
+/**
+ * Callback which is invoked before a snapshot is committed.
+ *
+ * <p>This hook allows additional validation or bookkeeping to be performed before a snapshot
+ * becomes visible. Implementations can inspect the files that are about to be committed as well as
+ * the {@link Snapshot} metadata and decide whether the commit should proceed.
+ *
+ * <p>If {@link #call(List, List, List, Snapshot)} throws a {@link RuntimeException}, the commit is
+ * aborted and the snapshot will not be committed. Implementations are expected to be fast and
+ * either side-effect free or idempotent, because a single logical commit may be retried and this
+ * callback might therefore be invoked multiple times for the same logical changes.
+ *
+ * <p>Implementations may optionally override {@link AutoCloseable#close()} if they hold external
+ * resources that need to be released when the surrounding {@link org.apache.paimon.table.Table}
+ * commit is closed.
+ */
+public interface CommitPreCallback extends AutoCloseable {
+
+    void call(
+            List<SimpleFileEntry> baseFiles,
+            List<ManifestEntry> deltaFiles,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot);
+}
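
As a usage sketch, a minimal implementation of the new interface could look like
the following; the class name and the delta-file limit are hypothetical, only
the call/close signatures come from this commit:

// Hypothetical example (not part of this commit): a CommitPreCallback that
// vetoes commits touching more delta files than a configured limit.
package org.apache.paimon.table.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;

import java.util.List;

public class MaxDeltaFilesPreCallback implements CommitPreCallback {

    private final int maxDeltaFiles;

    public MaxDeltaFilesPreCallback(int maxDeltaFiles) {
        this.maxDeltaFiles = maxDeltaFiles;
    }

    @Override
    public void call(
            List<SimpleFileEntry> baseFiles,
            List<ManifestEntry> deltaFiles,
            List<IndexManifestEntry> indexFiles,
            Snapshot snapshot) {
        // Throwing here aborts the commit; the snapshot never becomes visible.
        if (deltaFiles.size() > maxDeltaFiles) {
            throw new RuntimeException(
                    "Commit touches " + deltaFiles.size()
                            + " delta files, limit is " + maxDeltaFiles);
        }
    }

    @Override
    public void close() throws Exception {
        // No external resources held by this example.
    }
}
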
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index fd4cdff4d6..f6c573440f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -25,6 +25,9 @@ import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.partition.PartitionTimeExtractor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.RowType;
 
 import java.time.LocalDateTime;
@@ -209,4 +212,17 @@ public class ChainTableUtils {
         return options.isChainTable()
                 && options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
     }
+
+    public static boolean isScanFallbackSnapshotBranch(CoreOptions options) {
+        return options.isChainTable()
+                && options.scanFallbackSnapshotBranch().equalsIgnoreCase(options.branch());
+    }
+
+    public static FileStoreTable resolveChainPrimaryTable(FileStoreTable table) {
+        if (table.coreOptions().isChainTable() && table instanceof FallbackReadFileStoreTable) {
+            return ((ChainGroupReadTable) ((FallbackReadFileStoreTable) table).fallback())
+                    .wrapped();
+        }
+        return table;
+    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index c808b56491..5d14f0a3cb 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -33,6 +33,8 @@ import java.io.IOException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base tests for spark read. */
 public class SparkChainTableITCase {
@@ -126,8 +128,8 @@ public class SparkChainTableITCase {
 
         setupChainTableBranches(spark, "chain_test");
         spark.close();
-        spark = builder.getOrCreate();
 
+        spark = builder.getOrCreate();
         /** Write main branch */
         spark.sql(
                 "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810') values (1, 1, '1'),(2, 1, '1');");
@@ -155,8 +157,8 @@ public class SparkChainTableITCase {
                 "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
         spark.sql(
                 "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), 
(5, 1, '1' ), (6, 1, '1');");
-
         spark.close();
+
         spark = builder.getOrCreate();
         /** Main read */
         assertThat(
@@ -263,8 +265,8 @@ public class SparkChainTableITCase {
                         "[4,1,1,20250811]",
                         "[2,2,1-1,20250811]",
                         "[4,1,1,20250811]");
-
         spark.close();
+
         spark = builder.getOrCreate();
         spark.sql("set spark.paimon.branch=delta;");
         spark.sql(
@@ -280,12 +282,11 @@ public class SparkChainTableITCase {
                 spark.sql(
                         "SELECT t1,t2,t3 FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250814'");
         assertThat(df.count()).isEqualTo(1);
-
         spark.close();
+
         spark = builder.getOrCreate();
         /** Drop table */
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-
         spark.close();
     }
 
@@ -319,8 +320,8 @@ public class SparkChainTableITCase {
 
         setupChainTableBranches(spark, "chain_test");
         spark.close();
-        spark = builder.getOrCreate();
 
+        spark = builder.getOrCreate();
         /** Write main branch */
         spark.sql(
                 "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');");
@@ -348,8 +349,8 @@ public class SparkChainTableITCase {
                 "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 
2, '1-1');");
         spark.sql(
                 "insert overwrite table  `my_db1`.`chain_test` partition (dt = 
'20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 
2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
-
         spark.close();
+
         spark = builder.getOrCreate();
         /** Main read */
         assertThat(
@@ -458,8 +459,8 @@ public class SparkChainTableITCase {
                         "[4,1,1,20250810,23]",
                         "[2,2,1-1,20250810,23]",
                         "[4,1,1,20250810,23]");
-
         spark.close();
+
         spark = builder.getOrCreate();
         spark.sql("set spark.paimon.branch=delta;");
         spark.sql(
@@ -475,8 +476,8 @@ public class SparkChainTableITCase {
                 spark.sql(
                         "SELECT t1,t2,t3 FROM 
`my_db1`.`chain_test$branch_delta` where dt = '20250811' and hour = '02'");
         assertThat(df.count()).isEqualTo(1);
-
         spark.close();
+
         spark = builder.getOrCreate();
         /** Drop table */
         spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
@@ -678,4 +679,79 @@ public class SparkChainTableITCase {
 
         spark.close();
     }
+
+    @Test
+    public void testDropSnapshotPartition(@TempDir java.nio.file.Path tempDir) throws IOException {
+        Path warehousePath = new Path("file:" + tempDir.toString());
+        SparkSession.Builder builder = createSparkSessionBuilder(warehousePath);
+        SparkSession spark = builder.getOrCreate();
+        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+        spark.sql("USE spark_catalog.my_db1");
+
+        spark.sql(
+                "CREATE TABLE IF NOT EXISTS \n"
+                        + "  `my_db1`.`chain_test_drop_partition` (\n"
+                        + "    `t1` BIGINT COMMENT 't1',\n"
+                        + "    `t2` BIGINT COMMENT 't2',\n"
+                        + "    `t3` STRING COMMENT 't3'\n"
+                        + "  ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW 
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+                        + "WITH\n"
+                        + "  SERDEPROPERTIES ('serialization.format' = '1') 
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat' 
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES 
(\n"
+                        + "    'bucket-key' = 't1',\n"
+                        + "    'primary-key' = 'dt,t1',\n"
+                        + "    'partition.timestamp-pattern' = '$dt',\n"
+                        + "    'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+                        + "    'chain-table.enabled' = 'true',\n"
+                        + "    'bucket' = '2',\n"
+                        + "    'merge-engine' = 'deduplicate', \n"
+                        + "    'sequence.field' = 't2'\n"
+                        + "  )");
+
+        setupChainTableBranches(spark, "chain_test_drop_partition");
+        spark.close();
+
+        spark = builder.getOrCreate();
+        /** Write delta branch */
+        spark.sql("set spark.paimon.branch=delta;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260101') values (1, 1, '1'),(2, 1, '1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260102') values (1, 2, '1-1' ),(3, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260103') values (2, 2, '1-1' ),(4, 1, '1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260104') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260105') values (5, 1, '1' ),(6, 1, '1' );");
+
+        /** Write snapshot branch */
+        spark.sql("set spark.paimon.branch=snapshot;");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition`  partition (dt = '20260101')  values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260103') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
+        spark.sql(
+                "insert overwrite table  `my_db1`.`chain_test_drop_partition` partition (dt = '20260105') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
+        spark.close();
+
+        final SparkSession session = builder.getOrCreate();
+        assertThatNoException()
+                .isThrownBy(
+                        () -> {
+                            session.sql(
+                                    "alter table `my_db1`.`chain_test_drop_partition$branch_snapshot` drop partition (dt = '20260105');");
+                        });
+        assertThatThrownBy(
+                () -> {
+                    session.sql(
+                            "alter table `my_db1`.`chain_test_drop_partition$branch_snapshot` drop partition (dt = '20260101');");
+                });
+        session.close();
+
+        spark = builder.getOrCreate();
+        /** Drop table */
+        spark.sql("DROP TABLE IF EXISTS 
`my_db1`.`chain_test_drop_partition`;");
+
+        spark.close();
+    }
 }
