This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d0346de08 [core] Rollback 'COMPACT' commit for row-level operations (#6968)
7d0346de08 is described below

commit 7d0346de088c43accbd289f701223cd7d798f1fe
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Jan 18 22:59:27 2026 +0800

    [core] Rollback 'COMPACT' commit for row-level operations (#6968)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  10 +-
 .../TableRollback.java}                            |  27 ++---
 .../paimon/operation/FileStoreCommitImpl.java      |  65 +++++++++---
 ...{RetryCommitResult.java => CommitRollback.java} |  31 +++---
 .../paimon/operation/commit/ConflictDetection.java |  61 +++++++----
 .../paimon/operation/commit/RetryCommitResult.java |  44 ++++++--
 .../apache/paimon/table/CatalogEnvironment.java    |  16 +++
 .../apache/paimon/operation/FileDeletionTest.java  |   1 +
 .../paimon/operation/FileStoreCommitTest.java      |   4 +-
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 117 +++++++++++++++++++++
 10 files changed, 294 insertions(+), 82 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 33a60e73d7..4f8bfb68e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
 import org.apache.paimon.CoreOptions.ExternalPathStrategy;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.catalog.TableRollback;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.fs.FileIO;
@@ -43,6 +44,7 @@ import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.operation.commit.CommitRollback;
 import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.partition.PartitionExpireStrategy;
@@ -288,6 +290,11 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
                         commitUser,
                         this::newScan,
                         
options.commitStrictModeLastSafeSnapshot().orElse(null));
+        CommitRollback rollback = null;
+        TableRollback tableRollback = 
catalogEnvironment.catalogTableRollback();
+        if (tableRollback != null) {
+            rollback = new CommitRollback(tableRollback);
+        }
         return new FileStoreCommitImpl(
                 snapshotCommit,
                 fileIO,
@@ -320,7 +327,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.rowTrackingEnabled(),
                 options.commitDiscardDuplicateFiles(),
                 conflictDetection,
-                strictModeChecker);
+                strictModeChecker,
+                rollback);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
 b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
similarity index 52%
copy from 
paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
copy to paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
index e64049ea63..5cb2a52192 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
@@ -16,29 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.operation.commit;
+package org.apache.paimon.catalog;
 
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.table.Instant;
 
-import java.util.List;
+import javax.annotation.Nullable;
 
-/** Need to retry commit of {@link CommitResult}. */
-public class RetryCommitResult implements CommitResult {
+/** Rollback table to instant from snapshot. */
+public interface TableRollback {
 
-    public final Snapshot latestSnapshot;
-    public final List<SimpleFileEntry> baseDataFiles;
-    public final Exception exception;
-
-    public RetryCommitResult(
-            Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles, 
Exception exception) {
-        this.latestSnapshot = latestSnapshot;
-        this.baseDataFiles = baseDataFiles;
-        this.exception = exception;
-    }
-
-    @Override
-    public boolean isSuccess() {
-        return false;
-    }
+    void rollbackTo(Instant instant, @Nullable Long fromSnapshot);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index bdcb4482d0..6494b61855 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -43,10 +43,12 @@ import org.apache.paimon.operation.commit.CommitChanges;
 import org.apache.paimon.operation.commit.CommitChangesProvider;
 import org.apache.paimon.operation.commit.CommitCleaner;
 import org.apache.paimon.operation.commit.CommitResult;
+import org.apache.paimon.operation.commit.CommitRollback;
 import org.apache.paimon.operation.commit.CommitScanner;
 import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.operation.commit.ManifestEntryChanges;
 import org.apache.paimon.operation.commit.RetryCommitResult;
+import 
org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
 import 
org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
 import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.operation.commit.SuccessCommitResult;
@@ -138,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final ManifestFile manifestFile;
     private final ManifestList manifestList;
     private final IndexManifestFile indexManifestFile;
+    @Nullable private final CommitRollback rollback;
     private final CommitScanner scanner;
     private final int numBucket;
     private final MemorySize manifestTargetSize;
@@ -195,7 +198,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             boolean rowTrackingEnabled,
             boolean discardDuplicateFiles,
             ConflictDetection conflictDetection,
-            @Nullable StrictModeChecker strictModeChecker) {
+            @Nullable StrictModeChecker strictModeChecker,
+            @Nullable CommitRollback rollback) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -209,6 +213,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.manifestFile = manifestFileFactory.create();
         this.manifestList = manifestListFactory.create();
         this.indexManifestFile = indexManifestFileFactory.create();
+        this.rollback = rollback;
         this.scanner = new CommitScanner(scan, indexManifestFile, options);
         this.numBucket = numBucket;
         this.manifestTargetSize = manifestTargetSize;
@@ -313,10 +318,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 if (appendCommitCheckConflict) {
                     checkAppendFiles = true;
                 }
+
+                boolean allowRollback = false;
                 if (containsFileDeletionOrDeletionVectors(
                         appendSimpleEntries, changes.appendIndexFiles)) {
                     commitKind = CommitKind.OVERWRITE;
                     checkAppendFiles = true;
+                    allowRollback = true;
                 }
 
                 attempts +=
@@ -329,6 +337,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.properties(),
                                 commitKind,
+                                allowRollback,
                                 checkAppendFiles,
                                 null);
                 generatedSnapshot += 1;
