This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 427a2c93fb7 [FLINK-39114][filesystem] Support configurable timeout in native-s3-fs
427a2c93fb7 is described below
commit 427a2c93fb785cca7f540d48421551d49292b671
Author: Samrat002 <[email protected]>
AuthorDate: Wed Apr 8 18:28:31 2026 +0530
[FLINK-39114][filesystem] Support configurable timeout in native-s3-fs
---
.../flink/fs/s3native/NativeS3FileSystem.java | 28 +++++---
.../fs/s3native/NativeS3FileSystemFactory.java | 45 ++++++++++++-
.../apache/flink/fs/s3native/S3ClientProvider.java | 68 ++++++++++++++++----
.../fs/s3native/NativeS3FileSystemFactoryTest.java | 75 ++++++++++++++++++++++
4 files changed, 196 insertions(+), 20 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 81e5075ba44..40e88d8ae5a 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -18,6 +18,7 @@
package org.apache.flink.fs.s3native;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.EntropyInjectingFileSystem;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -51,6 +52,7 @@ import javax.annotation.Nullable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -73,7 +75,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* (the underlying clients have not been torn down yet) or fails with an
{@link IOException}
* from the AWS SDK, which callers are already expected to handle.
* <li>The {@link #closeAsync()} teardown sequence (bulkCopyHelper,
transferManager, asyncClient,
- * syncClient) provides a natural grace period bounded by {@link
#CLOSE_TIMEOUT_SECONDS}.
+ * syncClient) provides a natural grace period bounded by the
configurable close timeout.
* </ul>
*
* <p>A thread-pool-routed approach would provide stricter guarantees but
introduces latency
@@ -95,9 +97,6 @@ class NativeS3FileSystem extends FileSystem
private static final Logger LOG =
LoggerFactory.getLogger(NativeS3FileSystem.class);
- /** Timeout in seconds for closing the filesystem. */
- private static final long CLOSE_TIMEOUT_SECONDS = 60;
-
private final S3ClientProvider clientProvider;
private final URI uri;
private final String bucketName;
@@ -113,6 +112,7 @@ class NativeS3FileSystem extends FileSystem
@Nullable private final NativeS3BulkCopyHelper bulkCopyHelper;
private final boolean useAsyncOperations;
private final int readBufferSize;
+ private final Duration fsCloseTimeout;
private final AtomicBoolean closed = new AtomicBoolean(false);
public NativeS3FileSystem(
@@ -125,7 +125,8 @@ class NativeS3FileSystem extends FileSystem
int maxConcurrentUploadsPerStream,
@Nullable NativeS3BulkCopyHelper bulkCopyHelper,
boolean useAsyncOperations,
- int readBufferSize) {
+ int readBufferSize,
+ Duration fsCloseTimeout) {
this.clientProvider = clientProvider;
this.uri = uri;
this.bucketName = uri.getHost();
@@ -136,6 +137,7 @@ class NativeS3FileSystem extends FileSystem
this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
this.useAsyncOperations = useAsyncOperations;
this.readBufferSize = readBufferSize;
+ this.fsCloseTimeout = fsCloseTimeout;
this.s3AccessHelper =
new NativeS3AccessHelper(
clientProvider.getS3Client(),
@@ -158,6 +160,16 @@ class NativeS3FileSystem extends FileSystem
readBufferSize / 1024);
}
+ @VisibleForTesting
+ Duration getFsCloseTimeout() {
+ return fsCloseTimeout;
+ }
+
+ @VisibleForTesting
+ S3ClientProvider getClientProvider() {
+ return clientProvider;
+ }
+
@Override
public URI getUri() {
return uri;
@@ -550,13 +562,13 @@ class NativeS3FileSystem extends FileSystem
}
return
CompletableFuture.completedFuture(null);
})
- .orTimeout(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .orTimeout(fsCloseTimeout.toSeconds(),
TimeUnit.SECONDS)
.whenComplete(
(result, error) -> {
if (error != null) {
LOG.error(
- "FileSystem close timed out
after {} seconds for bucket: {}",
- CLOSE_TIMEOUT_SECONDS,
+ "FileSystem close timed out
after {} for bucket: {}",
+ fsCloseTimeout,
bucketName,
error);
}
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 756d742a405..60b151035cb 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
/**
* Factory for creating Native S3 FileSystem instances.
@@ -203,6 +204,43 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
+ "Uses the AWS SDK's default retry
strategy (exponential backoff with jitter). "
+ "Set to 0 to disable retries.");
+ public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
+ ConfigOptions.key("s3.connection.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription(
+ "HTTP connection timeout for the S3 client. "
+ + "Controls how long the client waits to
establish a connection.");
+
+ public static final ConfigOption<Duration> SOCKET_TIMEOUT =
+ ConfigOptions.key("s3.socket.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription(
+ "HTTP socket timeout for the S3 client. "
+ + "Controls how long the client waits for
data after connection is established.");
+
+ public static final ConfigOption<Duration> CONNECTION_MAX_IDLE_TIME =
+ ConfigOptions.key("s3.connection.max-idle-time")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription(
+ "Maximum idle time for HTTP connections in the
connection pool.");
+
+ public static final ConfigOption<Duration> FS_CLOSE_TIMEOUT =
+ ConfigOptions.key("s3.close.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(60))
+ .withDescription(
+ "Timeout for closing the S3 filesystem. "
+ + "Controls how long the filesystem waits
for pending operations to complete during shutdown.");
+
+ public static final ConfigOption<Duration> CLIENT_CLOSE_TIMEOUT =
+ ConfigOptions.key("s3.client.close.timeout")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(30))
+ .withDescription("Timeout for closing the S3 client and
releasing resources.");
+
public static final ConfigOption<String> AWS_CREDENTIALS_PROVIDER =
ConfigOptions.key("fs.s3.aws.credentials.provider")
.stringType()
@@ -317,6 +355,10 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
.region(region)
.endpoint(endpoint)
.pathStyleAccess(pathStyleAccess)
+ .connectionTimeout(config.get(CONNECTION_TIMEOUT))
+ .socketTimeout(config.get(SOCKET_TIMEOUT))
+
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
+ .clientCloseTimeout(config.get(CLIENT_CLOSE_TIMEOUT))
.assumeRoleArn(config.get(ASSUME_ROLE_ARN))
.assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID))
.assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME))
@@ -345,6 +387,7 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
maxConcurrentUploads,
bulkCopyHelper,
useAsyncOperations,
- readBufferSize);
+ readBufferSize,
+ config.get(FS_CLOSE_TIMEOUT));
}
}
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cad32d75b12..cee92d1ce5d 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -71,14 +71,15 @@ class S3ClientProvider implements AutoCloseableAsync {
private static final Logger LOG =
LoggerFactory.getLogger(S3ClientProvider.class);
- /** Timeout in seconds for closing S3 clients. */
- private static final long CLIENT_CLOSE_TIMEOUT_SECONDS = 30;
-
private final S3Client s3Client;
private final S3TransferManager transferManager;
private final S3EncryptionConfig encryptionConfig;
@Nullable private final AwsCredentialsProvider credentialsProvider;
@Nullable private final StsClient stsClient;
+ private final Duration clientCloseTimeout;
+ private final Duration connectionTimeout;
+ private final Duration socketTimeout;
+ private final Duration connectionMaxIdleTime;
private final AtomicBoolean closed = new AtomicBoolean(false);
private S3ClientProvider(
@@ -86,13 +87,21 @@ class S3ClientProvider implements AutoCloseableAsync {
S3TransferManager transferManager,
S3EncryptionConfig encryptionConfig,
@Nullable AwsCredentialsProvider credentialsProvider,
- @Nullable StsClient stsClient) {
+ @Nullable StsClient stsClient,
+ Duration clientCloseTimeout,
+ Duration connectionTimeout,
+ Duration socketTimeout,
+ Duration connectionMaxIdleTime) {
this.s3Client = s3Client;
this.transferManager = transferManager;
this.encryptionConfig =
encryptionConfig != null ? encryptionConfig :
S3EncryptionConfig.none();
this.credentialsProvider = credentialsProvider;
this.stsClient = stsClient;
+ this.clientCloseTimeout = clientCloseTimeout;
+ this.connectionTimeout = connectionTimeout;
+ this.socketTimeout = socketTimeout;
+ this.connectionMaxIdleTime = connectionMaxIdleTime;
}
public S3Client getS3Client() {
@@ -115,6 +124,26 @@ class S3ClientProvider implements AutoCloseableAsync {
return credentialsProvider;
}
+ @VisibleForTesting
+ Duration getClientCloseTimeout() {
+ return clientCloseTimeout;
+ }
+
+ @VisibleForTesting
+ Duration getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ @VisibleForTesting
+ Duration getSocketTimeout() {
+ return socketTimeout;
+ }
+
+ @VisibleForTesting
+ Duration getConnectionMaxIdleTime() {
+ return connectionMaxIdleTime;
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
if (!closed.compareAndSet(false, true)) {
@@ -151,13 +180,10 @@ class S3ClientProvider implements AutoCloseableAsync {
}
}
})
- .orTimeout(CLIENT_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+ .orTimeout(clientCloseTimeout.toSeconds(), TimeUnit.SECONDS)
.exceptionally(
ex -> {
- LOG.error(
- "S3 client close timed out after {}
seconds",
- CLIENT_CLOSE_TIMEOUT_SECONDS,
- ex);
+ LOG.error("S3 client close timed out after {}",
clientCloseTimeout, ex);
return null;
});
}
@@ -181,8 +207,10 @@ class S3ClientProvider implements AutoCloseableAsync {
private int maxConnections = 50;
private Duration connectionTimeout = Duration.ofSeconds(60);
private Duration socketTimeout = Duration.ofSeconds(60);
+ private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
private boolean disableCertCheck = false;
private int maxRetries = 3;
+ private Duration clientCloseTimeout = Duration.ofSeconds(30);
// AssumeRole configuration
private String assumeRoleArn;
@@ -236,6 +264,16 @@ class S3ClientProvider implements AutoCloseableAsync {
return this;
}
+ public Builder connectionMaxIdleTime(Duration connectionMaxIdleTime) {
+ this.connectionMaxIdleTime = connectionMaxIdleTime;
+ return this;
+ }
+
+ public Builder clientCloseTimeout(Duration clientCloseTimeout) {
+ this.clientCloseTimeout = clientCloseTimeout;
+ return this;
+ }
+
public Builder disableCertCheck(boolean disableCertCheck) {
this.disableCertCheck = disableCertCheck;
return this;
@@ -327,7 +365,7 @@ class S3ClientProvider implements AutoCloseableAsync {
.connectionTimeout(connectionTimeout)
.socketTimeout(socketTimeout)
.tcpKeepAlive(true)
- .connectionMaxIdleTime(Duration.ofSeconds(60));
+ .connectionMaxIdleTime(connectionMaxIdleTime);
S3ClientBuilder clientBuilder =
S3Client.builder()
@@ -358,7 +396,15 @@ class S3ClientProvider implements AutoCloseableAsync {
.build();
return new S3ClientProvider(
- s3Client, transferManager, encryptionConfig,
credentialsProvider, stsClient);
+ s3Client,
+ transferManager,
+ encryptionConfig,
+ credentialsProvider,
+ stsClient,
+ clientCloseTimeout,
+ connectionTimeout,
+ socketTimeout,
+ connectionMaxIdleTime);
}
private AwsCredentialsProvider buildBaseCredentialsProvider() {
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index ad0c0b7d0ef..31e6814ab1b 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FileSystem;
import org.junit.jupiter.api.Test;
import java.net.URI;
+import java.time.Duration;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -435,4 +436,78 @@ class NativeS3FileSystemFactoryTest {
assertThat(fs).isNotNull();
assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
}
+
+ @Test
+ void testCustomTimeoutConfiguration() throws Exception {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT,
Duration.ofSeconds(30));
+ config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT,
Duration.ofSeconds(45));
+ config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME,
Duration.ofMinutes(2));
+ config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT,
Duration.ofSeconds(90));
+ config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT,
Duration.ofSeconds(15));
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ FileSystem fs = factory.create(fsUri);
+
+ assertThat(fs).isNotNull();
+ assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+ NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+
assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90));
+
+ S3ClientProvider clientProvider = nativeFs.getClientProvider();
+
assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+
assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45));
+
assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2));
+
assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15));
+ }
+
+ @Test
+ void testTimeoutConfigurationWithStringDuration() throws Exception {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.connection.timeout", "30 s");
+ config.setString("s3.socket.timeout", "2 min");
+ config.setString("s3.close.timeout", "1 min");
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ FileSystem fs = factory.create(fsUri);
+
+ assertThat(fs).isNotNull();
+ NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+
assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1));
+
+ S3ClientProvider clientProvider = nativeFs.getClientProvider();
+
assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+
assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2));
+ }
+
+ @Test
+ void testInvalidTimeoutConfigurationThrowsException() {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "test-access-key");
+ config.setString("s3.secret-key", "test-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.connection.timeout", "not-a-duration");
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ factory.configure(config);
+
+ URI fsUri = URI.create("s3://test-bucket/");
+ assertThatThrownBy(() -> factory.create(fsUri))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
}