This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c803105923 [core] Optimize commit retry on exception (#5776)
c803105923 is described below

commit c803105923d9dfc42cbac1528df4b6588b5421d8
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 20 10:17:07 2025 +0800

    [core] Optimize commit retry on exception (#5776)
---
 .../paimon/operation/FileStoreCommitImpl.java      | 298 +++++++--------------
 .../paimon/operation/FileStoreCommitTest.java      |  40 +++
 .../apache/paimon/rest/MockRESTCatalogTest.java    |   3 +
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  13 +
 4 files changed, 147 insertions(+), 207 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 2e364d6677..9d634c02e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -801,11 +801,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
             if (System.currentTimeMillis() - startMillis > commitTimeout
                     || retryCount >= commitMaxRetries) {
-                retryResult.cleanAll();
-                throw new RuntimeException(
+                String message =
                         String.format(
                                 "Commit failed after %s millis with %s 
retries, there maybe exist commit conflicts between multiple jobs.",
-                                commitTimeout, retryCount));
+                                commitTimeout, retryCount);
+                throw new RuntimeException(message, retryResult.exception);
             }
 
             retryCount++;
@@ -897,7 +897,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
                 Snapshot snapshot = snapshotManager.snapshot(i);
                 if (snapshot.commitUser().equals(commitUser)
-                        && snapshot.commitIdentifier() == identifier) {
+                        && snapshot.commitIdentifier() == identifier
+                        && snapshot.commitKind() == commitKind) {
                     return new SuccessResult();
                 }
             }
@@ -942,36 +943,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         if (latestSnapshot != null && 
conflictCheck.shouldCheck(latestSnapshot.id())) {
             // latestSnapshotId is different from the snapshot id we've 
checked for conflicts,
             // so we have to check again
-            try {
-                List<BinaryRow> changedPartitions =
-                        deltaFiles.stream()
-                                .map(ManifestEntry::partition)
-                                .distinct()
-                                .collect(Collectors.toList());
-                if (retryResult != null && retryResult.latestSnapshot != null) 
{
-                    baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
-                    List<SimpleFileEntry> incremental =
-                            readIncrementalChanges(
-                                    retryResult.latestSnapshot, 
latestSnapshot, changedPartitions);
-                    if (!incremental.isEmpty()) {
-                        baseDataFiles.addAll(incremental);
-                        baseDataFiles = new 
ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
-                    }
-                } else {
-                    baseDataFiles =
-                            
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
-                }
-                noConflictsOrFail(
-                        latestSnapshot.commitUser(),
-                        baseDataFiles,
-                        SimpleFileEntry.from(deltaFiles),
-                        commitKind);
-            } catch (Exception e) {
-                if (retryResult != null) {
-                    retryResult.cleanAll();
+            List<BinaryRow> changedPartitions =
+                    deltaFiles.stream()
+                            .map(ManifestEntry::partition)
+                            .distinct()
+                            .collect(Collectors.toList());
+            if (retryResult != null && retryResult.latestSnapshot != null) {
+                baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
+                List<SimpleFileEntry> incremental =
+                        readIncrementalChanges(
+                                retryResult.latestSnapshot, latestSnapshot, 
changedPartitions);
+                if (!incremental.isEmpty()) {
+                    baseDataFiles.addAll(incremental);
+                    baseDataFiles = new 
ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
                 }
-                throw e;
+            } else {
+                baseDataFiles =
+                        readAllEntriesFromChangedPartitions(latestSnapshot, 
changedPartitions);
             }
+            noConflictsOrFail(
+                    latestSnapshot.commitUser(),
+                    baseDataFiles,
+                    SimpleFileEntry.from(deltaFiles),
+                    commitKind);
         }
 
         Snapshot newSnapshot;
@@ -1022,33 +1016,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             long deltaRecordCount = recordCountAdd(deltaFiles) - 
recordCountDelete(deltaFiles);
             long totalRecordCount = previousTotalRecordCount + 
deltaRecordCount;
 
-            boolean rewriteIndexManifest = true;
-            if (retryResult != null) {
-                deltaStatistics = retryResult.deltaStatistics;
-                deltaManifestList = retryResult.deltaManifestList;
-                changelogManifestList = retryResult.changelogManifestList;
-                if (Objects.equals(oldIndexManifest, 
retryResult.oldIndexManifest)) {
-                    rewriteIndexManifest = false;
-                    indexManifest = retryResult.newIndexManifest;
-                    LOG.info("Reusing index manifest {} for retry.", 
indexManifest);
-                } else {
-                    cleanIndexManifest(retryResult.oldIndexManifest, 
retryResult.newIndexManifest);
-                }
-            } else {
-                // write new delta files into manifest files
-                deltaStatistics = new 
ArrayList<>(PartitionEntry.merge(deltaFiles));
-                deltaManifestList = 
manifestList.write(manifestFile.write(deltaFiles));
+            // write new delta files into manifest files
+            deltaStatistics = new 
ArrayList<>(PartitionEntry.merge(deltaFiles));
+            deltaManifestList = 
manifestList.write(manifestFile.write(deltaFiles));
 
-                // write changelog into manifest files
-                if (!changelogFiles.isEmpty()) {
-                    changelogManifestList = 
manifestList.write(manifestFile.write(changelogFiles));
-                }
+            // write changelog into manifest files
+            if (!changelogFiles.isEmpty()) {
+                changelogManifestList = 
manifestList.write(manifestFile.write(changelogFiles));
             }
 
-            if (rewriteIndexManifest) {
-                indexManifest =
-                        indexManifestFile.writeIndexFiles(oldIndexManifest, 
indexFiles, bucketMode);
-            }
+            indexManifest =
+                    indexManifestFile.writeIndexFiles(oldIndexManifest, 
indexFiles, bucketMode);
 
             long latestSchemaId =
                     schemaManager
@@ -1096,9 +1074,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                             properties.isEmpty() ? null : properties);
         } catch (Throwable e) {
             // fails when preparing for commit, we should clean up
-            if (retryResult != null) {
-                retryResult.cleanAll();
-            }
             cleanUpReuseTmpManifests(
                     deltaManifestList, changelogManifestList, 
oldIndexManifest, indexManifest);
             cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
@@ -1110,54 +1085,57 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     e);
         }
 
