This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7d0346de08 [core] Rollback 'COMPACT' commit for row-level operations
(#6968)
7d0346de08 is described below
commit 7d0346de088c43accbd289f701223cd7d798f1fe
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Jan 18 22:59:27 2026 +0800
[core] Rollback 'COMPACT' commit for row-level operations (#6968)
---
.../java/org/apache/paimon/AbstractFileStore.java | 10 +-
.../TableRollback.java} | 27 ++---
.../paimon/operation/FileStoreCommitImpl.java | 65 +++++++++---
...{RetryCommitResult.java => CommitRollback.java} | 31 +++---
.../paimon/operation/commit/ConflictDetection.java | 61 +++++++----
.../paimon/operation/commit/RetryCommitResult.java | 44 ++++++--
.../apache/paimon/table/CatalogEnvironment.java | 16 +++
.../apache/paimon/operation/FileDeletionTest.java | 1 +
.../paimon/operation/FileStoreCommitTest.java | 4 +-
.../org/apache/paimon/rest/RESTCatalogTest.java | 117 +++++++++++++++++++++
10 files changed, 294 insertions(+), 82 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 33a60e73d7..4f8bfb68e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.catalog.TableRollback;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
@@ -43,6 +44,7 @@ import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.partition.PartitionExpireStrategy;
@@ -288,6 +290,11 @@ abstract class AbstractFileStore<T> implements
FileStore<T> {
commitUser,
this::newScan,
options.commitStrictModeLastSafeSnapshot().orElse(null));
+ CommitRollback rollback = null;
+ TableRollback tableRollback =
catalogEnvironment.catalogTableRollback();
+ if (tableRollback != null) {
+ rollback = new CommitRollback(tableRollback);
+ }
return new FileStoreCommitImpl(
snapshotCommit,
fileIO,
@@ -320,7 +327,8 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
options.rowTrackingEnabled(),
options.commitDiscardDuplicateFiles(),
conflictDetection,
- strictModeChecker);
+ strictModeChecker,
+ rollback);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
similarity index 52%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
copy to paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
index e64049ea63..5cb2a52192 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableRollback.java
@@ -16,29 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.operation.commit;
+package org.apache.paimon.catalog;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.table.Instant;
-import java.util.List;
+import javax.annotation.Nullable;
-/** Need to retry commit of {@link CommitResult}. */
-public class RetryCommitResult implements CommitResult {
+/** Rollback table to instant from snapshot. */
+public interface TableRollback {
- public final Snapshot latestSnapshot;
- public final List<SimpleFileEntry> baseDataFiles;
- public final Exception exception;
-
- public RetryCommitResult(
- Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles,
Exception exception) {
- this.latestSnapshot = latestSnapshot;
- this.baseDataFiles = baseDataFiles;
- this.exception = exception;
- }
-
- @Override
- public boolean isSuccess() {
- return false;
- }
+ void rollbackTo(Instant instant, @Nullable Long fromSnapshot);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index bdcb4482d0..6494b61855 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -43,10 +43,12 @@ import org.apache.paimon.operation.commit.CommitChanges;
import org.apache.paimon.operation.commit.CommitChangesProvider;
import org.apache.paimon.operation.commit.CommitCleaner;
import org.apache.paimon.operation.commit.CommitResult;
+import org.apache.paimon.operation.commit.CommitRollback;
import org.apache.paimon.operation.commit.CommitScanner;
import org.apache.paimon.operation.commit.ConflictDetection;
import org.apache.paimon.operation.commit.ManifestEntryChanges;
import org.apache.paimon.operation.commit.RetryCommitResult;
+import
org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
import
org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
import org.apache.paimon.operation.commit.StrictModeChecker;
import org.apache.paimon.operation.commit.SuccessCommitResult;
@@ -138,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
private final ManifestFile manifestFile;
private final ManifestList manifestList;
private final IndexManifestFile indexManifestFile;
+ @Nullable private final CommitRollback rollback;
private final CommitScanner scanner;
private final int numBucket;
private final MemorySize manifestTargetSize;
@@ -195,7 +198,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
boolean rowTrackingEnabled,
boolean discardDuplicateFiles,
ConflictDetection conflictDetection,
- @Nullable StrictModeChecker strictModeChecker) {
+ @Nullable StrictModeChecker strictModeChecker,
+ @Nullable CommitRollback rollback) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -209,6 +213,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
this.manifestFile = manifestFileFactory.create();
this.manifestList = manifestListFactory.create();
this.indexManifestFile = indexManifestFileFactory.create();
+ this.rollback = rollback;
this.scanner = new CommitScanner(scan, indexManifestFile, options);
this.numBucket = numBucket;
this.manifestTargetSize = manifestTargetSize;
@@ -313,10 +318,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
if (appendCommitCheckConflict) {
checkAppendFiles = true;
}
+
+ boolean allowRollback = false;
if (containsFileDeletionOrDeletionVectors(
appendSimpleEntries, changes.appendIndexFiles)) {
commitKind = CommitKind.OVERWRITE;
checkAppendFiles = true;
+ allowRollback = true;
}
attempts +=
@@ -329,6 +337,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.properties(),
commitKind,
+ allowRollback,
checkAppendFiles,
null);
generatedSnapshot += 1;
@@ -347,6 +356,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.properties(),
CommitKind.COMPACT,
+ false,
true,
null);
generatedSnapshot += 1;
@@ -512,6 +522,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.properties(),
CommitKind.COMPACT,
+ false,
true,
null);
generatedSnapshot += 1;
@@ -652,6 +663,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
Collections.emptyMap(),
CommitKind.ANALYZE,
false,
+ false,
statsFileName);
}
@@ -678,6 +690,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<String, String> properties,
CommitKind commitKind,
+ boolean allowRollback,
boolean detectConflicts,
@Nullable String statsFileName) {
int retryCount = 0;
@@ -696,6 +709,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
watermark,
properties,
commitKind,
+ allowRollback,
latestSnapshot,
detectConflicts,
statsFileName);
@@ -742,6 +756,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
watermark,
properties,
CommitKind.OVERWRITE,
+ false,
true,
null);
}
@@ -756,6 +771,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<String, String> properties,
CommitKind commitKind,
+ boolean allowRollback,
@Nullable Snapshot latestSnapshot,
boolean detectConflicts,
@Nullable String newStatsFileName) {
@@ -763,13 +779,15 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
// Check if the commit has been completed. At this point, there will
be no more repeated
// commits and just return success
- if (retryResult != null && latestSnapshot != null) {
+ if (retryResult instanceof CommitFailRetryResult && latestSnapshot !=
null) {
+ CommitFailRetryResult commitFailRetry = (CommitFailRetryResult)
retryResult;
Map<Long, Snapshot> snapshotCache = new HashMap<>();
snapshotCache.put(latestSnapshot.id(), latestSnapshot);
long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
- if (retryResult.latestSnapshot != null) {
- snapshotCache.put(retryResult.latestSnapshot.id(),
retryResult.latestSnapshot);
- startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
+ if (commitFailRetry.latestSnapshot != null) {
+ snapshotCache.put(
+ commitFailRetry.latestSnapshot.id(),
commitFailRetry.latestSnapshot);
+ startCheckSnapshot = commitFailRetry.latestSnapshot.id() + 1;
}
for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
Snapshot snapshot = snapshotCache.computeIfAbsent(i,
snapshotManager::snapshot);
@@ -813,11 +831,17 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
// latestSnapshotId is different from the snapshot id we've
checked for conflicts,
// so we have to check again
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles,
indexFiles);
- if (retryResult != null && retryResult.latestSnapshot != null) {
- baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
+ CommitFailRetryResult commitFailRetry =
+ retryResult instanceof CommitFailRetryResult
+ ? (CommitFailRetryResult) retryResult
+ : null;
+ if (commitFailRetry != null
+ && commitFailRetry.latestSnapshot != null
+ && commitFailRetry.baseDataFiles != null) {
+ baseDataFiles = new ArrayList<>(commitFailRetry.baseDataFiles);
List<SimpleFileEntry> incremental =
scanner.readIncrementalChanges(
- retryResult.latestSnapshot, latestSnapshot,
changedPartitions);
+ commitFailRetry.latestSnapshot,
latestSnapshot, changedPartitions);
if (!incremental.isEmpty()) {
baseDataFiles.addAll(incremental);
baseDataFiles = new
ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
@@ -837,12 +861,21 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
.filter(entry ->
!baseIdentifiers.contains(entry.identifier()))
.collect(Collectors.toList());
}
- conflictDetection.checkNoConflictsOrFail(
- latestSnapshot,
- baseDataFiles,
- SimpleFileEntry.from(deltaFiles),
- indexFiles,
- commitKind);
+ Optional<RuntimeException> exception =
+ conflictDetection.checkConflicts(
+ latestSnapshot,
+ baseDataFiles,
+ SimpleFileEntry.from(deltaFiles),
+ indexFiles,
+ commitKind);
+ if (exception.isPresent()) {
+ if (allowRollback && rollback != null) {
+ if (rollback.tryToRollback(latestSnapshot)) {
+ return RetryCommitResult.forRollback(exception.get());
+ }
+ }
+ throw exception.get();
+ }
}
Snapshot newSnapshot;
@@ -971,7 +1004,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
} catch (Exception e) {
// commit exception, not sure about the situation and should not
clean up the files
LOG.warn("Retry commit for exception.", e);
- return new RetryCommitResult(latestSnapshot, baseDataFiles, e);
+ return RetryCommitResult.forCommitFail(latestSnapshot,
baseDataFiles, e);
}
if (!success) {
@@ -988,7 +1021,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
commitTime);
commitCleaner.cleanUpNoReuseTmpManifests(
baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
- return new RetryCommitResult(latestSnapshot, baseDataFiles, null);
+ return RetryCommitResult.forCommitFail(latestSnapshot,
baseDataFiles, null);
}
LOG.info(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
similarity index 56%
copy from
paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
copy to
paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
index e64049ea63..683b6555a6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitRollback.java
@@ -19,26 +19,27 @@
package org.apache.paimon.operation.commit;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.catalog.TableRollback;
+import org.apache.paimon.table.Instant;
-import java.util.List;
+/** Commit rollback to rollback 'COMPACT' commits for resolving conflicts. */
+public class CommitRollback {
-/** Need to retry commit of {@link CommitResult}. */
-public class RetryCommitResult implements CommitResult {
+ private final TableRollback tableRollback;
- public final Snapshot latestSnapshot;
- public final List<SimpleFileEntry> baseDataFiles;
- public final Exception exception;
-
- public RetryCommitResult(
- Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles,
Exception exception) {
- this.latestSnapshot = latestSnapshot;
- this.baseDataFiles = baseDataFiles;
- this.exception = exception;
+ public CommitRollback(TableRollback tableRollback) {
+ this.tableRollback = tableRollback;
}
- @Override
- public boolean isSuccess() {
+ public boolean tryToRollback(Snapshot latestSnapshot) {
+ if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
+ long latest = latestSnapshot.id();
+ try {
+ tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest);
+ return true;
+ } catch (Exception ignored) {
+ }
+ }
return false;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 82d01dfdc5..990b47f0f6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -51,6 +51,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -102,7 +103,7 @@ public class ConflictDetection {
this.partitionExpire = partitionExpire;
}
- public void checkNoConflictsOrFail(
+ public Optional<RuntimeException> checkConflicts(
Snapshot snapshot,
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
@@ -126,14 +127,20 @@ public class ConflictDetection {
deltaEntries =
buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries);
} catch (Throwable e) {
- throw conflictException(commitUser, baseEntries,
deltaEntries).apply(e);
+ return Optional.of(
+ conflictException(commitUser, baseEntries,
deltaEntries).apply(e));
}
}
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(deltaEntries);
- checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries,
baseCommitUser);
+ Optional<RuntimeException> exception =
+ checkBucketKeepSame(
+ baseEntries, deltaEntries, commitKind, allEntries,
baseCommitUser);
+ if (exception.isPresent()) {
+ return exception;
+ }
Function<Throwable, RuntimeException> conflictException =
conflictException(baseCommitUser, baseEntries, deltaEntries);
@@ -151,21 +158,24 @@ public class ConflictDetection {
// merge manifest entries and also check if the files we want to
delete are still there
mergedEntries = FileEntry.mergeEntries(allEntries);
} catch (Throwable e) {
- throw conflictException.apply(e);
+ return Optional.of(conflictException.apply(e));
}
- checkNoDeleteInMergedEntries(mergedEntries, conflictException);
- checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries,
baseCommitUser);
+ exception = checkDeleteInEntries(mergedEntries, conflictException);
+ if (exception.isPresent()) {
+ return exception;
+ }
+ return checkKeyRange(baseEntries, deltaEntries, mergedEntries,
baseCommitUser);
}
- private void checkBucketKeepSame(
+ private Optional<RuntimeException> checkBucketKeepSame(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
CommitKind commitKind,
List<SimpleFileEntry> allEntries,
String baseCommitUser) {
if (commitKind == CommitKind.OVERWRITE) {
- return;
+ return Optional.empty();
}
// total buckets within the same partition should remain the same
@@ -199,18 +209,19 @@ public class ConflictDetection {
deltaEntries,
null);
LOG.warn("", conflictException.getLeft());
- throw conflictException.getRight();
+ return Optional.of(conflictException.getRight());
}
+ return Optional.empty();
}
- private void checkKeyRangeNoConflicts(
+ private Optional<RuntimeException> checkKeyRange(
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> deltaEntries,
Collection<SimpleFileEntry> mergedEntries,
String baseCommitUser) {
// fast exit for file store without keys
if (keyComparator == null) {
- return;
+ return Optional.empty();
}
// group entries by partitions, buckets and levels
@@ -244,10 +255,11 @@ public class ConflictDetection {
null);
LOG.warn("", conflictException.getLeft());
- throw conflictException.getRight();
+ return Optional.of(conflictException.getRight());
}
}
}
+ return Optional.empty();
}
private Function<Throwable, RuntimeException> conflictException(
@@ -271,7 +283,7 @@ public class ConflictDetection {
return deletionVectorsEnabled &&
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
}
- private void checkNoDeleteInMergedEntries(
+ private Optional<RuntimeException> checkDeleteInEntries(
Collection<SimpleFileEntry> mergedEntries,
Function<Throwable, RuntimeException> exceptionFunction) {
try {
@@ -283,12 +295,17 @@ public class ConflictDetection {
tableName);
}
} catch (Throwable e) {
- assertConflictForPartitionExpire(mergedEntries);
- throw exceptionFunction.apply(e);
+ Optional<RuntimeException> exception =
assertConflictForPartitionExpire(mergedEntries);
+ if (exception.isPresent()) {
+ return exception;
+ }
+ return Optional.of(exceptionFunction.apply(e));
}
+ return Optional.empty();
}
- private void assertConflictForPartitionExpire(Collection<SimpleFileEntry>
mergedEntries) {
+ private Optional<RuntimeException> assertConflictForPartitionExpire(
+ Collection<SimpleFileEntry> mergedEntries) {
if (partitionExpire != null && partitionExpire.isValueExpiration()) {
Set<BinaryRow> deletedPartitions = new HashSet<>();
for (SimpleFileEntry entry : mergedEntries) {
@@ -304,13 +321,15 @@ public class ConflictDetection {
partToSimpleString(
partitionType,
partition, "-", 200))
.collect(Collectors.toList());
- throw new RuntimeException(
- "You are writing data to expired partitions, and you
can filter this data to avoid job failover."
- + " Otherwise, continuous expired records will
cause the job to failover restart continuously."
- + " Expired partitions are: "
- + expiredPartitions);
+ return Optional.of(
+ new RuntimeException(
+ "You are writing data to expired partitions,
and you can filter this data to avoid job failover."
+ + " Otherwise, continuous expired
records will cause the job to failover restart continuously."
+ + " Expired partitions are: "
+ + expiredPartitions));
}
}
+ return Optional.empty();
}
static List<SimpleFileEntry> buildBaseEntriesWithDV(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
index e64049ea63..b9e0ab2a2e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java
@@ -21,24 +21,54 @@ package org.apache.paimon.operation.commit;
import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.SimpleFileEntry;
+import javax.annotation.Nullable;
+
import java.util.List;
/** Need to retry commit of {@link CommitResult}. */
-public class RetryCommitResult implements CommitResult {
+public abstract class RetryCommitResult implements CommitResult {
- public final Snapshot latestSnapshot;
- public final List<SimpleFileEntry> baseDataFiles;
public final Exception exception;
- public RetryCommitResult(
- Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles,
Exception exception) {
- this.latestSnapshot = latestSnapshot;
- this.baseDataFiles = baseDataFiles;
+ private RetryCommitResult(Exception exception) {
this.exception = exception;
}
+ public static RetryCommitResult forCommitFail(
+ Snapshot snapshot, List<SimpleFileEntry> baseDataFiles, Exception
exception) {
+ return new CommitFailRetryResult(snapshot, baseDataFiles, exception);
+ }
+
+ public static RetryCommitResult forRollback(Exception exception) {
+ return new RollbackRetryResult(exception);
+ }
+
@Override
public boolean isSuccess() {
return false;
}
+
+ /** Retry result for commit failing. */
+ public static class CommitFailRetryResult extends RetryCommitResult {
+
+ public final @Nullable Snapshot latestSnapshot;
+ public final @Nullable List<SimpleFileEntry> baseDataFiles;
+
+ private CommitFailRetryResult(
+ @Nullable Snapshot latestSnapshot,
+ @Nullable List<SimpleFileEntry> baseDataFiles,
+ Exception exception) {
+ super(exception);
+ this.latestSnapshot = latestSnapshot;
+ this.baseDataFiles = baseDataFiles;
+ }
+ }
+
+ /** Retry result for rollback. */
+ public static class RollbackRetryResult extends RetryCommitResult {
+
+ private RollbackRetryResult(Exception exception) {
+ super(exception);
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index a0a23d8ca4..8f68d3e04a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -28,6 +28,7 @@ import org.apache.paimon.catalog.CatalogSnapshotCommit;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.catalog.RenamingSnapshotCommit;
import org.apache.paimon.catalog.SnapshotCommit;
+import org.apache.paimon.catalog.TableRollback;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.table.source.TableQueryAuth;
import org.apache.paimon.tag.SnapshotLoaderImpl;
@@ -112,6 +113,21 @@ public class CatalogEnvironment implements Serializable {
return snapshotCommit;
}
+ @Nullable
+ public TableRollback catalogTableRollback() {
+ if (catalogLoader != null && supportsVersionManagement) {
+ Catalog catalog = catalogLoader.load();
+ return (instant, fromSnapshot) -> {
+ try {
+ catalog.rollbackTo(identifier, instant, fromSnapshot);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ }
+ return null;
+ }
+
@Nullable
public SnapshotLoader snapshotLoader() {
if (catalogLoader == null) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 69812952bf..5806626079 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -929,6 +929,7 @@ public class FileDeletionTest {
null,
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
+ false,
store.snapshotManager().latestSnapshot(),
true,
null);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 75a8271ae1..322920fc7e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -1019,12 +1019,13 @@ public class FileStoreCommitTest {
null,
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
+ false,
firstLatest,
true,
null);
// Compact
commit.tryCommitOnce(
- new RetryCommitResult(firstLatest,
Collections.emptyList(), null),
+ RetryCommitResult.forCommitFail(firstLatest,
Collections.emptyList(), null),
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList(),
@@ -1032,6 +1033,7 @@ public class FileStoreCommitTest {
null,
Collections.emptyMap(),
Snapshot.CommitKind.COMPACT,
+ false,
store.snapshotManager().latestSnapshot(),
true,
null);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 300a94287e..a1b00b502a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
+import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogTestBase;
@@ -37,6 +38,11 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
import org.apache.paimon.function.FunctionDefinition;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
@@ -71,8 +77,10 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
@@ -111,6 +119,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
+import static java.util.Collections.emptyList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
@@ -120,6 +129,7 @@ import static
org.apache.paimon.CoreOptions.QUERY_AUTH_ENABLED;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.OBJECT_TABLE;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.rest.RESTApi.PAGE_TOKEN;
import static org.apache.paimon.rest.RESTCatalogOptions.DLF_OSS_ENDPOINT;
import static org.apache.paimon.rest.auth.DLFToken.TOKEN_DATE_FORMATTER;
@@ -3420,6 +3430,113 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
assertThat(rows.get(0).getString(1).toString()).isIn("Alice", "Bob",
"Charlie", "David");
}
+ @Test
+ public void testConflictRollback() throws Exception {
+ doTestConflictRollback(false);
+ }
+
+ @Test
+ public void testConflictRollbackFail() throws Exception {
+ doTestConflictRollback(true);
+ }
+
+ private void doTestConflictRollback(boolean insertMiddle) throws Exception
{
+ Identifier identifier =
+ Identifier.create("test_conflict_rollback",
"test_conflict_rollback");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ catalog.createTable(
+ identifier,
+ new Schema(
+ Lists.newArrayList(new DataField(0, "col1",
DataTypes.INT())),
+ emptyList(),
+ emptyList(),
+ new HashMap<>(),
+ ""),
+ true);
+ Table table = catalog.getTable(identifier);
+
+ // write 5 files
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ List<DataFileMeta> files = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(i));
+ List<CommitMessage> commitMessages = write.prepareCommit();
+ commit.commit(commitMessages);
+ DataFileMeta file =
+ ((CommitMessageImpl) commitMessages.get(0))
+ .newFilesIncrement()
+ .newFiles()
+ .get(0);
+ files.add(file);
+ }
+ }
+
+ // delete write
+ DataFileMeta file = files.get(0);
+ CommitMessageImpl deleteCommitMessage =
+ new CommitMessageImpl(
+ EMPTY_ROW,
+ 0,
+ -1,
+ new DataIncrement(emptyList(), singletonList(file),
emptyList()),
+ new CompactIncrement(emptyList(), emptyList(),
emptyList()));
+
+ // compact write
+ CommitMessage compactCommitMessage;
+ try (BatchTableWrite write = writeBuilder.newWrite()) {
+ AppendCompactTask compactTask = new AppendCompactTask(EMPTY_ROW,
files);
+ FileStoreWrite<?> fileStoreWrite = ((TableWriteImpl<?>)
write).getWrite();
+ compactCommitMessage =
+ compactTask.doCompact(
+ (FileStoreTable) table, (BaseAppendFileStoreWrite)
fileStoreWrite);
+ }
+
+ // do compact commit first
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(singletonList(compactCommitMessage));
+ }
+
+ if (insertMiddle) {
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(GenericRow.of(0));
+ commit.commit(write.prepareCommit());
+ }
+ }
+
+ // do delete commit after
+ // expire snapshots first
+ SnapshotManager snapshotManager = ((FileStoreTable)
table).snapshotManager();
+ snapshotManager.deleteSnapshot(1);
+ snapshotManager.deleteSnapshot(2);
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ List<CommitMessage> messages = singletonList(deleteCommitMessage);
+ if (insertMiddle) {
+ assertThatThrownBy(() -> commit.commit(messages))
+ .hasMessageContaining("File deletion conflicts
detected");
+ } else {
+ // should rollback compact commit
+ commit.commit(messages);
+ }
+ }
+
+ // scan for rollback success
+ if (!insertMiddle) {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ List<Integer> result = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(readBuilder.newScan().plan())
+ .forEachRemaining(r -> result.add(r.getInt(0)));
+ assertThat(result).containsExactlyInAnyOrder(1, 2, 3, 4);
+ }
+
+ // clear
+ catalog.dropDatabase(identifier.getDatabaseName(), false, true);
+ }
+
protected void createTable(
Identifier identifier, Map<String, String> options, List<String>
partitionKeys)
throws Exception {