This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b07e12d2234 [FLINK-39737][s3] Switch to StandardRetryStrategy with
configurable backoff for throttle-aware retry
b07e12d2234 is described below
commit b07e12d2234f5e26785ffac713772c8820bb8db7
Author: Gabor Somogyi <[email protected]>
AuthorDate: Tue May 26 17:39:45 2026 +0200
[FLINK-39737][s3] Switch to StandardRetryStrategy with configurable backoff
for throttle-aware retry
---
.../fs/s3native/NativeS3FileSystemFactory.java | 30 ++++++-
.../apache/flink/fs/s3native/S3ClientProvider.java | 98 +++++++++++++++++++++-
.../fs/s3native/NativeS3FileSystemFactoryTest.java | 51 +++++++++++
.../flink/fs/s3native/S3ClientProviderTest.java | 68 +++++++++++++++
4 files changed, 243 insertions(+), 4 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 50c0145be93..2d7fea8532e 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -243,10 +243,33 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.intType()
.defaultValue(3)
.withDescription(
- "Maximum number of retry attempts for failed S3
requests. "
- + "Uses the AWS SDK's default retry
strategy (exponential backoff with jitter). "
+ "Maximum number of retries for failed S3 requests
(excluding the initial attempt). "
+ "Set to 0 to disable retries.");
+ public static final ConfigOption<Duration> RETRY_BASE_DELAY =
+ ConfigOptions.key("s3.retry.base-delay")
+ .durationType()
+ .defaultValue(Duration.ofMillis(100))
+ .withDescription(
+ "Base delay for exponential backoff on
non-throttle retries. "
+ + "Uses exponential backoff with full
jitter, capped by s3.retry.max-backoff.");
+
+ public static final ConfigOption<Duration> RETRY_THROTTLE_BASE_DELAY =
+ ConfigOptions.key("s3.retry.throttle.base-delay")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(1))
+ .withDescription(
+ "Base delay for exponential backoff on throttle
retries (HTTP 429, 503). "
+ + "Uses exponential backoff with full
jitter, capped by s3.retry.max-backoff.");
+
+ public static final ConfigOption<Duration> RETRY_MAX_BACKOFF =
+ ConfigOptions.key("s3.retry.max-backoff")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(20))
+ .withDescription(
+ "Maximum delay cap for exponential backoff,
applied to both "
+ + "normal and throttle retry paths.");
+
public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
ConfigOptions.key("s3.connection.timeout")
.durationType()
@@ -458,6 +481,9 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.assumeRoleSessionName(assumeRoleSessionName)
.assumeRoleSessionDurationSeconds(assumeRoleSessionDuration)
.maxRetries(config.get(MAX_RETRIES))
+ .retryBaseDelay(config.get(RETRY_BASE_DELAY))
+
.retryThrottleBaseDelay(config.get(RETRY_THROTTLE_BASE_DELAY))
+ .retryMaxBackoff(config.get(RETRY_MAX_BACKOFF))
.credentialsProviderClasses(credentialsProviderClasses)
.encryptionConfig(encryptionConfig)
.build();
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cd1c918c486..e3c7a1f2380 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -33,11 +33,12 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
-import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
+import software.amazon.awssdk.retries.StandardRetryStrategy;
+import software.amazon.awssdk.retries.api.BackoffStrategy;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
@@ -88,6 +89,9 @@ class S3ClientProvider implements AutoCloseableAsync {
private final boolean checksumValidation;
private final int maxConnections;
private final int maxRetries;
+ private final Duration retryBaseDelay;
+ private final Duration retryThrottleBaseDelay;
+ private final Duration retryMaxBackoff;
@Nullable private final String region;
@Nullable private final String endpoint;
@Nullable private final String assumeRoleArn;
@@ -111,6 +115,9 @@ class S3ClientProvider implements AutoCloseableAsync {
boolean checksumValidation,
int maxConnections,
int maxRetries,
+ Duration retryBaseDelay,
+ Duration retryThrottleBaseDelay,
+ Duration retryMaxBackoff,
@Nullable String region,
@Nullable String endpoint,
@Nullable String assumeRoleArn,
@@ -141,6 +148,13 @@ class S3ClientProvider implements AutoCloseableAsync {
this.checksumValidation = checksumValidation;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
+ this.retryBaseDelay =
+ Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay
must not be null");
+ this.retryThrottleBaseDelay =
+ Preconditions.checkNotNull(
+ retryThrottleBaseDelay, "retryThrottleBaseDelay must
not be null");
+ this.retryMaxBackoff =
+ Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff
must not be null");
this.region = region;
this.endpoint = endpoint;
this.assumeRoleArn = assumeRoleArn;
@@ -214,6 +228,26 @@ class S3ClientProvider implements AutoCloseableAsync {
return maxRetries;
}
+ @VisibleForTesting
+ int getMaxAttempts() {
+ return maxRetries + 1;
+ }
+
+ @VisibleForTesting
+ Duration getRetryBaseDelay() {
+ return retryBaseDelay;
+ }
+
+ @VisibleForTesting
+ Duration getRetryThrottleBaseDelay() {
+ return retryThrottleBaseDelay;
+ }
+
+ @VisibleForTesting
+ Duration getRetryMaxBackoff() {
+ return retryMaxBackoff;
+ }
+
@VisibleForTesting
@Nullable
String getRegion() {
@@ -312,6 +346,11 @@ class S3ClientProvider implements AutoCloseableAsync {
private Duration socketTimeout = Duration.ofSeconds(60);
private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
private int maxRetries = 3;
+ private Duration retryBaseDelay =
NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue();
+ private Duration retryThrottleBaseDelay =
+
NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue();
+ private Duration retryMaxBackoff =
+ NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue();
private Duration clientCloseTimeout = Duration.ofSeconds(30);
// AssumeRole configuration
@@ -391,6 +430,37 @@ class S3ClientProvider implements AutoCloseableAsync {
return this;
}
+ public Builder retryBaseDelay(Duration retryBaseDelay) {
+ Preconditions.checkNotNull(retryBaseDelay, "retryBaseDelay must
not be null");
+ Preconditions.checkArgument(
+ !retryBaseDelay.isNegative(),
+ "retryBaseDelay must not be negative, but was %s",
+ retryBaseDelay);
+ this.retryBaseDelay = retryBaseDelay;
+ return this;
+ }
+
+ public Builder retryThrottleBaseDelay(Duration retryThrottleBaseDelay)
{
+ Preconditions.checkNotNull(
+ retryThrottleBaseDelay, "retryThrottleBaseDelay must not
be null");
+ Preconditions.checkArgument(
+ !retryThrottleBaseDelay.isNegative(),
+ "retryThrottleBaseDelay must not be negative, but was %s",
+ retryThrottleBaseDelay);
+ this.retryThrottleBaseDelay = retryThrottleBaseDelay;
+ return this;
+ }
+
+ public Builder retryMaxBackoff(Duration retryMaxBackoff) {
+ Preconditions.checkNotNull(retryMaxBackoff, "retryMaxBackoff must
not be null");
+ Preconditions.checkArgument(
+ retryMaxBackoff.toMillis() > 0,
+ "retryMaxBackoff must be positive, but was %s",
+ retryMaxBackoff);
+ this.retryMaxBackoff = retryMaxBackoff;
+ return this;
+ }
+
public Builder assumeRoleArn(@Nullable String assumeRoleArn) {
this.assumeRoleArn = assumeRoleArn;
return this;
@@ -458,9 +528,30 @@ class S3ClientProvider implements AutoCloseableAsync {
.checksumValidationEnabled(checksumValidation)
.build();
+ Preconditions.checkArgument(
+ retryMaxBackoff.compareTo(retryBaseDelay) >= 0,
+ "retryMaxBackoff (%s) must be >= retryBaseDelay (%s)",
+ retryMaxBackoff,
+ retryBaseDelay);
+ Preconditions.checkArgument(
+ retryMaxBackoff.compareTo(retryThrottleBaseDelay) >= 0,
+ "retryMaxBackoff (%s) must be >= retryThrottleBaseDelay
(%s)",
+ retryMaxBackoff,
+ retryThrottleBaseDelay);
+
ClientOverrideConfiguration overrideConfig =
ClientOverrideConfiguration.builder()
-
.retryPolicy(RetryPolicy.builder().numRetries(maxRetries).build())
+ .retryStrategy(
+ StandardRetryStrategy.builder()
+ .maxAttempts(maxRetries + 1)
+ .backoffStrategy(
+
BackoffStrategy.exponentialDelay(
+ retryBaseDelay,
retryMaxBackoff))
+ .throttlingBackoffStrategy(
+
BackoffStrategy.exponentialDelay(
+
retryThrottleBaseDelay,
+ retryMaxBackoff))
+ .build())
.build();
ApacheHttpClient.Builder httpClientBuilder =
@@ -516,6 +607,9 @@ class S3ClientProvider implements AutoCloseableAsync {
checksumValidation,
maxConnections,
maxRetries,
+ retryBaseDelay,
+ retryThrottleBaseDelay,
+ retryMaxBackoff,
region,
endpoint,
assumeRoleArn,
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index e673af3a55e..f256b0a2040 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -178,6 +178,57 @@ class NativeS3FileSystemFactoryTest {
assertThat(createFs(config).getClientProvider().getMaxRetries()).isEqualTo(5);
}
+ @Test
+ void testMaxAttemptsIsMaxRetriesPlusOne() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.MAX_RETRIES, 4);
+
assertThat(createFs(config).getClientProvider().getMaxAttempts()).isEqualTo(5);
+ }
+
+ // --- Retry backoff ---
+
+ @Test
+ void testRetryBaseDelayDefault() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().getRetryBaseDelay())
+
.isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue());
+ }
+
+ @Test
+ void testRetryBaseDelayExplicitlyConfigured() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.RETRY_BASE_DELAY,
Duration.ofMillis(200));
+ assertThat(createFs(config).getClientProvider().getRetryBaseDelay())
+ .isEqualTo(Duration.ofMillis(200));
+ }
+
+ @Test
+ void testRetryThrottleBaseDelayDefault() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().getRetryThrottleBaseDelay())
+
.isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue());
+ }
+
+ @Test
+ void testRetryThrottleBaseDelayExplicitlyConfigured() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY,
Duration.ofSeconds(2));
+
assertThat(createFs(config).getClientProvider().getRetryThrottleBaseDelay())
+ .isEqualTo(Duration.ofSeconds(2));
+ }
+
+ @Test
+ void testRetryMaxBackoffDefault() throws Exception {
+
assertThat(createFs(baseConfig()).getClientProvider().getRetryMaxBackoff())
+
.isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue());
+ }
+
+ @Test
+ void testRetryMaxBackoffExplicitlyConfigured() throws Exception {
+ Configuration config = baseConfig();
+ config.set(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF,
Duration.ofSeconds(30));
+ assertThat(createFs(config).getClientProvider().getRetryMaxBackoff())
+ .isEqualTo(Duration.ofSeconds(30));
+ }
+
// --- Timeouts ---
@Test
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java
index 4e43aac3383..51a457cc537 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java
@@ -30,6 +30,7 @@ import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import java.lang.reflect.Field;
+import java.time.Duration;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
@@ -203,6 +204,73 @@ class S3ClientProviderTest {
.hasMessageContaining("no valid provider class names");
}
+ @Test
+ void testRetryBuilderDefaultsMatchConfigOptions() {
+ S3ClientProvider provider =
+
S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+
+ assertThat(provider.getRetryBaseDelay())
+
.isEqualTo(NativeS3FileSystemFactory.RETRY_BASE_DELAY.defaultValue());
+ assertThat(provider.getRetryThrottleBaseDelay())
+
.isEqualTo(NativeS3FileSystemFactory.RETRY_THROTTLE_BASE_DELAY.defaultValue());
+ assertThat(provider.getRetryMaxBackoff())
+
.isEqualTo(NativeS3FileSystemFactory.RETRY_MAX_BACKOFF.defaultValue());
+ }
+
+ @Test
+ void testNegativeRetryBaseDelayThrows() {
+ assertThatThrownBy(() ->
S3ClientProvider.builder().retryBaseDelay(Duration.ofMillis(-1)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("retryBaseDelay must not be negative");
+ }
+
+ @Test
+ void testNegativeRetryThrottleBaseDelayThrows() {
+ assertThatThrownBy(
+ () ->
+ S3ClientProvider.builder()
+
.retryThrottleBaseDelay(Duration.ofMillis(-1)))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("retryThrottleBaseDelay must not be
negative");
+ }
+
+ @Test
+ void testZeroRetryMaxBackoffThrows() {
+ assertThatThrownBy(() ->
S3ClientProvider.builder().retryMaxBackoff(Duration.ZERO))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("retryMaxBackoff must be positive");
+ }
+
+ @Test
+ void testRetryMaxBackoffSmallerThanBaseDelayThrows() {
+ assertThatThrownBy(
+ () ->
+ S3ClientProvider.builder()
+ .endpoint(DUMMY_ENDPOINT)
+ .region(DUMMY_REGION)
+ .retryBaseDelay(Duration.ofSeconds(5))
+ .retryMaxBackoff(Duration.ofSeconds(1))
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("retryMaxBackoff")
+ .hasMessageContaining("retryBaseDelay");
+ }
+
+ @Test
+ void testRetryMaxBackoffSmallerThanThrottleBaseDelayThrows() {
+ assertThatThrownBy(
+ () ->
+ S3ClientProvider.builder()
+ .endpoint(DUMMY_ENDPOINT)
+ .region(DUMMY_REGION)
+
.retryThrottleBaseDelay(Duration.ofSeconds(5))
+ .retryMaxBackoff(Duration.ofSeconds(1))
+ .build())
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessageContaining("retryMaxBackoff")
+ .hasMessageContaining("retryThrottleBaseDelay");
+ }
+
@SuppressWarnings("unchecked")
private static List<AwsCredentialsProvider>
extractChain(AwsCredentialsProvider provider)
throws Exception {