This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b07e12d2234 [FLINK-39737][s3] Switch to StandardRetryStrategy with configurable backoff for throttle-aware retry
b07e12d2234 is described below

commit b07e12d2234f5e26785ffac713772c8820bb8db7
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue May 26 17:39:45 2026 +0200

    [FLINK-39737][s3] Switch to StandardRetryStrategy with configurable backoff for throttle-aware retry
---
 .../fs/s3native/NativeS3FileSystemFactory.java     | 30 ++++++-
 .../apache/flink/fs/s3native/S3ClientProvider.java | 98 +++++++++++++++++++++-
 .../fs/s3native/NativeS3FileSystemFactoryTest.java | 51 +++++++++++
 .../flink/fs/s3native/S3ClientProviderTest.java    | 68 +++++++++++++++
 4 files changed, 243 insertions(+), 4 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 50c0145be93..2d7fea8532e 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -243,10 +243,33 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                     .intType()
                     .defaultValue(3)
                     .withDescription(
-                            "Maximum number of retry attempts for failed S3 requests. "
-                                    + "Uses the AWS SDK's default retry strategy (exponential backoff with jitter). "
+                            "Maximum number of retries for failed S3 requests (excluding the initial attempt). "
                                     + "Set to 0 to disable retries.");
 
+    public static final ConfigOption<Duration> RETRY_BASE_DELAY =
+            ConfigOptions.key("s3.retry.base-delay")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(100))
+                    .withDescription(
+                            "Base delay for exponential backoff on non-throttle retries. "
+                                    + "Uses exponential backoff with full jitter, capped by s3.retry.max-backoff.");
+
+    public static final ConfigOption<Duration> RETRY_THROTTLE_BASE_DELAY =
+            ConfigOptions.key("s3.retry.throttle.base-delay")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(1))
+                    .withDescription(
+                            "Base delay for exponential backoff on throttle retries (HTTP 429, 503). "
+                                    + "Uses exponential backoff with full jitter, capped by s3.retry.max-backoff.");
+
+    public static final ConfigOption<Duration> RETRY_MAX_BACKOFF =
+            ConfigOptions.key("s3.retry.max-backoff")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(20))
+                    .withDescription(
+                            "Maximum delay cap for exponential backoff, applied to both "
+                                    + "normal and throttle retry paths.");
+
     public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
             ConfigOptions.key("s3.connection.timeout")
                     .durationType()
@@ -458,6 +481,9 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                         .assumeRoleSessionName(assumeRoleSessionName)
                         .assumeRoleSessionDurationSeconds(assumeRoleSessionDuration)
                         .maxRetries(config.get(MAX_RETRIES))
+                        .retryBaseDelay(config.get(RETRY_BASE_DELAY))
+                        .retryThrottleBaseDelay(config.get(RETRY_THROTTLE_BASE_DELAY))
+                        .retryMaxBackoff(config.get(RETRY_MAX_BACKOFF))
                         .credentialsProviderClasses(credentialsProviderClasses)
                         .encryptionConfig(encryptionConfig)
                         .build();
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cd1c918c486..e3c7a1f2380 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -33,11 +33,12 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.core.retry.RetryPolicy;
 import software.amazon.awssdk.http.apache.ApacheHttpClient;
 import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
