This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new cbebd3fbea7 [FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider
cbebd3fbea7 is described below

commit cbebd3fbea71908fa6601b4913b366dad950fd37
Author: Samrat <[email protected]>
AuthorDate: Wed Apr 22 18:06:21 2026 +0530

    [FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider
---
 .../flink/fs/s3native/NativeS3BulkCopyHelper.java  | 101 +++++++++++++++++++--
 .../flink/fs/s3native/NativeS3FileSystem.java      |   6 ++
 .../fs/s3native/NativeS3FileSystemFactory.java     |  26 +++++-
 .../apache/flink/fs/s3native/S3ClientProvider.java |   4 +-
 .../fs/s3native/NativeS3BulkCopyHelperTest.java    |  53 ++++++++++-
 .../fs/s3native/NativeS3FileSystemFactoryTest.java |  84 +++++++++++++++++
 6 files changed, 261 insertions(+), 13 deletions(-)

diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
index 2dc8fb10d19..5fde18815d3 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
@@ -19,6 +19,7 @@
 package org.apache.flink.fs.s3native;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.ICloseableRegistry;
 import org.apache.flink.core.fs.PathsCopyingFileSystem;
 
@@ -37,15 +38,18 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Helper class for performing bulk S3 to local file system copies using 
S3TransferManager.
  *
  * <p><b>Concurrency Model:</b> Uses batch-based concurrency control with 
{@code
- * maxConcurrentCopies} to limit parallel downloads. The current 
implementation waits for each batch
- * to complete before starting the next batch. A future enhancement could use 
a bounded thread pool
- * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow 
continuous submission
- * of new downloads as slots become available, which would provide better 
throughput by avoiding the
- * "slowest task in batch" bottleneck.
+ * maxConcurrentCopies} to limit parallel downloads. The effective concurrency 
is clamped to the
+ * HTTP connection pool size ({@code maxConnections}) to prevent connection 
pool exhaustion. The
+ * current implementation waits for each batch to complete before starting the 
next batch. A future
+ * enhancement could use a bounded thread pool (e.g., {@link 
java.util.concurrent.Semaphore} or
+ * bounded executor) to allow continuous submission of new downloads as slots 
become available,
+ * which would provide better throughput by avoiding the "slowest task in 
batch" bottleneck.
  *
  * <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry 
mechanism for
  * transient failures. If a download fails after retries:
@@ -70,10 +74,39 @@ class NativeS3BulkCopyHelper {
 
     private final S3TransferManager transferManager;
     private final int maxConcurrentCopies;
+    private final int maxConnections;
 
-    public NativeS3BulkCopyHelper(S3TransferManager transferManager, int 
maxConcurrentCopies) {
+    /**
+     * Creates a new bulk copy helper.
+     *
+     * @param transferManager the S3 transfer manager for async downloads
+     * @param maxConcurrentCopies the requested maximum number of concurrent 
copy operations
+     * @param maxConnections the HTTP connection pool size; if {@code 
maxConcurrentCopies} exceeds
+     *     this value, it is clamped down to prevent connection pool exhaustion
+     */
+    NativeS3BulkCopyHelper(
+            S3TransferManager transferManager, int maxConcurrentCopies, int 
maxConnections) {
+        checkArgument(maxConcurrentCopies > 0, "maxConcurrentCopies must be 
positive");
+        checkArgument(maxConnections > 0, "maxConnections must be positive");
         this.transferManager = transferManager;
-        this.maxConcurrentCopies = maxConcurrentCopies;
+        this.maxConnections = maxConnections;
+        if (maxConcurrentCopies > maxConnections) {
+            LOG.warn(
+                    "{} ({}) exceeds {} ({}). "
+                            + "Clamping concurrent copies to {} to prevent 
connection pool exhaustion.",
+                    NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
+                    maxConcurrentCopies,
+                    NativeS3FileSystemFactory.MAX_CONNECTIONS.key(),
+                    maxConnections,
+                    maxConnections);
+            this.maxConcurrentCopies = maxConnections;
+        } else {
+            this.maxConcurrentCopies = maxConcurrentCopies;
+        }
+    }
+
+    int getMaxConcurrentCopies() {
+        return maxConcurrentCopies;
     }
 
     /**
@@ -97,9 +130,17 @@ class NativeS3BulkCopyHelper {
             return;
         }
 
-        LOG.info("Starting bulk copy of {} files using S3TransferManager", 
requests.size());
+        int totalFiles = requests.size();
+        int totalBatches = (totalFiles + maxConcurrentCopies - 1) / 
maxConcurrentCopies;
+        LOG.info(
+                "Starting bulk copy of {} files using S3TransferManager "
+                        + "(batch size: {}, total batches: {})",
+                totalFiles,
+                maxConcurrentCopies,
+                totalBatches);
 
         List<CompletableFuture<CompletedCopy>> copyFutures = new ArrayList<>();
+        int batchNumber = 0;
 
         try {
             for (int i = 0; i < requests.size(); i++) {
@@ -113,12 +154,18 @@ class NativeS3BulkCopyHelper {
                 }
 
                 if (copyFutures.size() >= maxConcurrentCopies || i == 
requests.size() - 1) {
+                    batchNumber++;
+                    LOG.debug(
+                            "Waiting for batch {}/{} ({} files)",
+                            batchNumber,
+                            totalBatches,
+                            copyFutures.size());
                     waitForCopies(copyFutures);
                     copyFutures.clear();
                 }
             }
 
-            LOG.info("Completed bulk copy of {} files", requests.size());
+            LOG.info("Completed bulk copy of {} files", totalFiles);
         } catch (Exception e) {
             if (!copyFutures.isEmpty()) {
                 LOG.warn(
@@ -181,8 +228,42 @@ class NativeS3BulkCopyHelper {
             Thread.currentThread().interrupt();
             throw new IOException("Bulk copy interrupted", e);
         } catch (ExecutionException e) {
-            throw new IOException("Bulk copy failed", e.getCause());
+            Throwable cause = e.getCause();
+            if (isConnectionPoolExhausted(cause)) {
+                throw new IOException(
+                        String.format(
+                                "S3 connection pool exhausted during bulk 
copy. "
+                                        + "The configured connection pool size 
(%d) could not serve "
+                                        + "the concurrent download requests 
(%d). "
+                                        + "Consider reducing '%s' or 
increasing '%s'.",
+                                maxConnections,
+                                maxConcurrentCopies,
+                                
NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
+                                
NativeS3FileSystemFactory.MAX_CONNECTIONS.key()),
+                        cause);
+            }
+            throw new IOException("Bulk copy failed", cause);
+        }
+    }
+
+    /**
+     * Checks whether a failure was caused by HTTP connection pool exhaustion.
+     *
+     * <p>Walks the causal chain looking for the SDK's characteristic message 
about connection
+     * acquire timeouts. This detection is deliberately broad (substring match 
on the message) to
+     * remain resilient to minor SDK wording changes across versions.
+     */
+    @VisibleForTesting
+    static boolean isConnectionPoolExhausted(Throwable throwable) {
+        Throwable current = throwable;
+        while (current != null) {
+            String message = current.getMessage();
+            if (message != null && message.contains("Acquire operation took 
longer than")) {
+                return true;
+            }
+            current = current.getCause();
         }
+        return false;
     }
 
     /**
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 40e88d8ae5a..64795cc8c76 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -170,6 +170,12 @@ class NativeS3FileSystem extends FileSystem
         return clientProvider;
     }
 
+    @VisibleForTesting
+    @Nullable
+    NativeS3BulkCopyHelper getBulkCopyHelper() {
+        return bulkCopyHelper;
+    }
+
     @Override
     public URI getUri() {
         return uri;
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 60b151035cb..81d92dc762b 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -122,6 +122,15 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                     .defaultValue(true)
                     .withDescription("Enable bulk copy operations using 
S3TransferManager");
 
+    public static final ConfigOption<Integer> MAX_CONNECTIONS =
+            ConfigOptions.key("s3.connection.max")
+                    .intType()
+                    .defaultValue(50)
+                    .withDescription(
+                            "Maximum number of HTTP connections in the S3 
client connection pool. "
+                                    + "Applies to both the sync client (Apache 
HTTP) and the async client (Netty). "
+                                    + "Must be at least as large as 
's3.bulk-copy.max-concurrent'.");
+
     public static final ConfigOption<Integer> BULK_COPY_MAX_CONCURRENT =
             ConfigOptions.key("s3.bulk-copy.max-concurrent")
                     .intType()
@@ -348,6 +357,13 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                     readBufferSize);
         }
 
+        final int maxConnections = config.get(MAX_CONNECTIONS);
+        Preconditions.checkArgument(
+                maxConnections > 0,
+                "'%s' must be a positive integer, but was %s",
+                MAX_CONNECTIONS.key(),
+                maxConnections);
+
         S3ClientProvider clientProvider =
                 S3ClientProvider.builder()
                         .accessKey(accessKey)
@@ -355,6 +371,7 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                         .region(region)
                         .endpoint(endpoint)
                         .pathStyleAccess(pathStyleAccess)
+                        .maxConnections(maxConnections)
                         .connectionTimeout(config.get(CONNECTION_TIMEOUT))
                         .socketTimeout(config.get(SOCKET_TIMEOUT))
                         
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
@@ -371,10 +388,17 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
 
         NativeS3BulkCopyHelper bulkCopyHelper = null;
         if (config.get(BULK_COPY_ENABLED)) {
+            final int bulkCopyMaxConcurrent = 
config.get(BULK_COPY_MAX_CONCURRENT);
+            Preconditions.checkArgument(
+                    bulkCopyMaxConcurrent > 0,
+                    "'%s' must be a positive integer, but was %s",
+                    BULK_COPY_MAX_CONCURRENT.key(),
+                    bulkCopyMaxConcurrent);
             bulkCopyHelper =
                     new NativeS3BulkCopyHelper(
                             clientProvider.getTransferManager(),
-                            config.get(BULK_COPY_MAX_CONCURRENT));
+                            bulkCopyMaxConcurrent,
+                            maxConnections);
         }
 
         return new NativeS3FileSystem(
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cee92d1ce5d..daccd6247cd 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -389,7 +389,9 @@ class S3ClientProvider implements AutoCloseableAsync {
                                                     
NettyNioAsyncHttpClient.builder()
                                                             
.maxConcurrency(maxConnections)
                                                             
.connectionTimeout(connectionTimeout)
-                                                            
.readTimeout(socketTimeout))
+                                                            
.readTimeout(socketTimeout)
+                                                            
.connectionAcquisitionTimeout(
+                                                                    
connectionTimeout))
                                             
.overrideConfiguration(overrideConfig)
                                             .endpointOverride(endpointUri)
                                             .build())
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
index 37b35f88aca..5b244850dca 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
@@ -20,7 +20,14 @@ package org.apache.flink.fs.s3native;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.core.exception.SdkClientException;
+
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNullPointerException;
@@ -28,7 +35,9 @@ import static 
org.assertj.core.api.Assertions.assertThatNullPointerException;
 /** Tests for {@link NativeS3BulkCopyHelper}. */
 class NativeS3BulkCopyHelperTest {
 
-    private static final NativeS3BulkCopyHelper helper = new 
NativeS3BulkCopyHelper(null, 1);
+    private static final NativeS3BulkCopyHelper helper = new 
NativeS3BulkCopyHelper(null, 1, 1);
+
+    // --- URI parsing tests ---
 
     @ParameterizedTest
     @CsvSource({
@@ -105,4 +114,46 @@ class NativeS3BulkCopyHelperTest {
         path.append("file.txt");
         assertThat(helper.extractKey("s3://bucket/" + 
path)).isEqualTo(path.toString());
     }
+
+    private static Stream<Arguments> connectionPoolExhaustedCases() {
+        return Stream.of(
+                Arguments.of(
+                        "direct message match",
+                        SdkClientException.builder()
+                                .message(
+                                        "Unable to execute HTTP request: "
+                                                + "Acquire operation took 
longer than the configured maximum time.")
+                                .build(),
+                        true),
+                Arguments.of(
+                        "nested causal chain",
+                        SdkClientException.builder()
+                                .message("Unable to execute HTTP request")
+                                .cause(
+                                        new RuntimeException(
+                                                "channel acquisition failed",
+                                                new TimeoutException(
+                                                        "Acquire operation 
took longer than 10000 milliseconds.")))
+                                .build(),
+                        true),
+                Arguments.of(
+                        "unrelated error",
+                        SdkClientException.builder().message("Access 
Denied").build(),
+                        false),
+                Arguments.of("null message", new RuntimeException((String) 
null), false),
+                Arguments.of("null throwable", null, false));
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("connectionPoolExhaustedCases")
+    void testConnectionPoolExhaustedDetection(
+            String description, Throwable throwable, boolean expected) {
+        
assertThat(NativeS3BulkCopyHelper.isConnectionPoolExhausted(throwable)).isEqualTo(expected);
+    }
+
+    @Test
+    void testEmptyRequestListIsNoOp() throws Exception {
+        NativeS3BulkCopyHelper noOpHelper = new NativeS3BulkCopyHelper(null, 
16, 50);
+        noOpHelper.copyFiles(Collections.emptyList(), null);
+    }
 }
diff --git 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index 31e6814ab1b..f9e57b55858 100644
--- 
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ 
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -353,6 +353,90 @@ class NativeS3FileSystemFactoryTest {
         }
     }
 
+    @Test
+    void testInvalidMaxConnectionsThrowsException() {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        assertThatThrownBy(() -> factory.create(fsUri))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("s3.connection.max")
+                .hasMessageContaining("must be a positive integer");
+    }
+
+    @Test
+    void testInvalidBulkCopyMaxConcurrentThrowsException() {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        assertThatThrownBy(() -> factory.create(fsUri))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("s3.bulk-copy.max-concurrent")
+                .hasMessageContaining("must be a positive integer");
+    }
+
+    @Test
+    void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10);
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        FileSystem fs = factory.create(fsUri);
+
+        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+        assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
+        
assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+    }
+
+    @Test
+    void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws 
Exception {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10);
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50);
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        FileSystem fs = factory.create(fsUri);
+
+        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        NativeS3FileSystem nativeFs = (NativeS3FileSystem) fs;
+        assertThat(nativeFs.getBulkCopyHelper()).isNotNull();
+        
assertThat(nativeFs.getBulkCopyHelper().getMaxConcurrentCopies()).isEqualTo(10);
+    }
+
     @Test
     void testS3ASchemeReturnsS3A() {
         NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();

Reply via email to