This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e70f1153fc [core] Introduce retry wait in FileStoreCommitImpl (#5858)
e70f1153fc is described below

commit e70f1153fc69a1d935a85705d53ad34de21eff4d
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 10 16:43:59 2025 +0800

    [core] Introduce retry wait in FileStoreCommitImpl (#5858)
---
 .../shortcodes/generated/core_configuration.html   | 12 ++++++++++
 .../main/java/org/apache/paimon/CoreOptions.java   | 20 ++++++++++++++++
 .../java/org/apache/paimon/AbstractFileStore.java  |  2 ++
 .../paimon/operation/FileStoreCommitImpl.java      | 28 +++++++++++++++++++++-
 .../org/apache/paimon/utils/SnapshotManager.java   |  6 ++++-
 5 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index ccd24470f5..87afc279b4 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -164,6 +164,18 @@ under the License.
             <td>Integer</td>
             <td>Maximum number of retries when commit failed.</td>
         </tr>
+        <tr>
+            <td><h5>commit.max-retry-wait</h5></td>
+            <td style="word-wrap: break-word;">10 s</td>
+            <td>Duration</td>
+            <td>Max retry wait time when commit failed.</td>
+        </tr>
+        <tr>
+            <td><h5>commit.min-retry-wait</h5></td>
+            <td style="word-wrap: break-word;">10 ms</td>
+            <td>Duration</td>
+            <td>Min retry wait time when commit failed.</td>
+        </tr>
         <tr>
             <td><h5>commit.strict-mode.last-safe-snapshot</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 409340fb2e..7f2dc0217c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -638,6 +638,18 @@ public class CoreOptions implements Serializable {
                     .defaultValue(10)
                     .withDescription("Maximum number of retries when commit failed.");
 
+    public static final ConfigOption<Duration> COMMIT_MIN_RETRY_WAIT =
+            key("commit.min-retry-wait")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(10))
+                    .withDescription("Min retry wait time when commit failed.");
+
+    public static final ConfigOption<Duration> COMMIT_MAX_RETRY_WAIT =
+            key("commit.max-retry-wait")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(10))
+                    .withDescription("Max retry wait time when commit failed.");
+
     public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
             key("compaction.max-size-amplification-percent")
                     .intType()
@@ -2283,6 +2295,14 @@ public class CoreOptions implements Serializable {
                 : options.get(COMMIT_TIMEOUT).toMillis();
     }
 
+    public long commitMinRetryWait() {
+        return options.get(COMMIT_MIN_RETRY_WAIT).toMillis();
+    }
+
+    public long commitMaxRetryWait() {
+        return options.get(COMMIT_MAX_RETRY_WAIT).toMillis();
+    }
+
     public int commitMaxRetries() {
         return options.get(COMMIT_MAX_RETRIES);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index da3f61527e..1f3abde594 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -302,6 +302,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 createCommitCallbacks(commitUser, table),
                 options.commitMaxRetries(),
                 options.commitTimeout(),
+                options.commitMinRetryWait(),
+                options.commitMaxRetryWait(),
                 options.commitStrictModeLastSafeSnapshot().orElse(null));
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 9d634c02e0..743cf36eeb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -79,6 +79,8 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
@@ -140,6 +142,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final StatsFileHandler statsFileHandler;
     private final BucketMode bucketMode;
     private final long commitTimeout;
+    private final long commitMinRetryWait;
+    private final long commitMaxRetryWait;
     private final int commitMaxRetries;
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
@@ -176,6 +180,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             List<CommitCallback> commitCallbacks,
             int commitMaxRetries,
             long commitTimeout,
+            long commitMinRetryWait,
+            long commitMaxRetryWait,
             @Nullable Long strictModeLastSafeSnapshot) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
@@ -205,6 +211,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.commitCallbacks = commitCallbacks;
         this.commitMaxRetries = commitMaxRetries;
         this.commitTimeout = commitTimeout;
+        this.commitMinRetryWait = commitMinRetryWait;
+        this.commitMaxRetryWait = commitMaxRetryWait;
         this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
         this.partitionComputer =
                 new InternalRowPartitionComputer(
@@ -808,6 +816,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 throw new RuntimeException(message, retryResult.exception);
             }
 
+            commitRetryWait(retryCount);
             retryCount++;
         }
         return retryCount + 1;
@@ -890,12 +899,15 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         // Check if the commit has been completed. At this point, there will be no more repeated
         // commits and just return success
         if (retryResult != null && latestSnapshot != null) {
+            Map<Long, Snapshot> snapshotCache = new HashMap<>();
+            snapshotCache.put(latestSnapshot.id(), latestSnapshot);
             long startCheckSnapshot = Snapshot.FIRST_SNAPSHOT_ID;
             if (retryResult.latestSnapshot != null) {
+                snapshotCache.put(retryResult.latestSnapshot.id(), retryResult.latestSnapshot);
                 startCheckSnapshot = retryResult.latestSnapshot.id() + 1;
             }
             for (long i = startCheckSnapshot; i <= latestSnapshot.id(); i++) {
-                Snapshot snapshot = snapshotManager.snapshot(i);
+                Snapshot snapshot = snapshotCache.computeIfAbsent(i, snapshotManager::snapshot);
                 if (snapshot.commitUser().equals(commitUser)
                         && snapshot.commitIdentifier() == identifier
                         && snapshot.commitKind() == commitKind) {
@@ -1142,6 +1154,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                 commitTimeout, retryCount));
             }
 
+            commitRetryWait(retryCount);
             retryCount++;
         }
     }
@@ -1549,6 +1562,19 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
     }
 
+    private void commitRetryWait(int retryCount) {
+        int retryWait =
+                (int) Math.min(commitMinRetryWait * Math.pow(2, retryCount), commitMaxRetryWait);
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        retryWait += random.nextInt(Math.max(1, (int) (retryWait * 0.2)));
+        try {
+            TimeUnit.MILLISECONDS.sleep(retryWait);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ie);
+        }
+    }
+
     @Override
     public void close() {
         for (CommitCallback callback : commitCallbacks) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 63d32e287b..085f751f28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -167,7 +167,11 @@ public class SnapshotManager implements Serializable {
     public @Nullable Snapshot latestSnapshot() {
         if (snapshotLoader != null) {
             try {
-                return snapshotLoader.load().orElse(null);
+                Snapshot snapshot = snapshotLoader.load().orElse(null);
+                if (snapshot != null && cache != null) {
+                    cache.put(snapshotPath(snapshot.id()), snapshot);
+                }
+                return snapshot;
             } catch (UnsupportedOperationException ignored) {
             } catch (IOException e) {
                 throw new UncheckedIOException(e);

Reply via email to