+import software.amazon.awssdk.retries.StandardRetryStrategy;
+import software.amazon.awssdk.retries.api.BackoffStrategy;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
@@ -88,6 +89,9 @@ class S3ClientProvider implements AutoCloseableAsync {
     private final boolean checksumValidation;
     private final int maxConnections;
     private final int maxRetries;
+    private final Duration retryBaseDelay;
+    private final Duration retryThrottleBaseDelay;
+    private final Duration retryMaxBackoff;
     @Nullable private final String region;
     @Nullable private final String endpoint;
     @Nullable private final String assumeRoleArn;
@@ -111,6 +115,9 @@ class S3ClientProvider implements AutoCloseableAsync {
             boolean checksumValidation,
             int maxConnections,
             int maxRetries,
+            Duration retryBaseDelay,
+            Duration retryThrottleBaseDelay,
+            Duration retryMaxBackoff,
             @Nullable String region,
             @Nullable String endpoint,
             @Nullable String assumeRoleArn,
@@ -141,6 +148,13 @@ class S3ClientProvider implements AutoCloseableAsync {
         this.checksumValidation = checksumValidation;
         this.maxConnections = maxConnections;
         this.maxRetries = maxRetries;
+        this.retryBaseDelay =
+                Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must not be null");
+        this.retryThrottleBaseDelay =
+                Preconditions.checkNotNull(
+                        retryThrottleBaseDelay, "retryThrottleBaseDelay must not be null");
+        this.retryMaxBackoff =
+                Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must not be null");
         this.region = region;
         this.endpoint = endpoint;
         this.assumeRoleArn = assumeRoleArn;
@@ -214,6 +228,26 @@ class S3ClientProvider implements AutoCloseableAsync {
         return maxRetries;
     }
 
+    @VisibleForTesting
+    int getMaxAttempts() {
+        return maxRetries + 1;
+    }
+
+    @VisibleForTesting
+    Duration getRetryBaseDelay() {
+        return retryBaseDelay;
+    }
+
+    @VisibleForTesting
+    Duration getRetryThrottleBaseDelay() {
+        return retryThrottleBaseDelay;
+    }
+
+    @VisibleForTesting
+    Duration getRetryMaxBackoff() {
+        return retryMaxBackoff;
+    }
+
     @VisibleForTesting
     @Nullable
     String getRegion() {
@@ -312,6 +346,11 @@ class S3ClientProvider implements AutoCloseableAsync {
         private Duration socketTimeout = Duration.ofSeconds(60);
         private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
         private int maxRetries = 3;
+        private Duration retryBaseDelay = NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue();
+        private Duration retryThrottleBaseDelay =
+                NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue();
+        private Duration retryMaxBackoff =
+                NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue();
         private Duration clientCloseTimeout = Duration.ofSeconds(30);
 
         // AssumeRole configuration
@@ -391,6 +430,37 @@ class S3ClientProvider implements AutoCloseableAsync {
             return this;
         }
 
+        public Builder retryBaseDelay(Duration retryBaseDelay) {
+            Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must not be null");
+            Preconditions.checkArgument(
+                    !retryBaseDelay.isNegative(),
+                    "retryBaseDelay must not be negative, but was %s",
+                    retryBaseDelay);
+            this.retryBaseDelay = retryBaseDelay;
+            return this;
+        }
+
+        public Builder retryThrottleBaseDelay(Duration retryThrottleBaseDelay) {
+            Preconditions.checkNotNull(
+                    retryThrottleBaseDelay, "retryThrottleBaseDelay must not be null");
+            Preconditions.checkArgument(
+                    !retryThrottleBaseDelay.isNegative(),
+                    "retryThrottleBaseDelay must not be negative, but was %s",
+                    retryThrottleBaseDelay);
+            this.retryThrottleBaseDelay = retryThrottleBaseDelay;
+            return this;
+        }
+
+        public Builder retryMaxBackoff(Duration retryMaxBackoff) {
+            Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must not be null");
+            Preconditions.checkArgument(
+                    retryMaxBackoff.toMillis() > 0,
+                    "retryMaxBackoff must be positive, but was %s",
+                    retryMaxBackoff);
+            this.retryMaxBackoff = retryMaxBackoff;
+            return this;
+        }
+
         public Builder assumeRoleArn(@Nullable String assumeRoleArn) {
             this.assumeRoleArn = assumeRoleArn;
             return this;
@@ -458,9 +528,30 @@ class S3ClientProvider implements AutoCloseableAsync {
                             .checksumValidationEnabled(checksumValidation)
                             .build();
 
+            Preconditions.checkArgument(
+                    retryMaxBackoff.compareTo(retryBaseDelay) >= 0,
+                    "retryMaxBackoff (%s) must be >= retryBaseDelay (%s)",
+                    retryMaxBackoff,
+                    retryBaseDelay);
+            Preconditions.checkArgument(
+                    retryMaxBackoff.compareTo(retryThrottleBaseDelay) >= 0,
+                    "retryMaxBackoff (%s) must be >= retryThrottleBaseDelay (%s)",
+                    retryMaxBackoff,
+                    retryThrottleBaseDelay);
+
             ClientOverrideConfiguration overrideConfig =
                     ClientOverrideConfiguration.builder()
-                            .retryPolicy(RetryPolicy.builder().numRetries(maxRetries).build())
+                            .retryStrategy(
+                                    StandardRetryStrategy.builder()
+                                            .maxAttempts(maxRetries + 1)
+                                            .backoffStrategy(
+                                                    BackoffStrategy.exponentialDelay(
+                                                            retryBaseDelay, retryMaxBackoff))
+                                            .throttlingBackoffStrategy(
+                                                    BackoffStrategy.exponentialDelay(
+                                                            retryThrottleBaseDelay,
+                                                            retryMaxBackoff))
+                                            .build())
                             .build();
 
             ApacheHttpClient.Builder httpClientBuilder =
@@ -516,6 +607,9 @@ class S3ClientProvider implements AutoCloseableAsync {
                     checksumValidation,
                     maxConnections,
                     maxRetries,
+                    retryBaseDelay,
+                    retryThrottleBaseDelay,
+                    retryMaxBackoff,
                     region,
                     endpoint,
                     assumeRoleArn,
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index e673af3a55e..f256b0a2040 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -178,6 +178,57 @@ class NativeS3FileSystemFactoryTest {
         assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5);
     }
 
+    @Test
+    void testMaxAttemptsIsMaxRetriesPlusOne() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.MAX_RETRIES, 4);
+        assertThat(createFs(config).getClientProvider().getMaxAttempts()).isEqualTo(5);
+    }
+
+    // --- Retry backoff ---
+
+    @Test
+    void testRetryBaseDelayDefault() throws Exception {
+        assertThat(createFs(baseConfig()).getClientProvider().getRetryBaseDelay())
+                .isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue());
+    }
+
+    @Test
+    void testRetryBaseDelayExplicitlyConfigured() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.RETRY_BASE_DELAY, Duration.ofMillis(200));
+        assertThat(createFs(config).getClientProvider().getRetryBaseDelay())
+                .isEqualTo(Duration.ofMillis(200));
+    }
+
+    @Test
+    void testRetryThrottleBaseDelayDefault() throws Exception {
+        assertThat(createFs(baseConfig()).getClientProvider().getRetryThrottleBaseDelay())
+                .isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue());
+    }
+
+    @Test
+    void testRetryThrottleBaseDelayExplicitlyConfigured() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY, Duration.ofSeconds(2));
+        assertThat(createFs(config).getClientProvider().getRetryThrottleBaseDelay())
+                .isEqualTo(Duration.ofSeconds(2));
+    }
+
+    @Test
+    void testRetryMaxBackoffDefault() throws Exception {
+        assertThat(createFs(baseConfig()).getClientProvider().getRetryMaxBackoff())
+                .isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue());
+    }
+
+    @Test
+    void testRetryMaxBackoffExplicitlyConfigured() throws Exception {
+        Configuration config = baseConfig();
+        config.set(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF, Duration.ofSeconds(30));
+        assertThat(createFs(config).getClientProvider().getRetryMaxBackoff())
+                .isEqualTo(Duration.ofSeconds(30));
+    }
+
     // --- Timeouts ---
 
     @Test
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java
index 4e43aac3383..51a457cc537 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java
@@ -30,6 +30,7 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
 
 import java.lang.reflect.Field;