@@ -347,6 +356,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.properties(),
                                 CommitKind.COMPACT,
+                                false,
                                 true,
                                 null);
                 generatedSnapshot += 1;
@@ -512,6 +522,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                 committable.watermark(),
                                 committable.properties(),
                                 CommitKind.COMPACT,
+                                false,
                                 true,
                                 null);
                 generatedSnapshot += 1;
@@ -652,6 +663,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 Collections.emptyMap(),
                 CommitKind.ANALYZE,
                 false,
+                false,
                 statsFileName);
     }
 
@@ -678,6 +690,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<String, String> properties,
             CommitKind commitKind,
+            boolean allowRollback,
             boolean detectConflicts,
             @Nullable String statsFileName) {
         int retryCount = 0;
@@ -696,6 +709,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                             watermark,
                             properties,
                             commitKind,
+                            allowRollback,
                             latestSnapshot,
                             detectConflicts,
                             statsFileName);
@@ -742,6 +756,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 watermark,
                 properties,
                 CommitKind.OVERWRITE,
+                false,
                 true,
                 null);
     }
@@ -756,6 +771,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             @Nullable Long watermark,
             Map<String, String> properties,
             CommitKind commitKind,
+            boolean allowRollback,
             @Nullable Snapshot latestSnapshot,
             boolean detectConflicts,
             @Nullable String newStatsFileName) {
@@ -763,13 +779,15 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
         // Check if the commit has been completed. At this point, there will 
be no more repeated
         // commits and just return success
-        if (retryResult != null && latestSnapshot != null) {
+        if (retryResult instanceof CommitFailRetryResult && latestSnapshot != 
null) {
+            CommitFailRetryResult commitFailRetry = (CommitFailRetryResult) 
retryResult;
             Map<Long, Snapshot> snapshotCache = new HashMap<>();
             snapshotCache.put(latestSnapshot.id(), latestSnapshot);
             long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
-            if (retryResult.latestSnapshot != null) {
-                snapshotCache.put(retryResult.latestSnapshot.id(), 
retryResult.latestSnapshot);
-                startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
+            if (commitFailRetry.latestSnapshot != null) {
+                snapshotCache.put(
+                        commitFailRetry.latestSnapshot.id(), 
commitFailRetry.latestSnapshot);
+                startCheckSnapshot = commitFailRetry.latestSnapshot.id() + 1;
             }
             for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
                 Snapshot snapshot = snapshotCache.computeIfAbsent(i, 
snapshotManager::snapshot);
@@ -813,11 +831,17 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             // latestSnapshotId is different from the snapshot id we've 
checked for conflicts,
             // so we have to check again
             List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, 
indexFiles);
-            if (retryResult != null && retryResult.latestSnapshot != null) {
-                baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
+            CommitFailRetryResult commitFailRetry =
+                    retryResult instanceof CommitFailRetryResult
+                            ? (CommitFailRetryResult) retryResult
+                            : null;
+            if (commitFailRetry != null
+                    && commitFailRetry.latestSnapshot != null
+                    && commitFailRetry.baseDataFiles != null) {
+                baseDataFiles = new ArrayList<>(commitFailRetry.baseDataFiles);
                 List<SimpleFileEntry> incremental =
                         scanner.readIncrementalChanges(
-                                retryResult.latestSnapshot, latestSnapshot, 
changedPartitions);
+                                commitFailRetry.latestSnapshot, 
latestSnapshot, changedPartitions);
                 if (!incremental.isEmpty()) {
                     baseDataFiles.addAll(incremental);
                     baseDataFiles = new 
ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
@@ -837,12 +861,21 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                                 .filter(entry -> 
!baseIdentifiers.contains(entry.identifier()))
                                 .collect(Collectors.toList());
             }
-            conflictDetection.checkNoConflictsOrFail(
-                    latestSnapshot,
-                    baseDataFiles,
-                    SimpleFileEntry.from(deltaFiles),
-                    indexFiles,
-                    commitKind);
+            Optional<RuntimeException> exception =
+                    conflictDetection.checkConflicts(
+                            latestSnapshot,
+                            baseDataFiles,
+                            SimpleFileEntry.from(deltaFiles),
+                            indexFiles,
+                            commitKind);
+            if (exception.isPresent()) {
+                if (allowRollback && rollback != null) {
+                    if (rollback.tryToRollback(latestSnapshot)) {
+                        return RetryCommitResult.forRollback(exception.get());
+                    }
+                }
+                throw exception.get();
+            }
         }
 
         Snapshot newSnapshot;
@@ -971,7 +1004,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         } catch (Exception e) {
             // commit exception, not sure about the situation and should not 
clean up the files
             LOG.warn("Retry commit for exception.", e);
-            return new RetryCommitResult(latestSnapshot, baseDataFiles, e);
+            return RetryCommitResult.forCommitFail(latestSnapshot, 
baseDataFiles, e);
         }
 
         if (!success) {
@@ -988,7 +1021,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     commitTime);
             commitCleaner.cleanUpNoReuseTmpManifests(
                     baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
-            return new RetryCommitResult(latestSnapshot, baseDataFiles, null);
+            return RetryCommitResult.forCommitFail(latestSnapshot, 
baseDataFiles, null);
         }
 
         LOG.info(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
similarity index 56%
copy from 
paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
copy to 
paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
index e64049ea63..683b6555a6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
@@ -19,26 +19,27 @@
 package org.apache.paimon.operation.commit;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.catalog.TableRollback;
+import org.apache.paimon.table.Instant;
 
-import java.util.List;
+/** Commit rollback to rollback 'COMPACT' commits for resolving conflicts. */
+public class CommitRollback {
 
-/** Need to retry commit of {@link CommitResult}. */
-public class RetryCommitResult implements CommitResult {
+    private final TableRollback tableRollback;
 
-    public final Snapshot latestSnapshot;
-    public final List<SimpleFileEntry> baseDataFiles;
-    public final Exception exception;
-
-    public RetryCommitResult(
-            Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles, 
Exception exception) {
-        this.latestSnapshot = latestSnapshot;
-        this.baseDataFiles = baseDataFiles;
-        this.exception = exception;
+    public CommitRollback(TableRollback tableRollback) {
+        this.tableRollback = tableRollback;
     }
 
-    @Override
-    public boolean isSuccess() {
+    public boolean tryToRollback(Snapshot latestSnapshot) {
+        if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+            long latest = latestSnapshot.id();
+            try {
+                tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest);
+                return true;
+            } catch (Exception ignored) {
+            }
+        }
         return false;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 82d01dfdc5..990b47f0f6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -51,6 +51,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -102,7 +103,7 @@ public class ConflictDetection {
         this.partitionExpire = partitionExpire;
     }
 
-    public void checkNoConflictsOrFail(
+    public Optional<RuntimeException> checkConflicts(
             Snapshot snapshot,
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
@@ -126,14 +127,20 @@ public class ConflictDetection {
                 deltaEntries =
                         buildDeltaEntriesWithDV(baseEntries, deltaEntries, 
deltaIndexEntries);
             } catch (Throwable e) {
-                throw conflictException(commitUser, baseEntries, 
deltaEntries).apply(e);
+                return Optional.of(
+                        conflictException(commitUser, baseEntries, 
deltaEntries).apply(e));
             }
         }
 
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(deltaEntries);
 
-        checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, 
baseCommitUser);
+        Optional<RuntimeException> exception =
+                checkBucketKeepSame(
+                        baseEntries, deltaEntries, commitKind, allEntries, 
baseCommitUser);
+        if (exception.isPresent()) {
+            return exception;
+        }
 
         Function<Throwable, RuntimeException> conflictException =
                 conflictException(baseCommitUser, baseEntries, deltaEntries);
@@ -151,21 +158,24 @@ public class ConflictDetection {
             // merge manifest entries and also check if the files we want to 
delete are still there
             mergedEntries = FileEntry.mergeEntries(allEntries);
         } catch (Throwable e) {
-            throw conflictException.apply(e);
+            return Optional.of(conflictException.apply(e));
         }
 
-        checkNoDeleteInMergedEntries(mergedEntries, conflictException);
-        checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, 
baseCommitUser);
+        exception = checkDeleteInEntries(mergedEntries, conflictException);
+        if (exception.isPresent()) {
+            return exception;
+        }
+        return checkKeyRange(baseEntries, deltaEntries, mergedEntries, 
baseCommitUser);
     }
 
-    private void checkBucketKeepSame(
+    private Optional<RuntimeException> checkBucketKeepSame(
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
             CommitKind commitKind,
             List<SimpleFileEntry> allEntries,
             String baseCommitUser) {
         if (commitKind == CommitKind.OVERWRITE) {
-            return;
+            return Optional.empty();
         }
 
         // total buckets within the same partition should remain the same
@@ -199,18 +209,19 @@ public class ConflictDetection {
                             deltaEntries,
                             null);
             LOG.warn("", conflictException.getLeft());
-            throw conflictException.getRight();
+            return Optional.of(conflictException.getRight());
         }
+        return Optional.empty();
     }
 
-    private void checkKeyRangeNoConflicts(
+    private Optional<RuntimeException> checkKeyRange(
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
             Collection<SimpleFileEntry> mergedEntries,
             String baseCommitUser) {
         // fast exit for file store without keys
         if (keyComparator == null) {
-            return;
+            return Optional.empty();
         }
 
         // group entries by partitions, buckets and levels
@@ -244,10 +255,11 @@ public class ConflictDetection {
                                     null);
 
                     LOG.warn("", conflictException.getLeft());
-                    throw conflictException.getRight();
+                    return Optional.of(conflictException.getRight());
                 }
             }
         }
+        return Optional.empty();
     }
 
     private Function<Throwable, RuntimeException> conflictException(
@@ -271,7 +283,7 @@ public class ConflictDetection {
         return deletionVectorsEnabled && 
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
     }
 
-    private void checkNoDeleteInMergedEntries(
+    private Optional<RuntimeException> checkDeleteInEntries(
             Collection<SimpleFileEntry> mergedEntries,
             Function<Throwable, RuntimeException> exceptionFunction) {
         try {
@@ -283,12 +295,17 @@ public class ConflictDetection {
                         tableName);
             }
         } catch (Throwable e) {
-            assertConflictForPartitionExpire(mergedEntries);
-            throw exceptionFunction.apply(e);
+            Optional<RuntimeException> exception = 
assertConflictForPartitionExpire(mergedEntries);
+            if (exception.isPresent()) {
+                return exception;
+            }
+            return Optional.of(exceptionFunction.apply(e));
         }
+        return Optional.empty();
     }
 
-    private void assertConflictForPartitionExpire(Collection<SimpleFileEntry> 
mergedEntries) {
+    private Optional<RuntimeException> assertConflictForPartitionExpire(
+            Collection<SimpleFileEntry> mergedEntries) {
         if (partitionExpire != null && partitionExpire.isValueExpiration()) {
             Set<BinaryRow> deletedPartitions = new HashSet<>();
             for (SimpleFileEntry entry : mergedEntries) {
@@ -304,13 +321,15 @@ public class ConflictDetection {
                                                 partToSimpleString(
                                                         partitionType, 
partition, "-", 200))
                                 .collect(Collectors.toList());
-                throw new RuntimeException(
-                        "You are writing data to expired partitions, and you 
can filter this data to avoid job failover."
-                                + " Otherwise, continuous expired records will 
cause the job to failover restart continuously."
-                                + " Expired partitions are: "
-                                + expiredPartitions);
+                return Optional.of(
+                        new RuntimeException(
+                                "You are writing data to expired partitions, 
and you can filter this data to avoid job failover."
+                                        + " Otherwise, continuous expired 
records will cause the job to failover restart continuously."
+                                        + " Expired partitions are: "
+                                        + expiredPartitions));
             }
         }
+        return Optional.empty();
     }
 
     static List<SimpleFileEntry> buildBaseEntriesWithDV(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
index e64049ea63..b9e0ab2a2e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
@@ -21,24 +21,54 @@ package org.apache.paimon.operation.commit;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.manifest.SimpleFileEntry;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 
 /** Need to retry commit of {@link CommitResult}. */
-public class RetryCommitResult implements CommitResult {
+public abstract class RetryCommitResult implements CommitResult {
 
-    public final Snapshot latestSnapshot;
-    public final List<SimpleFileEntry> baseDataFiles;
     public final Exception exception;
 
-    public RetryCommitResult(
-            Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles, 
Exception exception) {
-        this.latestSnapshot = latestSnapshot;
-        this.baseDataFiles = baseDataFiles;
+    private RetryCommitResult(Exception exception) {
         this.exception = exception;
     }
 
+    public static RetryCommitResult forCommitFail(
+            Snapshot snapshot, List<SimpleFileEntry> baseDataFiles, Exception 
exception) {
+        return new CommitFailRetryResult(snapshot, baseDataFiles, exception);
+    }
+
+    public static RetryCommitResult forRollback(Exception exception) {
+        return new RollbackRetryResult(exception);
+    }
+
     @Override
     public boolean isSuccess() {
         return false;
     }
+
+    /** Retry result for commit failing. */
+    public static class CommitFailRetryResult extends RetryCommitResult {
+
+        public final @Nullable Snapshot latestSnapshot;
+        public final @Nullable List<SimpleFileEntry> baseDataFiles;
+
+        private CommitFailRetryResult(
+                @Nullable Snapshot latestSnapshot,
+                @Nullable List<SimpleFileEntry> baseDataFiles,
+                Exception exception) {
+            super(exception);
+            this.latestSnapshot = latestSnapshot;
+            this.baseDataFiles = baseDataFiles;
+        }
+    }
+
+    /** Retry result for rollback. */
+    public static class RollbackRetryResult extends RetryCommitResult {
+
+        private RollbackRetryResult(Exception exception) {
+            super(exception);
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index a0a23d8ca4..8f68d3e04a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.paimon.catalog.CatalogSnapshotCommit;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.catalog.TableRollback;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.source.TableQueryAuth;
 import org.apache.paimon.tag.SnapshotLoaderImpl;
@@ -112,6 +113,21 @@ public class CatalogEnvironment implements Serializable {
         return snapshotCommit;
     }
 
+    @Nullable
+    public TableRollback catalogTableRollback() {
+        if (catalogLoader != null && supportsVersionManagement) {
+            Catalog catalog = catalogLoader.load();
+            return (instant, fromSnapshot) -> {
+                try {
+                    catalog.rollbackTo(identifier, instant, fromSnapshot);
+                } catch (Catalog.TableNotExistException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+        }
+        return null;
+    }
+
     @Nullable
     public SnapshotLoader snapshotLoader() {
         if (catalogLoader == null) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 69812952bf..5806626079 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -929,6 +929,7 @@ public class FileDeletionTest {
                     null,
                     Collections.emptyMap(),
                     Snapshot.CommitKind.APPEND,
+                    false,
                     store.snapshotManager().latestSnapshot(),
                     true,
                     null);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 75a8271ae1..322920fc7e 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -1019,12 +1019,13 @@ public class FileStoreCommitTest {
                     null,
                     Collections.emptyMap(),
                     Snapshot.CommitKind.APPEND,
+                    false,
                     firstLatest,
                     true,
                     null);
             // Compact
             commit.tryCommitOnce(
-                    new RetryCommitResult(firstLatest, 
Collections.emptyList(), null),
+                    RetryCommitResult.forCommitFail(firstLatest, 
Collections.emptyList(), null),
                     Collections.emptyList(),
                     Collections.emptyList(),
                     Collections.emptyList(),
@@ -1032,6 +1033,7 @@ public class FileStoreCommitTest {
                     null,
                     Collections.emptyMap(),
                     Snapshot.CommitKind.COMPACT,
+                    false,
                     store.snapshotManager().latestSnapshot(),
                     true,
                     null);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 300a94287e..a1b00b502a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.PagedList;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TableType;
+import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
@@ -37,6 +38,11 @@ import org.apache.paimon.fs.Path;
 import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
@@ -71,8 +77,10 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
@@ -111,6 +119,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
@@ -120,6 +129,7 @@ import static 
org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.OBJECT_TABLE;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN;
 import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;
 import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
@@ -3420,6 +3430,113 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob", 
"Charlie", "David");
     }
 
+    @Test
+    public void testConflictRollback() throws Exception {
+        doTestConflictRollback(false); // no extra commit between the compact commit and the delete commit
+    }
+
+    @Test
+    public void testConflictRollbackFail() throws Exception {
+        doTestConflictRollback(true); // an extra append is committed between the compact commit and the delete commit
+    }
+
+    private void doTestConflictRollback(boolean insertMiddle) throws Exception 
{ // Scenario: 5 single-row appends, a compact commit over their files, then a delete built against one pre-compaction file; insertMiddle adds one more append after the compact.
+        Identifier identifier =
+                Identifier.create("test_conflict_rollback", 
"test_conflict_rollback");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "col1", 
DataTypes.INT())),
+                        emptyList(),
+                        emptyList(),
+                        new HashMap<>(),
+                        ""),
+                true);
+        Table table = catalog.getTable(identifier);
+
+        // write 5 files: one row (values 0..4) per commit, remembering the first new data file of each commit
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        List<DataFileMeta> files = new ArrayList<>();
+        for (int i = 0; i < 5; i++) {
+            try (BatchTableWrite write = writeBuilder.newWrite();
+                    BatchTableCommit commit = writeBuilder.newCommit()) {
+                write.write(GenericRow.of(i));
+                List<CommitMessage> commitMessages = write.prepareCommit();
+                commit.commit(commitMessages);
+                DataFileMeta file =
+                        ((CommitMessageImpl) commitMessages.get(0))
+                                .newFilesIncrement()
+                                .newFiles()
+                                .get(0);
+                files.add(file);
+            }
+        }
+
+        // delete write: hand-built message that passes the first written file (row value 0) as DataIncrement's second list - presumably the deleted-files slot, TODO confirm against DataIncrement
+        DataFileMeta file = files.get(0);
+        CommitMessageImpl deleteCommitMessage =
+                new CommitMessageImpl(
+                        EMPTY_ROW,
+                        0,
+                        -1,
+                        new DataIncrement(emptyList(), singletonList(file), 
emptyList()),
+                        new CompactIncrement(emptyList(), emptyList(), 
emptyList()));
+
+        // compact write: run an AppendCompactTask over all 5 recorded files; the resulting message is only committed further below
+        CommitMessage compactCommitMessage;
+        try (BatchTableWrite write = writeBuilder.newWrite()) {
+            AppendCompactTask compactTask = new AppendCompactTask(EMPTY_ROW, 
files);
+            FileStoreWrite<?> fileStoreWrite = ((TableWriteImpl<?>) 
write).getWrite();
+            compactCommitMessage =
+                    compactTask.doCompact(
+                            (FileStoreTable) table, (BaseAppendFileStoreWrite) 
fileStoreWrite);
+        }
+
+        // do compact commit first, so it lands before the delete that was built against the pre-compaction file
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(singletonList(compactCommitMessage));
+        }
+
+        if (insertMiddle) { // optional extra append committed after the compact and before the delete
+            try (BatchTableWrite write = writeBuilder.newWrite();
+                    BatchTableCommit commit = writeBuilder.newCommit()) {
+                write.write(GenericRow.of(0));
+                commit.commit(write.prepareCommit());
+            }
+        }
+
+        // do delete commit after
+        // expire snapshots first: snapshots 1 and 2 are removed by hand before the delete is committed
+        SnapshotManager snapshotManager = ((FileStoreTable) 
table).snapshotManager();
+        snapshotManager.deleteSnapshot(1);
+        snapshotManager.deleteSnapshot(2);
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            List<CommitMessage> messages = singletonList(deleteCommitMessage);
+            if (insertMiddle) { // with a commit after the compact, the delete is expected to fail with a conflict error
+                assertThatThrownBy(() -> commit.commit(messages))
+                        .hasMessageContaining("File deletion conflicts 
detected");
+            } else {
+                // should rollback compact commit: the delete commit is expected to succeed in this branch
+                commit.commit(messages);
+            }
+        }
+
+        // scan for rollback success: a full read must return rows 1..4 only (row 0 deleted)
+        if (!insertMiddle) {
+            ReadBuilder readBuilder = table.newReadBuilder();
+            List<Integer> result = new ArrayList<>();
+            readBuilder
+                    .newRead()
+                    .createReader(readBuilder.newScan().plan())
+                    .forEachRemaining(r -> result.add(r.getInt(0)));
+            assertThat(result).containsExactlyInAnyOrder(1, 2, 3, 4);
+        }
+
+        // clear: drop the test database (flags false, true - presumably ignoreIfNotExists=false, cascade=true; TODO confirm)
+        catalog.dropDatabase(identifier.getDatabaseName(), false, true);
+    }
+
     protected void createTable(
             Identifier identifier, Map<String, String> options, List<String> 
partitionKeys)
             throws Exception {


Reply via email to