-        if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
-            LOG.info(
-                    "Successfully commit snapshot {} to table {} by user {} "
-                            + "with identifier {} and kind {}.",
+        boolean success;
+        try {
+            success = commitSnapshotImpl(newSnapshot, deltaStatistics);
+        } catch (Exception e) {
+            // commit exception, not sure about the situation and should not 
clean up the files
+            LOG.warn("Retry commit for exception.", e);
+            return new RetryResult(latestSnapshot, baseDataFiles, e);
+        }
+
+        if (!success) {
+            // commit fails, should clean up the files
+            long commitTime = (System.currentTimeMillis() - startMillis) / 
1000;
+            LOG.warn(
+                    "Atomic commit failed for snapshot #{} by user {} "
+                            + "with identifier {} and kind {} after {} 
seconds. "
+                            + "Clean up and try again.",
                     newSnapshotId,
-                    tableName,
                     commitUser,
                     identifier,
-                    commitKind.name());
-            if (strictModeLastSafeSnapshot != null) {
-                strictModeLastSafeSnapshot = newSnapshot.id();
-            }
-            commitCallbacks.forEach(callback -> callback.call(deltaFiles, 
indexFiles, newSnapshot));
-            return new SuccessResult();
+                    commitKind.name(),
+                    commitTime);
+            cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
+            return new RetryResult(latestSnapshot, baseDataFiles, null);
         }
 