+import java.time.Duration;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -203,6 +204,73 @@ class S3ClientProviderTest {
                 .hasMessageContaining("no valid provider class names");
     }
 
+    @Test
+    void testRetryBuilderDefaultsMatchConfigOptions() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+
+        assertThat(provider.getRetryBaseDelay())
+                .isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue());
+        assertThat(provider.getRetryThrottleBaseDelay())
+                .isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue());
+        assertThat(provider.getRetryMaxBackoff())
+                .isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue());
+    }
+
+    @Test
+    void testNegativeRetryBaseDelayThrows() {
+        assertThatThrownBy(() -> S3ClientProvider.builder().retryBaseDelay(Duration.ofMillis(-1)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("retryBaseDelay must not be negative");
+    }
+
+    @Test
+    void testNegativeRetryThrottleBaseDelayThrows() {
+        assertThatThrownBy(
+                        () ->
+                                S3ClientProvider.builder()
+                                        .retryThrottleBaseDelay(Duration.ofMillis(-1)))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("retryThrottleBaseDelay must not be negative");
+    }
+
+    @Test
+    void testZeroRetryMaxBackoffThrows() {
+        assertThatThrownBy(() -> S3ClientProvider.builder().retryMaxBackoff(Duration.ZERO))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("retryMaxBackoff must be positive");
+    }
+
+    @Test
+    void testRetryMaxBackoffSmallerThanBaseDelayThrows() {
+        assertThatThrownBy(
+                        () ->
+                                S3ClientProvider.builder()
+                                        .endpoint(DUMMY_ENDPOINT)
+                                        .region(DUMMY_REGION)
+                                        .retryBaseDelay(Duration.ofSeconds(5))
+                                        .retryMaxBackoff(Duration.ofSeconds(1))
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("retryMaxBackoff")
+                .hasMessageContaining("retryBaseDelay");
+    }
+
+    @Test
+    void testRetryMaxBackoffSmallerThanThrottleBaseDelayThrows() {
+        assertThatThrownBy(
+                        () ->
+                                S3ClientProvider.builder()
+                                        .endpoint(DUMMY_ENDPOINT)
+                                        .region(DUMMY_REGION)
+                                        .retryThrottleBaseDelay(Duration.ofSeconds(5))
+                                        .retryMaxBackoff(Duration.ofSeconds(1))
+                                        .build())
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("retryMaxBackoff")
+                .hasMessageContaining("retryThrottleBaseDelay");
+    }
+
     @SuppressWarnings("unchecked")
     private static List<AwsCredentialsProvider> extractChain(AwsCredentialsProvider provider)
             throws Exception {

Reply via email to