This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c803105923 [core] Optimize commit retry on exception (#5776)
c803105923 is described below
commit c803105923d9dfc42cbac1528df4b6588b5421d8
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jun 20 10:17:07 2025 +0800
[core] Optimize commit retry on exception (#5776)
---
.../paimon/operation/FileStoreCommitImpl.java | 298 +++++++--------------
.../paimon/operation/FileStoreCommitTest.java | 40 +++
.../apache/paimon/rest/MockRESTCatalogTest.java | 3 +
.../org/apache/paimon/rest/RESTCatalogServer.java | 13 +
4 files changed, 147 insertions(+), 207 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 2e364d6677..9d634c02e0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -801,11 +801,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
- retryResult.cleanAll();
- throw new RuntimeException(
+ String message =
String.format(
"Commit failed after %s millis with %s
retries, there maybe exist commit conflicts between multiple jobs.",
- commitTimeout, retryCount));
+ commitTimeout, retryCount);
+ throw new RuntimeException(message, retryResult.exception);
}
retryCount++;
@@ -897,7 +897,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
Snapshot snapshot = snapshotManager.snapshot(i);
if (snapshot.commitUser().equals(commitUser)
- && snapshot.commitIdentifier() == identifier) {
+ && snapshot.commitIdentifier() == identifier
+ && snapshot.commitKind() == commitKind) {
return new SuccessResult();
}
}
@@ -942,36 +943,29 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (latestSnapshot != null &&
conflictCheck.shouldCheck(latestSnapshot.id())) {
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
- try {
- List<BinaryRow> changedPartitions =
- deltaFiles.stream()
- .map(ManifestEntry::partition)
- .distinct()
- .collect(Collectors.toList());
- if (retryResult != null && retryResult.latestSnapshot != null)
{
- baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
- List<SimpleFileEntry> incremental =
- readIncrementalChanges(
- retryResult.latestSnapshot,
latestSnapshot, changedPartitions);
- if (!incremental.isEmpty()) {
- baseDataFiles.addAll(incremental);
- baseDataFiles = new
ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
- }
- } else {
- baseDataFiles =
-
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
- }
- noConflictsOrFail(
- latestSnapshot.commitUser(),
- baseDataFiles,
- SimpleFileEntry.from(deltaFiles),
- commitKind);
- } catch (Exception e) {
- if (retryResult != null) {
- retryResult.cleanAll();
+ List<BinaryRow> changedPartitions =
+ deltaFiles.stream()
+ .map(ManifestEntry::partition)
+ .distinct()
+ .collect(Collectors.toList());
+ if (retryResult != null && retryResult.latestSnapshot != null) {
+ baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
+ List<SimpleFileEntry> incremental =
+ readIncrementalChanges(
+ retryResult.latestSnapshot, latestSnapshot,
changedPartitions);
+ if (!incremental.isEmpty()) {
+ baseDataFiles.addAll(incremental);
+ baseDataFiles = new
ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
}
- throw e;
+ } else {
+ baseDataFiles =
+ readAllEntriesFromChangedPartitions(latestSnapshot,
changedPartitions);
}
+ noConflictsOrFail(
+ latestSnapshot.commitUser(),
+ baseDataFiles,
+ SimpleFileEntry.from(deltaFiles),
+ commitKind);
}
Snapshot newSnapshot;
@@ -1022,33 +1016,17 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long deltaRecordCount = recordCountAdd(deltaFiles) -
recordCountDelete(deltaFiles);
long totalRecordCount = previousTotalRecordCount +
deltaRecordCount;
- boolean rewriteIndexManifest = true;
- if (retryResult != null) {
- deltaStatistics = retryResult.deltaStatistics;
- deltaManifestList = retryResult.deltaManifestList;
- changelogManifestList = retryResult.changelogManifestList;
- if (Objects.equals(oldIndexManifest,
retryResult.oldIndexManifest)) {
- rewriteIndexManifest = false;
- indexManifest = retryResult.newIndexManifest;
- LOG.info("Reusing index manifest {} for retry.",
indexManifest);
- } else {
- cleanIndexManifest(retryResult.oldIndexManifest,
retryResult.newIndexManifest);
- }
- } else {
- // write new delta files into manifest files
- deltaStatistics = new
ArrayList<>(PartitionEntry.merge(deltaFiles));
- deltaManifestList =
manifestList.write(manifestFile.write(deltaFiles));
+ // write new delta files into manifest files
+ deltaStatistics = new
ArrayList<>(PartitionEntry.merge(deltaFiles));
+ deltaManifestList =
manifestList.write(manifestFile.write(deltaFiles));
- // write changelog into manifest files
- if (!changelogFiles.isEmpty()) {
- changelogManifestList =
manifestList.write(manifestFile.write(changelogFiles));
- }
+ // write changelog into manifest files
+ if (!changelogFiles.isEmpty()) {
+ changelogManifestList =
manifestList.write(manifestFile.write(changelogFiles));
}
- if (rewriteIndexManifest) {
- indexManifest =
- indexManifestFile.writeIndexFiles(oldIndexManifest,
indexFiles, bucketMode);
- }
+ indexManifest =
+ indexManifestFile.writeIndexFiles(oldIndexManifest,
indexFiles, bucketMode);
long latestSchemaId =
schemaManager
@@ -1096,9 +1074,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
properties.isEmpty() ? null : properties);
} catch (Throwable e) {
// fails when preparing for commit, we should clean up
- if (retryResult != null) {
- retryResult.cleanAll();
- }
cleanUpReuseTmpManifests(
deltaManifestList, changelogManifestList,
oldIndexManifest, indexManifest);
cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
@@ -1110,54 +1085,57 @@ public class FileStoreCommitImpl implements FileStoreCommit {
e);
}
- if (commitSnapshotImpl(newSnapshot, deltaStatistics)) {
- LOG.info(
- "Successfully commit snapshot {} to table {} by user {} "
- + "with identifier {} and kind {}.",
+ boolean success;
+ try {
+ success = commitSnapshotImpl(newSnapshot, deltaStatistics);
+ } catch (Exception e) {
+ // commit exception, not sure about the situation and should not clean up the files
+ LOG.warn("Retry commit for exception.", e);
+ return new RetryResult(latestSnapshot, baseDataFiles, e);
+ }
+
+ if (!success) {
+ // commit fails, should clean up the files
+ long commitTime = (System.currentTimeMillis() - startMillis) /
1000;
+ LOG.warn(
+ "Atomic commit failed for snapshot #{} by user {} "
+ + "with identifier {} and kind {} after {} seconds. "
+ + "Clean up and try again.",
newSnapshotId,
- tableName,
commitUser,
identifier,
- commitKind.name());
- if (strictModeLastSafeSnapshot != null) {
- strictModeLastSafeSnapshot = newSnapshot.id();
- }
- commitCallbacks.forEach(callback -> callback.call(deltaFiles,
indexFiles, newSnapshot));
- return new SuccessResult();
+ commitKind.name(),
+ commitTime);
+ cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
+ return new RetryResult(latestSnapshot, baseDataFiles, null);
}
- // atomic rename fails, clean up and try again
- long commitTime = (System.currentTimeMillis() - startMillis) / 1000;
- LOG.warn(
- String.format(
- "Atomic commit failed for snapshot #%d by user %s "
- + "with identifier %s and kind %s after %s
seconds. "
- + "Clean up and try again.",
- newSnapshotId, commitUser, identifier,
commitKind.name(), commitTime));
- cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
- return new RetryResult(
- deltaStatistics,
- deltaManifestList,
- changelogManifestList,
- oldIndexManifest,
- indexManifest,
- latestSnapshot,
- baseDataFiles);
+ LOG.info(
+ "Successfully commit snapshot {} to table {} by user {} "
+ + "with identifier {} and kind {}.",
+ newSnapshotId,
+ tableName,
+ commitUser,
+ identifier,
+ commitKind.name());
+ if (strictModeLastSafeSnapshot != null) {
+ strictModeLastSafeSnapshot = newSnapshot.id();
+ }
+ commitCallbacks.forEach(callback -> callback.call(deltaFiles,
indexFiles, newSnapshot));
+ return new SuccessResult();
}
public void compactManifest() {
int retryCount = 0;
- ManifestCompactResult retryResult = null;
long startMillis = System.currentTimeMillis();
while (true) {
- retryResult = compactManifest(retryResult);
- if (retryResult.isSuccess()) {
+ boolean success = compactManifestOnce();
+ if (success) {
break;
}
if (System.currentTimeMillis() - startMillis > commitTimeout
|| retryCount >= commitMaxRetries) {
- retryResult.cleanAll();
throw new RuntimeException(
String.format(
"Commit failed after %s millis with %s
retries, there maybe exist commit conflicts between multiple jobs.",
@@ -1168,56 +1146,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
- private ManifestCompactResult compactManifest(@Nullable
ManifestCompactResult lastResult) {
+ private boolean compactManifestOnce() {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot == null) {
- return new SuccessManifestCompactResult();
+ return true;
}
List<ManifestFileMeta> mergeBeforeManifests =
manifestList.readDataManifests(latestSnapshot);
List<ManifestFileMeta> mergeAfterManifests;
- if (lastResult != null) {
- List<ManifestFileMeta> oldMergeBeforeManifests =
lastResult.mergeBeforeManifests;
- List<ManifestFileMeta> oldMergeAfterManifests =
lastResult.mergeAfterManifests;
-
- Set<String> retryMergeBefore =
- oldMergeBeforeManifests.stream()
- .map(ManifestFileMeta::fileName)
- .collect(Collectors.toSet());
-
- List<ManifestFileMeta> manifestsFromOther =
- mergeBeforeManifests.stream()
- .filter(m ->
!retryMergeBefore.remove(m.fileName()))
- .collect(Collectors.toList());
-
- if (retryMergeBefore.isEmpty()) {
- // no manifest compact from latest failed commit to latest
commit
- mergeAfterManifests = new ArrayList<>(oldMergeAfterManifests);
- mergeAfterManifests.addAll(manifestsFromOther);
- } else {
- // manifest compact happens, quit
- lastResult.cleanAll();
- return new SuccessManifestCompactResult();
- }
- } else {
- // the fist trial
- mergeAfterManifests =
- ManifestFileMerger.merge(
- mergeBeforeManifests,
- manifestFile,
- manifestTargetSize.getBytes(),
- 1,
- 1,
- partitionType,
- manifestReadParallelism);
+ // the fist trial
+ mergeAfterManifests =
+ ManifestFileMerger.merge(
+ mergeBeforeManifests,
+ manifestFile,
+ manifestTargetSize.getBytes(),
+ 1,
+ 1,
+ partitionType,
+ manifestReadParallelism);
- if (new HashSet<>(mergeBeforeManifests).equals(new
HashSet<>(mergeAfterManifests))) {
- // no need to commit this snapshot, because no compact were
happened
- return new SuccessManifestCompactResult();
- }
+ if (new HashSet<>(mergeBeforeManifests).equals(new
HashSet<>(mergeAfterManifests))) {
+ // no need to commit this snapshot, because no compact were
happened
+ return true;
}
Pair<String, Long> baseManifestList =
manifestList.write(mergeAfterManifests);
@@ -1247,12 +1200,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
latestSnapshot.statistics(),
latestSnapshot.properties());
- if (!commitSnapshotImpl(newSnapshot, emptyList())) {
- return new ManifestCompactResult(
- baseManifestList, deltaManifestList, mergeBeforeManifests,
mergeAfterManifests);
- } else {
- return new SuccessManifestCompactResult();
- }
+ return commitSnapshotImpl(newSnapshot, emptyList());
}
private boolean commitSnapshotImpl(Snapshot newSnapshot,
List<PartitionEntry> deltaStatistics) {
@@ -1668,67 +1616,18 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
- private class RetryResult implements CommitResult {
-
- private final List<PartitionEntry> deltaStatistics;
- private final Pair<String, Long> deltaManifestList;
- private final Pair<String, Long> changelogManifestList;
-
- private final String oldIndexManifest;
- private final String newIndexManifest;
+ @VisibleForTesting
+ static class RetryResult implements CommitResult {
private final Snapshot latestSnapshot;
private final List<SimpleFileEntry> baseDataFiles;
+ private final Exception exception;
- private RetryResult(
- List<PartitionEntry> deltaStatistics,
- Pair<String, Long> deltaManifestList,
- Pair<String, Long> changelogManifestList,
- String oldIndexManifest,
- String newIndexManifest,
- Snapshot latestSnapshot,
- List<SimpleFileEntry> baseDataFiles) {
- this.deltaStatistics = deltaStatistics;
- this.deltaManifestList = deltaManifestList;
- this.changelogManifestList = changelogManifestList;
- this.oldIndexManifest = oldIndexManifest;
- this.newIndexManifest = newIndexManifest;
+ public RetryResult(
+ Snapshot latestSnapshot, List<SimpleFileEntry> baseDataFiles,
Exception exception) {
this.latestSnapshot = latestSnapshot;
this.baseDataFiles = baseDataFiles;
- }
-
- private void cleanAll() {
- cleanUpReuseTmpManifests(
- deltaManifestList, changelogManifestList,
oldIndexManifest, newIndexManifest);
- }
-
- @Override
- public boolean isSuccess() {
- return false;
- }
- }
-
- private class ManifestCompactResult implements CommitResult {
-
- private final Pair<String, Long> baseManifestList;
- private final Pair<String, Long> deltaManifestList;
- private final List<ManifestFileMeta> mergeBeforeManifests;
- private final List<ManifestFileMeta> mergeAfterManifests;
-
- public ManifestCompactResult(
- Pair<String, Long> baseManifestList,
- Pair<String, Long> deltaManifestList,
- List<ManifestFileMeta> mergeBeforeManifests,
- List<ManifestFileMeta> mergeAfterManifests) {
- this.baseManifestList = baseManifestList;
- this.deltaManifestList = deltaManifestList;
- this.mergeBeforeManifests = mergeBeforeManifests;
- this.mergeAfterManifests = mergeAfterManifests;
- }
-
- public void cleanAll() {
- manifestList.delete(deltaManifestList.getKey());
- cleanUpNoReuseTmpManifests(baseManifestList, mergeBeforeManifests,
mergeAfterManifests);
+ this.exception = exception;
}
@Override
@@ -1736,19 +1635,4 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return false;
}
}
-
- private class SuccessManifestCompactResult extends ManifestCompactResult {
-
- public SuccessManifestCompactResult() {
- super(null, null, null, null);
- }
-
- @Override
- public void cleanAll() {}
-
- @Override
- public boolean isSuccess() {
- return true;
- }
- }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index d1ddc3bef0..23e942d3f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -38,6 +38,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.paimon.operation.FileStoreCommitImpl.RetryResult;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
@@ -83,6 +84,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
+import static
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
import static
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -1037,6 +1039,44 @@ public class FileStoreCommitTest {
}
}
+ @Test
+ public void testCommitTwiceWithDifferentKind() throws Exception {
+ TestFileStore store = createStore(false);
+ try (FileStoreCommitImpl commit = store.newCommit()) {
+ // Append
+ Snapshot firstLatest = store.snapshotManager().latestSnapshot();
+ commit.tryCommitOnce(
+ null,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 0,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Snapshot.CommitKind.APPEND,
+ firstLatest,
+ mustConflictCheck(),
+ null);
+ // Compact
+ commit.tryCommitOnce(
+ new RetryResult(firstLatest, Collections.emptyList(),
null),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ 0,
+ null,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Snapshot.CommitKind.COMPACT,
+ store.snapshotManager().latestSnapshot(),
+ mustConflictCheck(),
+ null);
+ }
+ long id = store.snapshotManager().latestSnapshot().id();
+ assertThat(id).isEqualTo(2);
+ }
+
private TestFileStore createStore(boolean failing, Map<String, String>
options)
throws Exception {
return createStore(failing, 1, CoreOptions.ChangelogProducer.NONE,
options);
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index 01161fd0f6..cc420a73e9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -76,6 +76,9 @@ class MockRESTCatalogTest extends RESTCatalogTest {
AuthProviderEnum.BEAR.identifier());
this.restCatalog = initCatalog(false);
this.catalog = restCatalog;
+
+ // test retry commit
+ RESTCatalogServer.commitSuccessThrowException = true;
}
@AfterEach
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index f46621913c..c46522140a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -1994,6 +1994,8 @@ public class RESTCatalogServer {
return String.format("%s-%d", identifier.getFullName(), snapshotId);
}
+ public static volatile boolean commitSuccessThrowException = false;
+
private MockResponse commitSnapshot(
Identifier identifier,
String tableId,
@@ -2013,6 +2015,10 @@ public class RESTCatalogServer {
TableSnapshot tableSnapshot;
try {
boolean success = commit.commit(snapshot, branchName,
Collections.emptyList());
+ if (!success) {
+ return mockResponse(new CommitTableResponse(success), 200);
+ }
+
// update snapshot and stats
tableSnapshot =
tableLatestSnapshotStore.compute(
@@ -2125,6 +2131,13 @@ public class RESTCatalogServer {
&&
partition.recordCount() <= 0);
return partitions.isEmpty();
});
+ if (commitSuccessThrowException) {
+ commitSuccessThrowException = false;
+ return mockResponse(
+ new ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_TABLE, null,
"Service Failure", 500),
+ 500);
+ }
CommitTableResponse response = new CommitTableResponse(success);
return mockResponse(response, 200);
} catch (Exception e) {