-        // atomic rename fails, clean up and try again
-        long commitTime = (System.currentTimeMillis() - startMillis) / 1000;
-        LOG.warn(
-                String.format(
-                        "Atomic commit failed for snapshot #%d by user %s "
-                                + "with identifier %s and kind %s after %s 
seconds. "
-                                + "Clean up and try again.",
-                        newSnapshotId, commitUser, identifier, 
commitKind.name(), commitTime));
-        cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
-        return new RetryResult(
-                deltaStatistics,
-                deltaManifestList,
-                changelogManifestList,
-                oldIndexManifest,
-                indexManifest,
-                latestSnapshot,
-                baseDataFiles);
+        LOG.info(
+                "Successfully commit snapshot {} to table {} by user {} "
+                        + "with identifier {} and kind {}.",
+                newSnapshotId,
+                tableName,
+                commitUser,
+                identifier,
+                commitKind.name());
+        if (strictModeLastSafeSnapshot != null) {
+            strictModeLastSafeSnapshot = newSnapshot.id();
+        }
+        commitCallbacks.forEach(callback -> callback.call(deltaFiles, 
indexFiles, newSnapshot));
+        return new SuccessResult();
     }
 
     public void compactManifest() {
         int retryCount = 0;
-        ManifestCompactResult retryResult = null;
         long startMillis = System.currentTimeMillis();
         while (true) {
-            retryResult = compactManifest(retryResult);
-            if (retryResult.isSuccess()) {
+            boolean success = compactManifestOnce();
+            if (success) {
                 break;
             }
 
             if (System.currentTimeMillis() - startMillis > commitTimeout
                     || retryCount >= commitMaxRetries) {
-                retryResult.cleanAll();
                 throw new RuntimeException(
                         String.format(
                                 "Commit failed after %s millis with %s 
retries, there maybe exist commit conflicts between multiple jobs.",
@@ -1168,56 +1146,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
     }
 
-    private ManifestCompactResult compactManifest(@Nullable 
ManifestCompactResult lastResult) {
+    private boolean compactManifestOnce() {
         Snapshot latestSnapshot = snapshotManager.latestSnapshot();
 
         if (latestSnapshot == null) {
-            return new SuccessManifestCompactResult();
+            return true;
         }
 
         List<ManifestFileMeta> mergeBeforeManifests =
                 manifestList.readDataManifests(latestSnapshot);
         List<ManifestFileMeta> mergeAfterManifests;
 
-        if (lastResult != null) {
-            List<ManifestFileMeta> oldMergeBeforeManifests = 
lastResult.mergeBeforeManifests;
-            List<ManifestFileMeta> oldMergeAfterManifests = 
lastResult.mergeAfterManifests;
-
-            Set<String> retryMergeBefore =
-                    oldMergeBeforeManifests.stream()
-                            .map(ManifestFileMeta::fileName)
-                            .collect(Collectors.toSet());
-
-            List<ManifestFileMeta> manifestsFromOther =
-                    mergeBeforeManifests.stream()
-                            .filter(m -> 
!retryMergeBefore.remove(m.fileName()))
-                            .collect(Collectors.toList());
-
-            if (retryMergeBefore.isEmpty()) {
-                // no manifest compact from latest failed commit to latest 
commit
-                mergeAfterManifests = new ArrayList<>(oldMergeAfterManifests);
-                mergeAfterManifests.addAll(manifestsFromOther);
-            } else {
-                // manifest compact happens, quit
-                lastResult.cleanAll();
-                return new SuccessManifestCompactResult();
-            }
-        } else {
-            // the fist trial
-            mergeAfterManifests =
-                    ManifestFileMerger.merge(
-                            mergeBeforeManifests,
-                            manifestFile,
-                            manifestTargetSize.getBytes(),
-                            1,
-                            1,
-                            partitionType,
-                            manifestReadParallelism);
+        // the fist trial
+        mergeAfterManifests =
+                ManifestFileMerger.merge(
+                        mergeBeforeManifests,
+                        manifestFile,
+                        manifestTargetSize.getBytes(),
+                        1,
+                        1,
+                        partitionType,
+                        manifestReadParallelism);
 
-            if (new HashSet<>(mergeBeforeManifests).equals(new 
HashSet<>(mergeAfterManifests))) {
-                // no need to commit this snapshot, because no compact were 
happened
-                return new SuccessManifestCompactResult();
-            }
+        if (new HashSet<>(mergeBeforeManifests).equals(new 
HashSet<>(mergeAfterManifests))) {
+            // no need to commit this snapshot, because no compact were 
happened
+            return true;
         }
 
         Pair<String, Long> baseManifestList = 
manifestList.write(mergeAfterManifests);
@@ -1247,12 +1200,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         latestSnapshot.statistics(),
                         latestSnapshot.properties());
 
-        if (!commitSnapshotImpl(newSnapshot, emptyList())) {
-            return new ManifestCompactResult(
-                    baseManifestList, deltaManifestList, mergeBeforeManifests, 
mergeAfterManifests);
-        } else {
-            return new SuccessManifestCompactResult();
-        }
+        return commitSnapshotImpl(newSnapshot, emptyList());
     }
 
     private boolean commitSnapshotImpl(Snapshot newSnapshot, 
List<PartitionEntry> deltaStatistics) {
@@ -1668,67 +1616,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
     }
 
-    private class RetryResult implements CommitResult {
-
-        private final List<PartitionEntry> deltaStatistics;
-        private final Pair<String, Long> deltaManifestList;
-        private final Pair<String, Long> changelogManifestList;
-
-        private final String oldIndexManifest;
-        private final String newIndexManifest;
+    @VisibleForTesting
+    static class RetryResult implements CommitResult {
 
         private final Snapshot latestSnapshot;
         private final List<SimpleFileEntry> baseDataFiles;
+        private final Exception exception;
 
-        private RetryResult(
-                List<PartitionEntry> deltaStatistics,
-                Pair<String, Long> deltaManifestList,
-                Pair<String, Long> changelogManifestList,
-                String oldIndexManifest,
-                String newIndexManifest,
-                Snapshot latestSnapshot,
-                List<SimpleFileEntry> baseDataFiles) {
-            this.deltaStatistics = deltaStatistics;
-            this.deltaManifestList = deltaManifestList;
-            this.changelogManifestList = changelogManifestList;
-            this.oldIndexManifest = oldIndexManifest;
-            this.newIndexManifest = newIndexManifest;
+        public RetryResult(
+                Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles, 
Exception exception) {
             this.latestSnapshot = latestSnapshot;
             this.baseDataFiles = baseDataFiles;
-        }
-
-        private void cleanAll() {
-            cleanUpReuseTmpManifests(
-                    deltaManifestList, changelogManifestList, 
oldIndexManifest, newIndexManifest);
-        }
-
-        @Override
-        public boolean isSuccess() {
-            return false;
-        }
-    }
-
-    private class ManifestCompactResult implements CommitResult {
-
-        private final Pair<String, Long> baseManifestList;
-        private final Pair<String, Long> deltaManifestList;
-        private final List<ManifestFileMeta> mergeBeforeManifests;
-        private final List<ManifestFileMeta> mergeAfterManifests;
-
-        public ManifestCompactResult(
-                Pair<String, Long> baseManifestList,
-                Pair<String, Long> deltaManifestList,
-                List<ManifestFileMeta> mergeBeforeManifests,
-                List<ManifestFileMeta> mergeAfterManifests) {
-            this.baseManifestList = baseManifestList;
-            this.deltaManifestList = deltaManifestList;
-            this.mergeBeforeManifests = mergeBeforeManifests;
-            this.mergeAfterManifests = mergeAfterManifests;
-        }
-
-        public void cleanAll() {
-            manifestList.delete(deltaManifestList.getKey());
-            cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests, 
mergeAfterManifests);
+            this.exception = exception;
         }
 
         @Override
@@ -1736,19 +1635,4 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             return false;
         }
     }
-
-    private class SuccessManifestCompactResult extends ManifestCompactResult {
-
-        public SuccessManifestCompactResult() {
-            super(null, null, null, null);
-        }
-
-        @Override
-        public void cleanAll() {}
-
-        @Override
-        public boolean isSuccess() {
-            return true;
-        }
-    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index d1ddc3bef0..23e942d3f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -38,6 +38,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.FileStoreCommitImpl.RetryResult;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
@@ -83,6 +84,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static 
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -1037,6 +1039,44 @@ public class FileStoreCommitTest {
         }
     }
 
+    @Test
+    public void testCommitTwiceWithDifferentKind() throws Exception {
+        TestFileStore store = createStore(false);
+        try (FileStoreCommitImpl commit = store.newCommit()) {
+            // Append
+            Snapshot firstLatest = store.snapshotManager().latestSnapshot();
+            commit.tryCommitOnce(
+                    null,
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    0,
+                    null,
+                    Collections.emptyMap(),
+                    Collections.emptyMap(),
+                    Snapshot.CommitKind.APPEND,
+                    firstLatest,
+                    mustConflictCheck(),
+                    null);
+            // Compact
+            commit.tryCommitOnce(
+                    new RetryResult(firstLatest, Collections.emptyList(), 
null),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    Collections.emptyList(),
+                    0,
+                    null,
+                    Collections.emptyMap(),
+                    Collections.emptyMap(),
+                    Snapshot.CommitKind.COMPACT,
+                    store.snapshotManager().latestSnapshot(),
+                    mustConflictCheck(),
+                    null);
+        }
+        long id = store.snapshotManager().latestSnapshot().id();
+        assertThat(id).isEqualTo(2);
+    }
+
     private TestFileStore createStore(boolean failing, Map<String, String> 
options)
             throws Exception {
         return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE, 
options);
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index 01161fd0f6..cc420a73e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -76,6 +76,9 @@ class MockRESTCatalogTest extends RESTCatalogTest {
                         AuthProviderEnum.BEAR.identifier());
         this.restCatalog = initCatalog(false);
         this.catalog = restCatalog;
+
+        // test retry commit
+        RESTCatalogServer.commitSuccessThrowException = true;
     }
 
     @AfterEach
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index f46621913c..c46522140a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -1994,6 +1994,8 @@ public class RESTCatalogServer {
         return String.format("%s-%d", identifier.getFullName(), snapshotId);
     }
 
+    public static volatile boolean commitSuccessThrowException = false;
+
     private MockResponse commitSnapshot(
             Identifier identifier,
             String tableId,
@@ -2013,6 +2015,10 @@ public class RESTCatalogServer {
         TableSnapshot tableSnapshot;
         try {
             boolean success = commit.commit(snapshot, branchName, 
Collections.emptyList());
+            if (!success) {
+                return mockResponse(new CommitTableResponse(success), 200);
+            }
+
             // update snapshot and stats
             tableSnapshot =
                     tableLatestSnapshotStore.compute(
@@ -2125,6 +2131,13 @@ public class RESTCatalogServer {
                                                         && 
partition.recordCount() <= 0);
                                 return partitions.isEmpty();
                             });
+            if (commitSuccessThrowException) {
+                commitSuccessThrowException = false;
+                return mockResponse(
+                        new ErrorResponse(
+                                ErrorResponse.RESOURCE_TYPE_TABLE, null, 
"Service Failure", 500),
+                        500);
+            }
             CommitTableResponse response = new CommitTableResponse(success);
             return mockResponse(response, 200);
         } catch (Exception e) {

Reply via email to