This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 427a2c93fb7 [FLINK-39114][filesystem] Support configurable timeout in native-s3-fs
427a2c93fb7 is described below

commit 427a2c93fb785cca7f540d48421551d49292b671
Author: Samrat002 <[email protected]>
AuthorDate: Wed Apr 8 18:28:31 2026 +0530

    [FLINK-39114][filesystem] Support configurable timeout in native-s3-fs
---
 .../flink/fs/s3native/NativeS3FileSystem.java      | 28 +++++---
 .../fs/s3native/NativeS3FileSystemFactory.java     | 45 ++++++++++++-
 .../apache/flink/fs/s3native/S3ClientProvider.java | 68 ++++++++++++++++----
 .../fs/s3native/NativeS3FileSystemFactoryTest.java | 75 ++++++++++++++++++++++
 4 files changed, 196 insertions(+), 20 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 81e5075ba44..40e88d8ae5a 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.fs.s3native;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.EntropyInjectingFileSystem;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -51,6 +52,7 @@ import javax.annotation.Nullable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -73,7 +75,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *       (the underlying clients have not been torn down yet) or fails with an {@link IOException}
  *       from the AWS SDK, which callers are already expected to handle.
  *   <li>The {@link #closeAsync()} teardown sequence (bulkCopyHelper, transferManager, asyncClient,
- *       syncClient) provides a natural grace period bounded by {@link #CLOSE_TIMEOUT_SECONDS}.
+ *       syncClient) provides a natural grace period bounded by the configurable close timeout.
  * </ul>
  *
  * <p>A thread-pool-routed approach would provide stricter guarantees but introduces latency
@@ -95,9 +97,6 @@ class NativeS3FileSystem extends FileSystem
 
     private static final Logger LOG = LoggerFactory.getLogger(NativeS3FileSystem.class);
 
-    /** Timeout in seconds for closing the filesystem. */
-    private static final long CLOSE_TIMEOUT_SECONDS = 60;
-
     private final S3ClientProvider clientProvider;
     private final URI uri;
     private final String bucketName;
@@ -113,6 +112,7 @@ class NativeS3FileSystem extends FileSystem
     @Nullable private final NativeS3BulkCopyHelper bulkCopyHelper;
     private final boolean useAsyncOperations;
     private final int readBufferSize;
+    private final Duration fsCloseTimeout;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     public NativeS3FileSystem(
@@ -125,7 +125,8 @@ class NativeS3FileSystem extends FileSystem
             int maxConcurrentUploadsPerStream,
             @Nullable NativeS3BulkCopyHelper bulkCopyHelper,
             boolean useAsyncOperations,
-            int readBufferSize) {
+            int readBufferSize,
+            Duration fsCloseTimeout) {
         this.clientProvider = clientProvider;
         this.uri = uri;
         this.bucketName = uri.getHost();
@@ -136,6 +137,7 @@ class NativeS3FileSystem extends FileSystem
         this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream;
         this.useAsyncOperations = useAsyncOperations;
         this.readBufferSize = readBufferSize;
+        this.fsCloseTimeout = fsCloseTimeout;
         this.s3AccessHelper =
                 new NativeS3AccessHelper(
                         clientProvider.getS3Client(),
@@ -158,6 +160,16 @@ class NativeS3FileSystem extends FileSystem
                 readBufferSize / 1024);
     }
 
+    @VisibleForTesting
+    Duration getFsCloseTimeout() {
+        return fsCloseTimeout;
+    }
+
+    @VisibleForTesting
+    S3ClientProvider getClientProvider() {
+        return clientProvider;
+    }
+
     @Override
     public URI getUri() {
         return uri;
@@ -550,13 +562,13 @@ class NativeS3FileSystem extends FileSystem
                                     }
                                     return CompletableFuture.completedFuture(null);
                                 })
