This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 974f577cd1 [core] Add pre check when drop partition for chain table
(#7109)
974f577cd1 is described below
commit 974f577cd1dfc721f27359c86f227903acd38e98
Author: Stefanietry <[email protected]>
AuthorDate: Thu Feb 5 13:40:30 2026 +0800
[core] Add pre check when drop partition for chain table (#7109)
---
.../java/org/apache/paimon/AbstractFileStore.java | 14 +-
.../metastore/ChainTableCommitPreCallback.java | 207 +++++++++++++++++++++
.../ChainTableOverwriteCommitCallback.java | 10 +-
.../paimon/operation/FileStoreCommitImpl.java | 15 +-
.../paimon/table/sink/CommitPreCallback.java | 51 +++++
.../org/apache/paimon/utils/ChainTableUtils.java | 16 ++
.../apache/paimon/spark/SparkChainTableITCase.java | 94 +++++++++-
7 files changed, 385 insertions(+), 22 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 70108a1920..a92bde3d64 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.metastore.AddPartitionCommitCallback;
import org.apache.paimon.metastore.AddPartitionTagCallback;
+import org.apache.paimon.metastore.ChainTableCommitPreCallback;
import org.apache.paimon.metastore.ChainTableOverwriteCommitCallback;
import org.apache.paimon.metastore.TagPreviewCommitCallback;
import org.apache.paimon.operation.ChangelogDeletion;
@@ -59,6 +60,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
+import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.SuccessFileTagCallback;
import org.apache.paimon.tag.TagAutoManager;
@@ -333,7 +335,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.commitDiscardDuplicateFiles(),
conflictDetectFactory,
strictModeChecker,
- rollback);
+ rollback,
+ createCommitPreCallbacks(commitUser, table));
}
@Override
@@ -431,6 +434,15 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
return callbacks;
}
+    /**
+     * Builds the callbacks that run before each snapshot commit.
+     *
+     * <p>Currently only chain tables contribute a pre-commit callback; for every other table the
+     * returned list is empty.
+     *
+     * @param commitUser the commit user; not referenced by this method's body
+     * @param table the table handed to the chain table pre-commit check
+     * @return a new mutable list of pre-commit callbacks, possibly empty
+     */
+    private List<CommitPreCallback> createCommitPreCallbacks(
+            String commitUser, FileStoreTable table) {
+        List<CommitPreCallback> preCallbacks = new ArrayList<>();
+        boolean chainTable = options.isChainTable();
+        if (chainTable) {
+            preCallbacks.add(new ChainTableCommitPreCallback(table));
+        }
+        return preCallbacks;
+    }
+
@Override
@Nullable
public PartitionExpire newPartitionExpire(String commitUser,
FileStoreTable table) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
new file mode 100644
index 0000000000..0b26cb6375
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableCommitPreCallback.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.metastore;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.RecordComparator;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.commit.ManifestEntryChanges;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitPreCallback;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ChainTableUtils;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CommitPreCallback} implementation for chain tables.
+ *
+ * <p>This callback performs a pre-check before dropping partitions on the snapshot branch of a
+ * chain table. It verifies that a snapshot partition being dropped is either followed by no delta
+ * partitions in the chain interval or has a previous snapshot partition that can serve as its
+ * predecessor.
+ *
+ * <p>The callback is only executed when all of the following conditions are met:
+ *
+ * <ul>
+ *   <li>The table is configured as a chain table and the current branch is the snapshot branch (see
+ *       {@link ChainTableUtils#isScanFallbackSnapshotBranch(CoreOptions)}).
+ *   <li>The committed snapshot kind is {@link CommitKind#OVERWRITE}.
+ *   <li>All table and index manifest entries in the commit are {@link FileKind#DELETE deletes}.
+ * </ul>
+ *
+ * <p>If the validation fails for any of the affected partitions, a {@link RuntimeException} is
+ * thrown and the commit is aborted.
+ *
+ * <p>This implementation keeps only references to the table and its options and does not maintain
+ * mutable state between invocations.
+ */
+public class ChainTableCommitPreCallback implements CommitPreCallback {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ChainTableCommitPreCallback.class);
+
+    // Assigned once in the constructor; this class is not Serializable, so the fields are plain
+    // final references rather than transient ones.
+    private final FileStoreTable table;
+    private final CoreOptions coreOptions;
+
+    public ChainTableCommitPreCallback(FileStoreTable table) {
+        this.table = table;
+        this.coreOptions = table.coreOptions();
+    }
+
+    /**
+     * Validates a pending partition drop on the snapshot branch.
+     *
+     * <p>For every partition touched by the commit, the delta branch is searched for partitions
+     * that sort after it and before the next snapshot partition. The drop is allowed when no such
+     * delta partition exists or when an earlier snapshot partition exists.
+     *
+     * @param baseFiles not used by this check
+     * @param deltaFiles manifest entries of the pending commit
+     * @param indexFiles index manifest entries of the pending commit
+     * @param snapshot the snapshot about to be committed; only its commit kind is inspected
+     * @throws RuntimeException if a dropped partition is followed by delta partitions and has no
+     *     previous snapshot partition
+     */
+    @Override
+    public void call(
+            List<SimpleFileEntry> baseFiles,
+            List<ManifestEntry> deltaFiles,
+            List<IndexManifestEntry> indexFiles,
+            Snapshot snapshot) {
+        if (!ChainTableUtils.isScanFallbackSnapshotBranch(coreOptions)) {
+            return;
+        }
+        if (snapshot.commitKind() != CommitKind.OVERWRITE) {
+            return;
+        }
+        if (!isPureDeleteCommit(deltaFiles, indexFiles)) {
+            return;
+        }
+        List<BinaryRow> changedPartitions =
+                ManifestEntryChanges.changedPartitions(deltaFiles, indexFiles);
+        if (changedPartitions.isEmpty()) {
+            // A commit without any entries vacuously passes isPureDeleteCommit. There is nothing
+            // to validate, so skip the partition listings below.
+            return;
+        }
+
+        FileStoreTable candidateTable = ChainTableUtils.resolveChainPrimaryTable(table);
+        FileStoreTable deltaTable =
+                candidateTable.switchToBranch(coreOptions.scanFallbackDeltaBranch());
+        RowType partitionType = table.schema().logicalPartitionType();
+        RowDataToObjectArrayConverter partitionConverter =
+                new RowDataToObjectArrayConverter(partitionType);
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        coreOptions.partitionDefaultName(),
+                        partitionType,
+                        table.schema().partitionKeys().toArray(new String[0]),
+                        coreOptions.legacyPartitionName());
+        RecordComparator partitionComparator =
+                CodeGenUtils.newRecordComparator(partitionType.getFieldTypes());
+        // Sorted ascending: the predecessor/successor lookups below rely on this order.
+        List<BinaryRow> snapshotPartitions =
+                table.newSnapshotReader().partitionEntries().stream()
+                        .map(PartitionEntry::partition)
+                        .sorted(partitionComparator)
+                        .collect(Collectors.toList());
+        // NOTE(review): the same reader receives a new partition filter on every iteration; this
+        // assumes withPartitionFilter replaces the previous filter rather than adding to it.
+        SnapshotReader deltaSnapshotReader = deltaTable.newSnapshotReader();
+        PredicateBuilder builder = new PredicateBuilder(partitionType);
+        for (BinaryRow partition : changedPartitions) {
+            Optional<BinaryRow> preSnapshotPartition =
+                    findPreSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Optional<BinaryRow> nextSnapshotPartition =
+                    findNextSnapshotPartition(snapshotPartitions, partition, partitionComparator);
+            Predicate deltaFollowingPredicate =
+                    ChainTableUtils.createTriangularPredicate(
+                            partition, partitionConverter, builder::equal, builder::greaterThan);
+            List<BinaryRow> deltaFollowingPartitions =
+                    deltaSnapshotReader.withPartitionFilter(deltaFollowingPredicate)
+                            .partitionEntries().stream()
+                            .map(PartitionEntry::partition)
+                            .filter(
+                                    deltaPartition ->
+                                            isBeforeNextSnapshotPartition(
+                                                    deltaPartition,
+                                                    nextSnapshotPartition,
+                                                    partitionComparator))
+                            .collect(Collectors.toList());
+            boolean canDrop =
+                    deltaFollowingPartitions.isEmpty() || preSnapshotPartition.isPresent();
+            LOG.info(
+                    "Drop partition, partition={}, canDrop={}, preSnapshotPartition={}, nextSnapshotPartition={}",
+                    partitionComputer.generatePartValues(partition),
+                    canDrop,
+                    generatePartitionValues(preSnapshotPartition, partitionComputer),
+                    generatePartitionValues(nextSnapshotPartition, partitionComputer));
+            if (!canDrop) {
+                // Keep the original sentence as a prefix and append the context needed to
+                // diagnose the rejection.
+                throw new RuntimeException(
+                        String.format(
+                                "Snapshot partition cannot be dropped. Partition %s is followed by"
+                                        + " %d delta partition(s) before the next snapshot"
+                                        + " partition and has no previous snapshot partition.",
+                                partitionComputer.generatePartValues(partition),
+                                deltaFollowingPartitions.size()));
+            }
+        }
+    }
+
+    /** Returns whether every data and index manifest entry of the commit is a delete. */
+    private static boolean isPureDeleteCommit(
+            List<ManifestEntry> deltaFiles, List<IndexManifestEntry> indexFiles) {
+        return deltaFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE)
+                && indexFiles.stream().allMatch(f -> f.kind() == FileKind.DELETE);
+    }
+
+    /**
+     * Finds the greatest snapshot partition that sorts strictly before {@code partition}.
+     *
+     * @param snapshotPartitions snapshot partitions sorted ascending by {@code
+     *     partitionComparator}
+     */
+    private static Optional<BinaryRow> findPreSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        BinaryRow pre = null;
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) < 0) {
+                pre = snapshotPartition;
+            } else {
+                // The list is sorted, so no later element can be smaller.
+                break;
+            }
+        }
+        return Optional.ofNullable(pre);
+    }
+
+    /**
+     * Finds the smallest snapshot partition that sorts strictly after {@code partition}.
+     *
+     * @param snapshotPartitions snapshot partitions sorted ascending by {@code
+     *     partitionComparator}
+     */
+    private static Optional<BinaryRow> findNextSnapshotPartition(
+            List<BinaryRow> snapshotPartitions,
+            BinaryRow partition,
+            RecordComparator partitionComparator) {
+        for (BinaryRow snapshotPartition : snapshotPartitions) {
+            if (partitionComparator.compare(snapshotPartition, partition) > 0) {
+                return Optional.of(snapshotPartition);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Returns whether {@code partition} sorts before the next snapshot partition; always true when
+     * there is no next snapshot partition.
+     */
+    private static boolean isBeforeNextSnapshotPartition(
+            BinaryRow partition,
+            Optional<BinaryRow> nextSnapshotPartition,
+            RecordComparator partitionComparator) {
+        return nextSnapshotPartition
+                .map(next -> partitionComparator.compare(partition, next) < 0)
+                .orElse(true);
+    }
+
+    /** Renders the partition values for logging, or {@code "<none>"} when absent. */
+    private static String generatePartitionValues(
+            Optional<BinaryRow> partition, InternalRowPartitionComputer partitionComputer) {
+        return partition
+                .map(present -> partitionComputer.generatePartValues(present).toString())
+                .orElse("<none>");
+    }
+
+    /** No resources are held by this callback, so closing is a no-op. */
+    @Override
+    public void close() throws Exception {}
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
index 76defb1ed0..08a8ea82d1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
+++
b/paimon-core/src/main/java/org/apache/paimon/metastore/ChainTableOverwriteCommitCallback.java
@@ -26,8 +26,6 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.table.ChainGroupReadTable;
-import org.apache.paimon.table.FallbackReadFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitCallback;
@@ -78,13 +76,7 @@ public class ChainTableOverwriteCommitCallback implements
CommitCallback {
return;
}
- // Find the underlying table for writing snapshot branch.
- FileStoreTable candidateTable = table;
- if (table instanceof FallbackReadFileStoreTable) {
- candidateTable =
- ((ChainGroupReadTable) ((FallbackReadFileStoreTable)
table).fallback())
- .wrapped();
- }
+ FileStoreTable candidateTable =
ChainTableUtils.resolveChainPrimaryTable(table);
FileStoreTable snapshotTable =
candidateTable.switchToBranch(coreOptions.scanFallbackSnapshotBranch());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 41ba1412c4..32a184ea4d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -66,6 +66,7 @@ import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.CommitPreCallback;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.FileStorePathFactory;
@@ -166,6 +167,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private CommitMetrics commitMetrics;
private boolean appendCommitCheckConflict = false;
+ private final List<CommitPreCallback> commitPreCallbacks;
+
public FileStoreCommitImpl(
SnapshotCommit snapshotCommit,
FileIO fileIO,
@@ -199,7 +202,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
boolean discardDuplicateFiles,
ConflictDetection.Factory conflictDetectFactory,
@Nullable StrictModeChecker strictModeChecker,
- @Nullable CommitRollback rollback) {
+ @Nullable CommitRollback rollback,
+ List<CommitPreCallback> commitPreCallbacks) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -241,6 +245,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.strictModeChecker = strictModeChecker;
this.conflictDetection = conflictDetectFactory.create(scanner);
this.commitCleaner = new CommitCleaner(manifestList, manifestFile,
indexManifestFile);
+ this.commitPreCallbacks = commitPreCallbacks;
}
@Override
@@ -1005,6 +1010,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
}
boolean success;
+ final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
+ final List<ManifestEntry> finalDeltaFiles = deltaFiles;
+ commitPreCallbacks.forEach(
+ callback ->
+ callback.call(finalBaseFiles, finalDeltaFiles,
indexFiles, newSnapshot));
try {
success = commitSnapshotImpl(newSnapshot, deltaStatistics);
} catch (Exception e) {
@@ -1041,8 +1051,6 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
if (strictModeChecker != null) {
strictModeChecker.update(newSnapshotId);
}
- final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
- final List<ManifestEntry> finalDeltaFiles = deltaFiles;
commitCallbacks.forEach(
callback ->
callback.call(finalBaseFiles, finalDeltaFiles,
indexFiles, newSnapshot));
@@ -1185,6 +1193,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
@Override
public void close() {
+ IOUtils.closeAllQuietly(commitPreCallbacks);
IOUtils.closeAllQuietly(commitCallbacks);
IOUtils.closeQuietly(snapshotCommit);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
new file mode 100644
index 0000000000..d9d3d7b649
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitPreCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+
+import java.util.List;
+
+/**
+ * Callback which is invoked before a snapshot is committed.
+ *
+ * <p>This hook allows additional validation or bookkeeping to be performed before a snapshot
+ * becomes visible. Implementations can inspect the files that are about to be committed as well as
+ * the {@link Snapshot} metadata and decide whether the commit should proceed.
+ *
+ * <p>If {@link #call(List, List, List, Snapshot)} throws a {@link RuntimeException}, the commit is
+ * aborted and the snapshot will not be committed. Implementations are expected to be fast and
+ * either side effect free or idempotent, because a single logical commit may be retried and this
+ * callback might therefore be invoked multiple times for the same logical changes.
+ *
+ * <p>Because this interface extends {@link AutoCloseable} without supplying a default, every
+ * implementation must provide {@link AutoCloseable#close()}; it may be a no-op when no external
+ * resources are held. The callback is closed together with the commit that owns it.
+ */
+public interface CommitPreCallback extends AutoCloseable {
+
+ /**
+ * Inspects a pending commit before its snapshot is committed; throwing aborts the commit.
+ *
+ * @param baseFiles base data file entries supplied by the committer
+ * @param deltaFiles manifest entries (file changes) belonging to the pending commit
+ * @param indexFiles index manifest entries belonging to the pending commit
+ * @param snapshot the snapshot that is about to be committed
+ */
+ void call(
+ List<SimpleFileEntry> baseFiles,
+ List<ManifestEntry> deltaFiles,
+ List<IndexManifestEntry> indexFiles,
+ Snapshot snapshot);
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
index fd4cdff4d6..f6c573440f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ChainTableUtils.java
@@ -25,6 +25,9 @@ import
org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.partition.PartitionTimeExtractor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.ChainGroupReadTable;
+import org.apache.paimon.table.FallbackReadFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowType;
import java.time.LocalDateTime;
@@ -209,4 +212,17 @@ public class ChainTableUtils {
return options.isChainTable()
&&
options.scanFallbackDeltaBranch().equalsIgnoreCase(options.branch());
}
+
+    /**
+     * Tells whether the given options describe the snapshot branch of a chain table.
+     *
+     * @param options the table options to inspect
+     * @return {@code true} if the table is a chain table and its current branch equals the
+     *     configured scan fallback snapshot branch, ignoring case
+     */
+    public static boolean isScanFallbackSnapshotBranch(CoreOptions options) {
+        if (!options.isChainTable()) {
+            return false;
+        }
+        String snapshotBranch = options.scanFallbackSnapshotBranch();
+        return snapshotBranch.equalsIgnoreCase(options.branch());
+    }
+
+    /**
+     * Resolves the table to use when switching to a chain table branch.
+     *
+     * <p>When the table is a chain table wrapped in a {@link FallbackReadFileStoreTable}, the
+     * table wrapped by its {@link ChainGroupReadTable} fallback is returned; otherwise the given
+     * table is returned unchanged.
+     *
+     * @param table the table as seen by the caller
+     * @return the unwrapped table for a wrapped chain table, else {@code table} itself
+     */
+    public static FileStoreTable resolveChainPrimaryTable(FileStoreTable table) {
+        boolean wrappedChainTable =
+                table.coreOptions().isChainTable() && table instanceof FallbackReadFileStoreTable;
+        if (!wrappedChainTable) {
+            return table;
+        }
+        FallbackReadFileStoreTable fallbackReadTable = (FallbackReadFileStoreTable) table;
+        ChainGroupReadTable chainGroup = (ChainGroupReadTable) fallbackReadTable.fallback();
+        return chainGroup.wrapped();
+    }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
index c808b56491..5d14f0a3cb 100644
---
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
+++
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkChainTableITCase.java
@@ -33,6 +33,8 @@ import java.io.IOException;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Base tests for spark read. */
public class SparkChainTableITCase {
@@ -126,8 +128,8 @@ public class SparkChainTableITCase {
setupChainTableBranches(spark, "chain_test");
spark.close();
- spark = builder.getOrCreate();
+ spark = builder.getOrCreate();
/** Write main branch */
spark.sql(
"insert overwrite table `my_db1`.`chain_test` partition (dt =
'20250810') values (1, 1, '1'),(2, 1, '1');");
@@ -155,8 +157,8 @@ public class SparkChainTableITCase {
"insert overwrite table `my_db1`.`chain_test` partition (dt =
'20250812') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1');");
spark.sql(
"insert overwrite table `my_db1`.`chain_test` partition (dt =
'20250814') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4, 2, '1-1'),
(5, 1, '1' ), (6, 1, '1');");
-
spark.close();
+
spark = builder.getOrCreate();
/** Main read */
assertThat(
@@ -263,8 +265,8 @@ public class SparkChainTableITCase {
"[4,1,1,20250811]",
"[2,2,1-1,20250811]",
"[4,1,1,20250811]");
-
spark.close();
+
spark = builder.getOrCreate();
spark.sql("set spark.paimon.branch=delta;");
spark.sql(
@@ -280,12 +282,11 @@ public class SparkChainTableITCase {
spark.sql(
"SELECT t1,t2,t3 FROM
`my_db1`.`chain_test$branch_delta` where dt = '20250814'");
assertThat(df.count()).isEqualTo(1);
-
spark.close();
+
spark = builder.getOrCreate();
/** Drop table */
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
-
spark.close();
}
@@ -319,8 +320,8 @@ public class SparkChainTableITCase {
setupChainTableBranches(spark, "chain_test");
spark.close();
- spark = builder.getOrCreate();
+ spark = builder.getOrCreate();
/** Write main branch */
spark.sql(
"insert overwrite table `my_db1`.`chain_test` partition (dt =
'20250810', hour = '22') values (1, 1, '1'),(2, 1, '1');");
@@ -348,8 +349,8 @@ public class SparkChainTableITCase {
"insert overwrite table `my_db1`.`chain_test` partition (dt =
'20250811', hour = '00') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4,
2, '1-1');");
spark.sql(
"insert overwrite table `my_db1`.`chain_test` partition (dt =
'20250811', hour = '02') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'), (4,
2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
-
spark.close();
+
spark = builder.getOrCreate();
/** Main read */
assertThat(
@@ -458,8 +459,8 @@ public class SparkChainTableITCase {
"[4,1,1,20250810,23]",
"[2,2,1-1,20250810,23]",
"[4,1,1,20250810,23]");
-
spark.close();
+
spark = builder.getOrCreate();
spark.sql("set spark.paimon.branch=delta;");
spark.sql(
@@ -475,8 +476,8 @@ public class SparkChainTableITCase {
spark.sql(
"SELECT t1,t2,t3 FROM
`my_db1`.`chain_test$branch_delta` where dt = '20250811' and hour = '02'");
assertThat(df.count()).isEqualTo(1);
-
spark.close();
+
spark = builder.getOrCreate();
/** Drop table */
spark.sql("DROP TABLE IF EXISTS `my_db1`.`chain_test`;");
@@ -678,4 +679,79 @@ public class SparkChainTableITCase {
spark.close();
}
+
+ /**
+ * Verifies the pre-commit check that guards partition drops on the snapshot branch of a chain
+ * table.
+ *
+ * <p>The delta branch is populated with partitions 20260101 through 20260105 and the snapshot
+ * branch with 20260101, 20260103 and 20260105. Two drops are then attempted on the snapshot
+ * branch: the latest partition (expected to succeed) and the earliest one (expected to fail).
+ */
+ @Test
+ public void testDropSnapshotPartition(@TempDir java.nio.file.Path tempDir)
throws IOException {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession.Builder builder =
createSparkSessionBuilder(warehousePath);
+ SparkSession spark = builder.getOrCreate();
+ spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
+ spark.sql("USE spark_catalog.my_db1");
+
+ spark.sql(
+ "CREATE TABLE IF NOT EXISTS \n"
+ + " `my_db1`.`chain_test_drop_partition` (\n"
+ + " `t1` BIGINT COMMENT 't1',\n"
+ + " `t2` BIGINT COMMENT 't2',\n"
+ + " `t3` STRING COMMENT 't3'\n"
+ + " ) PARTITIONED BY (`dt` STRING COMMENT 'dt') ROW
FORMAT SERDE 'org.apache.paimon.hive.PaimonSerDe'\n"
+ + "WITH\n"
+ + " SERDEPROPERTIES ('serialization.format' = '1')
STORED AS INPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonInputFormat'
OUTPUTFORMAT 'org.apache.paimon.hive.mapred.PaimonOutputFormat' TBLPROPERTIES
(\n"
+ + " 'bucket-key' = 't1',\n"
+ + " 'primary-key' = 'dt,t1',\n"
+ + " 'partition.timestamp-pattern' = '$dt',\n"
+ + " 'partition.timestamp-formatter' = 'yyyyMMdd',\n"
+ + " 'chain-table.enabled' = 'true',\n"
+ + " 'bucket' = '2',\n"
+ + " 'merge-engine' = 'deduplicate', \n"
+ + " 'sequence.field' = 't2'\n"
+ + " )");
+
+ setupChainTableBranches(spark, "chain_test_drop_partition");
+ spark.close();
+
+ spark = builder.getOrCreate();
+ /** Write delta branch */
+ spark.sql("set spark.paimon.branch=delta;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260101') values (1, 1, '1'),(2, 1, '1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260102') values (1, 2, '1-1' ),(3, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260103') values (2, 2, '1-1' ),(4, 1, '1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260104') values (3, 2, '1-1' ),(4, 2, '1-1' );");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260105') values (5, 1, '1' ),(6, 1, '1' );");
+
+ /** Write snapshot branch */
+ spark.sql("set spark.paimon.branch=snapshot;");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260101') values (1, 2, '1-1'),(2, 1, '1'),(3, 1, '1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260103') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'),
(4, 2, '1-1');");
+ spark.sql(
+ "insert overwrite table `my_db1`.`chain_test_drop_partition`
partition (dt = '20260105') values (1, 2, '1-1'),(2, 2, '1-1'),(3, 2, '1-1'),
(4, 2, '1-1'), (5, 1, '1' ), (6, 1, '1');");
+ spark.close();
+
+ final SparkSession session = builder.getOrCreate();
+ // Dropping 20260105 is expected to pass: no delta partition was written after it, and an
+ // earlier snapshot partition (20260103) exists.
+ assertThatNoException()
+ .isThrownBy(
+ () -> {
+ session.sql(
+ "alter table
`my_db1`.`chain_test_drop_partition$branch_snapshot` drop partition (dt =
'20260105');");
+ });
+ // Dropping 20260101 is expected to be rejected: delta partition 20260102 lies between it
+ // and the next snapshot partition (20260103), and no earlier snapshot partition exists.
+ assertThatThrownBy(
+ () -> {
+ session.sql(
+ "alter table
`my_db1`.`chain_test_drop_partition$branch_snapshot` drop partition (dt =
'20260101');");
+ });
+ session.close();
+
+ spark = builder.getOrCreate();
+ /** Drop table */
+ spark.sql("DROP TABLE IF EXISTS
`my_db1`.`chain_test_drop_partition`;");
+
+ spark.close();
+ }
}