This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0e899fdb352 [FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider
0e899fdb352 is described below
commit 0e899fdb352c23afa017e89e07ef574e8e69f801
Author: Samrat <[email protected]>
AuthorDate: Tue Apr 21 15:01:23 2026 +0530
[FLINK-39482][filesystem] Support configurable maxConnections in S3ClientProvider
---
.../flink/fs/s3native/NativeS3BulkCopyHelper.java | 101 +++++++++++++++++++--
.../flink/fs/s3native/NativeS3FileSystem.java | 6 ++
.../fs/s3native/NativeS3FileSystemFactory.java | 26 +++++-
.../apache/flink/fs/s3native/S3ClientProvider.java | 4 +-
.../fs/s3native/NativeS3BulkCopyHelperTest.java | 53 ++++++++++-
.../fs/s3native/NativeS3FileSystemFactoryTest.java | 84 +++++++++++++++++
6 files changed, 261 insertions(+), 13 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
index 2dc8fb10d19..5fde18815d3 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelper.java
@@ -19,6 +19,7 @@
package org.apache.flink.fs.s3native;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.PathsCopyingFileSystem;
@@ -37,15 +38,18 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
* Helper class for performing bulk S3 to local file system copies using
S3TransferManager.
*
* <p><b>Concurrency Model:</b> Uses batch-based concurrency control with
{@code
- * maxConcurrentCopies} to limit parallel downloads. The current
implementation waits for each batch
- * to complete before starting the next batch. A future enhancement could use
a bounded thread pool
- * (e.g., {@link java.util.concurrent.Semaphore} or bounded executor) to allow
continuous submission
- * of new downloads as slots become available, which would provide better
throughput by avoiding the
- * "slowest task in batch" bottleneck.
+ * maxConcurrentCopies} to limit parallel downloads. The effective concurrency
is clamped to the
+ * HTTP connection pool size ({@code maxConnections}) to prevent connection
pool exhaustion. The
+ * current implementation waits for each batch to complete before starting the
next batch. A future
+ * enhancement could use a bounded thread pool (e.g., {@link
java.util.concurrent.Semaphore} or
+ * bounded executor) to allow continuous submission of new downloads as slots
become available,
+ * which would provide better throughput by avoiding the "slowest task in
batch" bottleneck.
*
* <p><b>Retry Handling:</b> Relies on the S3TransferManager's built-in retry
mechanism for
* transient failures. If a download fails after retries:
@@ -70,10 +74,39 @@ class NativeS3BulkCopyHelper {
private final S3TransferManager transferManager;
private final int maxConcurrentCopies;
+ private final int maxConnections;
-    public NativeS3BulkCopyHelper(S3TransferManager transferManager, int maxConcurrentCopies) {
+    /**
+     * Creates a helper that performs bulk downloads through the given transfer manager.
+     *
+     * <p>The effective concurrency is {@code min(maxConcurrentCopies, maxConnections)}. Requesting
+     * more parallel copies than the HTTP connection pool holds risks connection-acquire timeouts
+     * (pool exhaustion), so such a request is reduced to the pool size and a warning is logged.
+     *
+     * @param transferManager the S3 transfer manager for async downloads
+     * @param maxConcurrentCopies the requested maximum number of concurrent copy operations; must
+     *     be positive
+     * @param maxConnections the HTTP connection pool size; must be positive
+     * @throws IllegalArgumentException if either limit is not positive
+     */
+    NativeS3BulkCopyHelper(
+            S3TransferManager transferManager, int maxConcurrentCopies, int maxConnections) {
+        checkArgument(maxConcurrentCopies > 0, "maxConcurrentCopies must be positive");
+        checkArgument(maxConnections > 0, "maxConnections must be positive");
         this.transferManager = transferManager;
-        this.maxConcurrentCopies = maxConcurrentCopies;
+        this.maxConnections = maxConnections;
+
+        final boolean exceedsConnectionPool = maxConcurrentCopies > maxConnections;
+        if (exceedsConnectionPool) {
+            LOG.warn(
+                    "{} ({}) exceeds {} ({}). "
+                            + "Clamping concurrent copies to {} to prevent connection pool exhaustion.",
+                    NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
+                    maxConcurrentCopies,
+                    NativeS3FileSystemFactory.MAX_CONNECTIONS.key(),
+                    maxConnections,
+                    maxConnections);
+        }
+        // Assign the final field exactly once; it never exceeds the connection pool size.
+        this.maxConcurrentCopies =
+                exceedsConnectionPool ? maxConnections : maxConcurrentCopies;
+    }
+
+    /**
+     * Returns the concurrency limit actually used for batching: the requested {@code
+     * maxConcurrentCopies}, clamped to the connection pool size.
+     */
+    int getMaxConcurrentCopies() {
+        return this.maxConcurrentCopies;
     }
/**
@@ -97,9 +130,17 @@ class NativeS3BulkCopyHelper {
return;
}
- LOG.info("Starting bulk copy of {} files using S3TransferManager",
requests.size());
+ int totalFiles = requests.size();
+ int totalBatches = (totalFiles + maxConcurrentCopies - 1) /
maxConcurrentCopies;
+ LOG.info(
+ "Starting bulk copy of {} files using S3TransferManager "
+ + "(batch size: {}, total batches: {})",
+ totalFiles,
+ maxConcurrentCopies,
+ totalBatches);
List<CompletableFuture<CompletedCopy>> copyFutures = new ArrayList<>();
+ int batchNumber = 0;
try {
for (int i = 0; i < requests.size(); i++) {
@@ -113,12 +154,18 @@ class NativeS3BulkCopyHelper {
}
if (copyFutures.size() >= maxConcurrentCopies || i ==
requests.size() - 1) {
+ batchNumber++;
+ LOG.debug(
+ "Waiting for batch {}/{} ({} files)",
+ batchNumber,
+ totalBatches,
+ copyFutures.size());
waitForCopies(copyFutures);
copyFutures.clear();
}
}
- LOG.info("Completed bulk copy of {} files", requests.size());
+ LOG.info("Completed bulk copy of {} files", totalFiles);
} catch (Exception e) {
if (!copyFutures.isEmpty()) {
LOG.warn(
@@ -181,8 +228,42 @@ class NativeS3BulkCopyHelper {
Thread.currentThread().interrupt();
throw new IOException("Bulk copy interrupted", e);
} catch (ExecutionException e) {
- throw new IOException("Bulk copy failed", e.getCause());
+ Throwable cause = e.getCause();
+ if (isConnectionPoolExhausted(cause)) {
+ throw new IOException(
+ String.format(
+ "S3 connection pool exhausted during bulk
copy. "
+ + "The configured connection pool size
(%d) could not serve "
+ + "the concurrent download requests
(%d). "
+ + "Consider reducing '%s' or
increasing '%s'.",
+ maxConnections,
+ maxConcurrentCopies,
+
NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT.key(),
+
NativeS3FileSystemFactory.MAX_CONNECTIONS.key()),
+ cause);
+ }
+ throw new IOException("Bulk copy failed", cause);
+ }
+ }
+
+    /**
+     * Checks whether a failure was caused by HTTP connection pool exhaustion.
+     *
+     * <p>Walks the causal chain looking for the SDK's characteristic message about connection
+     * acquire timeouts. This detection is deliberately broad (substring match on the message) to
+     * remain resilient to minor SDK wording changes across versions.
+     *
+     * <p>Each throwable is inspected at most once. {@link Throwable#initCause} only rejects direct
+     * self-causation, so a cyclic chain such as A -> B -> A is possible; it must terminate here
+     * rather than loop forever.
+     *
+     * @param throwable the failure to inspect; may be {@code null}
+     * @return {@code true} if any throwable in the causal chain carries the acquire-timeout message
+     */
+    @VisibleForTesting
+    static boolean isConnectionPoolExhausted(Throwable throwable) {
+        // Throwables normally do not override equals(), so contains() acts as an identity check.
+        // Cause chains are short, so the linear lookup is negligible.
+        final List<Throwable> visited = new ArrayList<>();
+        Throwable current = throwable;
+        while (current != null && !visited.contains(current)) {
+            visited.add(current);
+            String message = current.getMessage();
+            if (message != null && message.contains("Acquire operation took longer than")) {
+                return true;
+            }
+            current = current.getCause();
         }
+        return false;
     }
/**
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
index 40e88d8ae5a..64795cc8c76 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java
@@ -170,6 +170,12 @@ class NativeS3FileSystem extends FileSystem
return clientProvider;
}
+    /**
+     * Exposes the bulk copy helper so tests can inspect its effective configuration.
+     *
+     * @return the helper this file system was created with, or {@code null} if none was supplied
+     */
+    @VisibleForTesting
+    @Nullable
+    NativeS3BulkCopyHelper getBulkCopyHelper() {
+        return this.bulkCopyHelper;
+    }
+
@Override
public URI getUri() {
return uri;
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index 60b151035cb..81d92dc762b 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -122,6 +122,15 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.defaultValue(true)
.withDescription("Enable bulk copy operations using
S3TransferManager");
+    /** Size of the S3 HTTP connection pool; also caps the effective bulk copy concurrency. */
+    public static final ConfigOption<Integer> MAX_CONNECTIONS =
+            ConfigOptions.key("s3.connection.max")
+                    .intType()
+                    .defaultValue(50)
+                    .withDescription(
+                            "Maximum number of HTTP connections in the S3 client connection pool. "
+                                    + "Applies to both the sync client (Apache HTTP) and the async client (Netty). "
+                                    + "Must be a positive integer. If 's3.bulk-copy.max-concurrent' "
+                                    + "is larger than this value, bulk copy concurrency is reduced "
+                                    + "to this value and a warning is logged.");
+
public static final ConfigOption<Integer> BULK_COPY_MAX_CONCURRENT =
ConfigOptions.key("s3.bulk-copy.max-concurrent")
.intType()
@@ -348,6 +357,13 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
readBufferSize);
}
+ final int maxConnections = config.get(MAX_CONNECTIONS);
+ Preconditions.checkArgument(
+ maxConnections > 0,
+ "'%s' must be a positive integer, but was %s",
+ MAX_CONNECTIONS.key(),
+ maxConnections);
+
S3ClientProvider clientProvider =
S3ClientProvider.builder()
.accessKey(accessKey)
@@ -355,6 +371,7 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
.region(region)
.endpoint(endpoint)
.pathStyleAccess(pathStyleAccess)
+ .maxConnections(maxConnections)
.connectionTimeout(config.get(CONNECTION_TIMEOUT))
.socketTimeout(config.get(SOCKET_TIMEOUT))
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
@@ -371,10 +388,17 @@ public class NativeS3FileSystemFactory implements FileSystemFactory {
NativeS3BulkCopyHelper bulkCopyHelper = null;
if (config.get(BULK_COPY_ENABLED)) {
+ final int bulkCopyMaxConcurrent =
config.get(BULK_COPY_MAX_CONCURRENT);
+ Preconditions.checkArgument(
+ bulkCopyMaxConcurrent > 0,
+ "'%s' must be a positive integer, but was %s",
+ BULK_COPY_MAX_CONCURRENT.key(),
+ bulkCopyMaxConcurrent);
bulkCopyHelper =
new NativeS3BulkCopyHelper(
clientProvider.getTransferManager(),
- config.get(BULK_COPY_MAX_CONCURRENT));
+ bulkCopyMaxConcurrent,
+ maxConnections);
}
return new NativeS3FileSystem(
diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index cee92d1ce5d..daccd6247cd 100644
--- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -389,7 +389,9 @@ class S3ClientProvider implements AutoCloseableAsync {
NettyNioAsyncHttpClient.builder()
.maxConcurrency(maxConnections)
.connectionTimeout(connectionTimeout)
-
.readTimeout(socketTimeout))
+
.readTimeout(socketTimeout)
+
.connectionAcquisitionTimeout(
+
connectionTimeout))
.overrideConfiguration(overrideConfig)
.endpointOverride(endpointUri)
.build())
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
index 37b35f88aca..5b244850dca 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3BulkCopyHelperTest.java
@@ -20,7 +20,14 @@ package org.apache.flink.fs.s3native;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import software.amazon.awssdk.core.exception.SdkClientException;
+
+import java.util.Collections;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNullPointerException;
@@ -28,7 +35,9 @@ import static org.assertj.core.api.Assertions.assertThatNullPointerException;
/** Tests for {@link NativeS3BulkCopyHelper}. */
class NativeS3BulkCopyHelperTest {
-    private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1);
+    // Shared instance built with a null transfer manager; presumably only safe for tests that
+    // never start a download, such as the key/URI parsing checks.
+    private static final NativeS3BulkCopyHelper helper = new NativeS3BulkCopyHelper(null, 1, 1);
+
+ // --- URI parsing tests ---
@ParameterizedTest
@CsvSource({
@@ -105,4 +114,46 @@ class NativeS3BulkCopyHelperTest {
path.append("file.txt");
assertThat(helper.extractKey("s3://bucket/" +
path)).isEqualTo(path.toString());
}
+
+    /**
+     * Inputs for {@link #testConnectionPoolExhaustedDetection}: a display name, the throwable to
+     * classify, and whether it should be recognized as connection pool exhaustion.
+     */
+    private static Stream<Arguments> connectionPoolExhaustedCases() {
+        final Throwable directMatch =
+                SdkClientException.builder()
+                        .message(
+                                "Unable to execute HTTP request: "
+                                        + "Acquire operation took longer than the configured maximum time.")
+                        .build();
+        final Throwable nestedMatch =
+                SdkClientException.builder()
+                        .message("Unable to execute HTTP request")
+                        .cause(
+                                new RuntimeException(
+                                        "channel acquisition failed",
+                                        new TimeoutException(
+                                                "Acquire operation took longer than 10000 milliseconds.")))
+                        .build();
+        final Throwable unrelated = SdkClientException.builder().message("Access Denied").build();
+        final Throwable withoutMessage = new RuntimeException((String) null);
+
+        return Stream.of(
+                Arguments.of("direct message match", directMatch, true),
+                Arguments.of("nested causal chain", nestedMatch, true),
+                Arguments.of("unrelated error", unrelated, false),
+                Arguments.of("null message", withoutMessage, false),
+                Arguments.of("null throwable", null, false));
+    }
+
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("connectionPoolExhaustedCases")
+    void testConnectionPoolExhaustedDetection(
+            String description, Throwable throwable, boolean expected) {
+        // 'description' only feeds the display name ("{0}") of each parameterized invocation.
+        final boolean detected = NativeS3BulkCopyHelper.isConnectionPoolExhausted(throwable);
+        assertThat(detected).isEqualTo(expected);
+    }
+
+    /** An empty request list must return without touching the (null) collaborators. */
+    @Test
+    void testEmptyRequestListIsNoOp() throws Exception {
+        final NativeS3BulkCopyHelper emptyInputHelper = new NativeS3BulkCopyHelper(null, 16, 50);
+        // Passes as long as no exception escapes. The null transfer manager and the null second
+        // argument would presumably surface as an NPE if the empty-input short circuit were missing.
+        emptyInputHelper.copyFiles(Collections.emptyList(), null);
+    }
}
diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index 31e6814ab1b..f9e57b55858 100644
--- a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -353,6 +353,90 @@ class NativeS3FileSystemFactoryTest {
}
}
+    @Test
+    void testInvalidMaxConnectionsThrowsException() {
+        final Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+        // Zero must be rejected by the factory's positivity check on 's3.connection.max'.
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 0);
+
+        final NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        factory.configure(config);
+
+        assertThatThrownBy(() -> factory.create(URI.create("s3://test-bucket/")))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("s3.connection.max")
+                .hasMessageContaining("must be a positive integer");
+    }
+
+    @Test
+    void testInvalidBulkCopyMaxConcurrentThrowsException() {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        // 's3.bulk-copy.max-concurrent' is only validated when bulk copy is enabled, so enable it
+        // explicitly instead of relying on the option's default value.
+        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 0);
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        assertThatThrownBy(() -> factory.create(fsUri))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("s3.bulk-copy.max-concurrent")
+                .hasMessageContaining("must be a positive integer");
+    }
+
+    @Test
+    void testBulkCopyMaxConcurrentClampedToMaxConnections() throws Exception {
+        final Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+        // Request more concurrent copies (32) than the pool has connections (10).
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 32);
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 10);
+
+        final NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        factory.configure(config);
+        final FileSystem fs = factory.create(URI.create("s3://test-bucket/"));
+
+        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        final NativeS3BulkCopyHelper bulkCopyHelper =
+                ((NativeS3FileSystem) fs).getBulkCopyHelper();
+        assertThat(bulkCopyHelper).isNotNull();
+        // The effective limit is clamped down to the connection pool size.
+        assertThat(bulkCopyHelper.getMaxConcurrentCopies()).isEqualTo(10);
+    }
+
+    @Test
+    void testBulkCopyMaxConcurrentPreservedWithinMaxConnections() throws Exception {
+        final Configuration config = new Configuration();
+        config.setString("s3.access-key", "test-access-key");
+        config.setString("s3.secret-key", "test-secret-key");
+        config.setString("s3.region", "us-east-1");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+        config.set(NativeS3FileSystemFactory.BULK_COPY_ENABLED, true);
+        // The requested concurrency (10) fits within the pool (50), so it must be kept as-is.
+        config.set(NativeS3FileSystemFactory.BULK_COPY_MAX_CONCURRENT, 10);
+        config.set(NativeS3FileSystemFactory.MAX_CONNECTIONS, 50);
+
+        final NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        factory.configure(config);
+        final FileSystem fs = factory.create(URI.create("s3://test-bucket/"));
+
+        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
+        final NativeS3BulkCopyHelper bulkCopyHelper =
+                ((NativeS3FileSystem) fs).getBulkCopyHelper();
+        assertThat(bulkCopyHelper).isNotNull();
+        assertThat(bulkCopyHelper.getMaxConcurrentCopies()).isEqualTo(10);
+    }
+
@Test
void testS3ASchemeReturnsS3A() {
NativeS3AFileSystemFactory factory = new NativeS3AFileSystemFactory();