-                        .orTimeout(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                        .orTimeout(fsCloseTimeout.toSeconds(), TimeUnit.SECONDS)
                         .whenComplete(
                                 (result, error) -> {
                                     if (error != null) {
                                         LOG.error(
-                                                "FileSystem close timed out after {} seconds for bucket: {}",
-                                                CLOSE_TIMEOUT_SECONDS,
+                                                "FileSystem close timed out after {} for bucket: {}",
+                                                fsCloseTimeout,
                                                 bucketName,
                                                 error);
                                     }
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 756d742a405..60b151035cb 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.time.Duration;
 
 /**
  * Factory for creating Native S3 FileSystem instances.
@@ -203,6 +204,43 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                                     + "Uses the AWS SDK's default retry strategy (exponential backoff with jitter). "
                                     + "Set to 0 to disable retries.");
 
+    public static final ConfigOption<Duration> CONNECTION_TIMEOUT =
+            ConfigOptions.key("s3.connection.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription(
+                            "HTTP connection timeout for the S3 client. "
+                                    + "Controls how long the client waits to establish a connection.");
+
+    public static final ConfigOption<Duration> SOCKET_TIMEOUT =
+            ConfigOptions.key("s3.socket.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription(
+                            "HTTP socket timeout for the S3 client. "
+                                    + "Controls how long the client waits for data after connection is established.");
+
+    public static final ConfigOption<Duration> CONNECTION_MAX_IDLE_TIME =
+            ConfigOptions.key("s3.connection.max-idle-time")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription(
+                            "Maximum idle time for HTTP connections in the connection pool.");
+
+    public static final ConfigOption<Duration> FS_CLOSE_TIMEOUT =
+            ConfigOptions.key("s3.close.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(60))
+                    .withDescription(
+                            "Timeout for closing the S3 filesystem. "
+                                    + "Controls how long the filesystem waits for pending operations to complete during shutdown.");
+
+    public static final ConfigOption<Duration> CLIENT_CLOSE_TIMEOUT =
+            ConfigOptions.key("s3.client.close.timeout")
+                    .durationType()
+                    .defaultValue(Duration.ofSeconds(30))
+                    .withDescription("Timeout for closing the S3 client and releasing resources.");
+
     public static final ConfigOption<String> AWS_CREDENTIALS_PROVIDER =
             ConfigOptions.key("fs.s3.aws.credentials.provider")
                     .stringType()
@@ -317,6 +355,10 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                         .region(region)
                         .endpoint(endpoint)
                         .pathStyleAccess(pathStyleAccess)
+                        .connectionTimeout(config.get(CONNECTION_TIMEOUT))
+                        .socketTimeout(config.get(SOCKET_TIMEOUT))
+                        .connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
+                        .clientCloseTimeout(config.get(CLIENT_CLOSE_TIMEOUT))
                         .assumeRoleArn(config.get(ASSUME_ROLE_ARN))
                         .assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID))
                         .assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME))
@@ -345,6 +387,7 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
                 maxConcurrentUploads,
                 bulkCopyHelper,
                 useAsyncOperations,
-                readBufferSize);
+                readBufferSize,
+                config.get(FS_CLOSE_TIMEOUT));
     }
 }
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cad32d75b12..cee92d1ce5d 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -71,14 +71,15 @@ class S3ClientProvider implements AutoCloseableAsync {
 
     private static final Logger LOG = LoggerFactory.getLogger(S3ClientProvider.class);
 
-    /** Timeout in seconds for closing S3 clients. */
-    private static final long CLIENT_CLOSE_TIMEOUT_SECONDS = 30;
-
     private final S3Client s3Client;
     private final S3TransferManager transferManager;
     private final S3EncryptionConfig encryptionConfig;
     @Nullable private final AwsCredentialsProvider credentialsProvider;
     @Nullable private final StsClient stsClient;
+    private final Duration clientCloseTimeout;
+    private final Duration connectionTimeout;
+    private final Duration socketTimeout;
+    private final Duration connectionMaxIdleTime;
     private final AtomicBoolean closed = new AtomicBoolean(false);
 
     private S3ClientProvider(
@@ -86,13 +87,21 @@ class S3ClientProvider implements AutoCloseableAsync {
             S3TransferManager transferManager,
             S3EncryptionConfig encryptionConfig,
             @Nullable AwsCredentialsProvider credentialsProvider,
-            @Nullable StsClient stsClient) {
+            @Nullable StsClient stsClient,
+            Duration clientCloseTimeout,
+            Duration connectionTimeout,
+            Duration socketTimeout,
+            Duration connectionMaxIdleTime) {
         this.s3Client = s3Client;
         this.transferManager = transferManager;
         this.encryptionConfig =
                 encryptionConfig != null ? encryptionConfig : S3EncryptionConfig.none();
         this.credentialsProvider = credentialsProvider;
         this.stsClient = stsClient;
+        this.clientCloseTimeout = clientCloseTimeout;
+        this.connectionTimeout = connectionTimeout;
+        this.socketTimeout = socketTimeout;
+        this.connectionMaxIdleTime = connectionMaxIdleTime;
     }
 
     public S3Client getS3Client() {
@@ -115,6 +124,26 @@ class S3ClientProvider implements AutoCloseableAsync {
         return credentialsProvider;
     }
 
+    @VisibleForTesting
+    Duration getClientCloseTimeout() {
+        return clientCloseTimeout;
+    }
+
+    @VisibleForTesting
+    Duration getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    @VisibleForTesting
+    Duration getSocketTimeout() {
+        return socketTimeout;
+    }
+
+    @VisibleForTesting
+    Duration getConnectionMaxIdleTime() {
+        return connectionMaxIdleTime;
+    }
+
     @Override
     public CompletableFuture<Void> closeAsync() {
         if (!closed.compareAndSet(false, true)) {
@@ -151,13 +180,10 @@ class S3ClientProvider implements AutoCloseableAsync {
                                 }
                             }
                         })
-                .orTimeout(CLIENT_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
+                .orTimeout(clientCloseTimeout.toSeconds(), TimeUnit.SECONDS)
                 .exceptionally(
                         ex -> {
-                            LOG.error(
-                                    "S3 client close timed out after {} seconds",
-                                    CLIENT_CLOSE_TIMEOUT_SECONDS,
-                                    ex);
+                            LOG.error("S3 client close timed out after {}", clientCloseTimeout, ex);
                             return null;
                         });
     }
@@ -181,8 +207,10 @@ class S3ClientProvider implements AutoCloseableAsync {
         private int maxConnections = 50;
         private Duration connectionTimeout = Duration.ofSeconds(60);
         private Duration socketTimeout = Duration.ofSeconds(60);
+        private Duration connectionMaxIdleTime = Duration.ofSeconds(60);
         private boolean disableCertCheck = false;
         private int maxRetries = 3;
+        private Duration clientCloseTimeout = Duration.ofSeconds(30);
 
         // AssumeRole configuration
         private String assumeRoleArn;
@@ -236,6 +264,16 @@ class S3ClientProvider implements AutoCloseableAsync {
             return this;
         }
 
+        public Builder connectionMaxIdleTime(Duration connectionMaxIdleTime) {
+            this.connectionMaxIdleTime = connectionMaxIdleTime;
+            return this;
+        }
+
+        public Builder clientCloseTimeout(Duration clientCloseTimeout) {
+            this.clientCloseTimeout = clientCloseTimeout;
+            return this;
+        }
+
         public Builder disableCertCheck(boolean disableCertCheck) {
             this.disableCertCheck = disableCertCheck;
             return this;
@@ -327,7 +365,7 @@ class S3ClientProvider implements AutoCloseableAsync {
                             .connectionTimeout(connectionTimeout)
                             .socketTimeout(socketTimeout)
                             .tcpKeepAlive(true)
-                            .connectionMaxIdleTime(Duration.ofSeconds(60));
+                            .connectionMaxIdleTime(connectionMaxIdleTime);
 
             S3ClientBuilder clientBuilder =
                     S3Client.builder()
@@ -358,7 +396,15 @@ class S3ClientProvider implements AutoCloseableAsync {
                             .build();
 
             return new S3ClientProvider(
-                    s3Client, transferManager, encryptionConfig, credentialsProvider, stsClient);
+                    s3Client,
+                    transferManager,
+                    encryptionConfig,
+                    credentialsProvider,
+                    stsClient,
+                    clientCloseTimeout,
+                    connectionTimeout,
+                    socketTimeout,
+                    connectionMaxIdleTime);
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index ad0c0b7d0ef..31e6814ab1b 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.junit.jupiter.api.Test;
 
 import java.net.URI;
+import java.time.Duration;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -435,4 +436,78 @@ class NativeS3FileSystemFactoryTest {
         assertThat(fs).isNotNull();
         assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
     }
+
+    @Test
+    void testCustomTimeoutConfiguration() throws Exception {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.set(NativeS3FileSystemFactory.CONNECTION_TIMEOUT, Duration.ofSeconds(30));
+        config.set(NativeS3FileSystemFactory.SOCKET_TIMEOUT, Duration.ofSeconds(45));
+        config.set(NativeS3FileSystemFactory.CONNECTION_MAX_IDLE_TIME, Duration.ofMinutes(2));
+        config.set(NativeS3FileSystemFactory.FS_CLOSE_TIMEOUT, Duration.ofSeconds(90));
+        config.set(NativeS3FileSystemFactory.CLIENT_CLOSE_TIMEOUT, Duration.ofSeconds(15));
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        FileSystem fs = factory.create(fsUri);
+
+        assertThat(fs).isNotNull();
+        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+        assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofSeconds(90));
+
+        S3ClientProvider clientProvider = nativeFs.getClientProvider();
+        assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+        assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofSeconds(45));
+        assertThat(clientProvider.getConnectionMaxIdleTime()).isEqualTo(Duration.ofMinutes(2));
+        assertThat(clientProvider.getClientCloseTimeout()).isEqualTo(Duration.ofSeconds(15));
+    }
+
+    @Test
+    void testTimeoutConfigurationWithStringDuration() throws Exception {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.connection.timeout", "30 s");
+        config.setString("s3.socket.timeout", "2 min");
+        config.setString("s3.close.timeout", "1 min");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        FileSystem fs = factory.create(fsUri);
+
+        assertThat(fs).isNotNull();
+        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+        assertThat(nativeFs.getFsCloseTimeout()).isEqualTo(Duration.ofMinutes(1));
+
+        S3ClientProvider clientProvider = nativeFs.getClientProvider();
+        assertThat(clientProvider.getConnectionTimeout()).isEqualTo(Duration.ofSeconds(30));
+        assertThat(clientProvider.getSocketTimeout()).isEqualTo(Duration.ofMinutes(2));
+    }
+
+    @Test
+    void testInvalidTimeoutConfigurationThrowsException() {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.setString("s3.connection.timeout", "not-a-duration");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        assertThatThrownBy(() -> factory.create(fsUri))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
 }

Reply via email to