gaborgsomogyi commented on code in PR #28136:
URL: https://github.com/apache/flink/pull/28136#discussion_r3259552729


##########
flink-filesystems/flink-s3-fs-native/README.md:
##########
@@ -327,6 +327,53 @@ When enabled, file uploads automatically use TransferManager for:
 - Better utilization of available bandwidth
 - Lower heap requirements for write operations
 
+## AWS Common Runtime (CRT) Support
+
+The filesystem optionally supports the [AWS Common Runtime (CRT)](https://github.com/awslabs/aws-crt-java) HTTP transport
+for higher throughput on large S3 workloads.
+
+When enabled, the CRT transport replaces:
+- **Sync client**: Apache HTTP Client → `AwsCrtHttpClient`
+- **Async client**: Netty NIO → `S3AsyncClient.crtBuilder()` (with built-in multipart acceleration)
+
+### Prerequisites
+
+The `aws-crt` artifact contains JNI-linked native libraries whose C-side `FindClass` paths are
+hardcoded, making Maven shade relocation incompatible. Therefore **CRT JARs are not bundled** in
+the fat JAR and must be placed manually.
+
+### Setup (step-by-step)
+
+1. Download `aws-crt-client-<version>.jar` (`groupId: software.amazon.awssdk`) and
+   `aws-crt-<version>.jar` (`groupId: software.amazon.awssdk.crt`) for the same AWS SDK version
+   used by this module (check `fs.s3.aws.sdk.version` in `pom.xml`).
+
+2. Place both JARs in the Flink plugin directory alongside `flink-s3-fs-native.jar`:
+
+   ```bash
+   cp aws-crt-client-<version>.jar $FLINK_HOME/plugins/s3-fs-native/
+   cp aws-crt-<version>.jar        $FLINK_HOME/plugins/s3-fs-native/
+   ```
+
+3. Enable CRT in your Flink configuration (`conf/config.yaml`):
+
+   ```yaml
+   s3.crt.enabled: true
+   ```
+
+4. Optionally tune the soft throughput target (default: 10.0 Gbps):

Review Comment:
   I would remove the optional settings (such as the throughput tuning step) from the README, since we plan to add several new flags.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -326,6 +345,11 @@ public static class Builder {
         // Custom credentials provider class names (comma-separated)
         @Nullable private String credentialsProviderClasses;
 
+        // CRT configuration
+        private boolean useCrt = false;
+        private double crtTargetThroughputGbps = 10.0;
+        private long crtMinPartSizeInBytes = 5L << 20; // 5MB

Review Comment:
   The hardcoded `5L << 20` duplicates knowledge already encoded in `PART_UPLOAD_MIN_SIZE.defaultValue()`. If the builder is ever used outside the factory, this default can silently diverge from the config option whenever that option's default changes. Suggest deriving it from the config option instead:
   ```java
   private long crtMinPartSizeInBytes =
           NativeS3FileSystemFactory.PART_UPLOAD_MIN_SIZE.defaultValue();
   ```
   That way there is a single source of truth, and the comment explaining the magic number is no longer needed.
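   To make the single-source-of-truth point concrete, here is a self-contained sketch. `Option<T>` is a hypothetical stand-in for Flink's `ConfigOption<T>` (only `defaultValue()` is modelled), `Factory` and `ProviderBuilder` stand in for the real classes, and the key string is made up:

   ```java
   // Hypothetical stand-in for Flink's ConfigOption<T>; only what the sketch needs.
   final class Option<T> {
       private final String key;
       private final T defaultValue;

       Option(String key, T defaultValue) {
           this.key = key;
           this.defaultValue = defaultValue;
       }

       String key() {
           return key;
       }

       T defaultValue() {
           return defaultValue;
       }
   }

   final class Factory {
       // Made-up key. 5L << 20 == 5 * 1024 * 1024 == 5_242_880 bytes (5 MiB).
       static final Option<Long> PART_UPLOAD_MIN_SIZE =
               new Option<>("example.part-upload.min-size", 5L << 20);
   }

   final class ProviderBuilder {
       // Initialised from the option, so the builder follows any change to the option's default.
       private long crtMinPartSizeInBytes = Factory.PART_UPLOAD_MIN_SIZE.defaultValue();

       ProviderBuilder crtMinPartSizeInBytes(long bytes) {
           this.crtMinPartSizeInBytes = bytes;
           return this;
       }

       long crtMinPartSizeInBytes() {
           return crtMinPartSizeInBytes;
       }
   }
   ```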



##########
flink-filesystems/flink-s3-fs-native/README.md:
##########
@@ -327,6 +327,53 @@ When enabled, file uploads automatically use TransferManager for:
 - Better utilization of available bandwidth
 - Lower heap requirements for write operations
 
+## AWS Common Runtime (CRT) Support
+
+The filesystem optionally supports the [AWS Common Runtime (CRT)](https://github.com/awslabs/aws-crt-java) HTTP transport
+for higher throughput on large S3 workloads.
+
+When enabled, the CRT transport replaces:
+- **Sync client**: Apache HTTP Client → `AwsCrtHttpClient`
+- **Async client**: Netty NIO → `S3AsyncClient.crtBuilder()` (with built-in multipart acceleration)
+
+### Prerequisites
+
+The `aws-crt` artifact contains JNI-linked native libraries whose C-side `FindClass` paths are
+hardcoded, making Maven shade relocation incompatible. Therefore **CRT JARs are not bundled** in
+the fat JAR and must be placed manually.
+
+### Setup (step-by-step)
+
+1. Download `aws-crt-client-<version>.jar` (`groupId: software.amazon.awssdk`) and
+   `aws-crt-<version>.jar` (`groupId: software.amazon.awssdk.crt`) for the same AWS SDK version
+   used by this module (check `fs.s3.aws.sdk.version` in `pom.xml`).

Review Comment:
   The version guidance here is misleading: `software.amazon.awssdk.crt` (`aws-crt`) uses a completely independent versioning scheme (e.g. `0.33.x`) that does not align with the AWS SDK version (`2.x.y`). Telling users to download "the same AWS SDK version" will lead them to the wrong artifact version.
   
   Two suggestions:
   1. Pin the tested `aws-crt` version explicitly, or explain that the correct version can be resolved from the `<dependency>` on `software.amazon.awssdk.crt:aws-crt` declared in `aws-crt-client-<version>.pom`.
   2. Consider shipping a small helper script (e.g. `tools/download-crt-jars.sh`) that uses `mvn dependency:get` or `curl` against Maven Central to fetch the exact compatible JARs and place them in the right directory. This would significantly reduce the friction of the manual setup step.
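   If such a helper is added, most of it is the standard Maven repository layout. A minimal Java sketch of that layout follows; the class name is hypothetical, and both versions are supplied by the caller rather than pinned here, since the tested `aws-crt` version is not stated in this PR. Keeping two separate parameters makes the independent version numbers explicit:

   ```java
   // Builds Maven Central JAR URLs for the two CRT artifacts, each with its own version.
   final class CrtJarUrls {
       static final String MAVEN_CENTRAL = "https://repo1.maven.org/maven2";

       // Standard Maven repository layout: groupId dots become path separators.
       static String jarUrl(String groupId, String artifactId, String version) {
           return String.join(
                   "/",
                   MAVEN_CENTRAL,
                   groupId.replace('.', '/'),
                   artifactId,
                   version,
                   artifactId + "-" + version + ".jar");
       }

       // The SDK-side adapter is versioned with the AWS SDK ...
       static String crtClientJar(String awsSdkVersion) {
           return jarUrl("software.amazon.awssdk", "aws-crt-client", awsSdkVersion);
       }

       // ... while the native runtime has its own, independent version.
       static String crtRuntimeJar(String awsCrtVersion) {
           return jarUrl("software.amazon.awssdk.crt", "aws-crt", awsCrtVersion);
       }

       public static void main(String[] args) {
           if (args.length != 2) {
               System.err.println("usage: CrtJarUrls <aws-sdk-version> <aws-crt-version>");
               return;
           }
           System.out.println(crtClientJar(args[0]));
           System.out.println(crtRuntimeJar(args[1]));
       }
   }
   ```

   The printed URLs could then be fetched with `curl -fLO` into `$FLINK_HOME/plugins/s3-fs-native/`.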



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                             .credentialsProvider(credentialsProvider)
                             .region(awsRegion)
                             .serviceConfiguration(s3Config)
-                            .httpClientBuilder(httpClientBuilder)
                             .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));

Review Comment:
   The CRT sync client omits `socketTimeout`. The Apache path sets `.socketTimeout(socketTimeout)` to bound how long a stalled read can block; `AwsCrtHttpClient.Builder` exposes `.readTimeout(Duration)` for exactly this purpose. Without it, a slow connection can hang indefinitely.
   
   ```java
   AwsCrtHttpClient.builder()
       .maxConcurrency(maxConnections)
       .connectionTimeout(connectionTimeout)
       .connectionMaxIdleTime(connectionMaxIdleTime)
       .readTimeout(socketTimeout)
   ```



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -460,6 +482,9 @@ public FileSystem create(URI fsUri) throws IOException {
                         .maxRetries(config.get(MAX_RETRIES))
                         .credentialsProviderClasses(credentialsProviderClasses)
                         .encryptionConfig(encryptionConfig)
+                        .useCrt(config.get(CRT_ENABLED))
+                        .crtTargetThroughputGbps(config.get(CRT_TARGET_THROUGHPUT_GBPS))
+                        .crtMinPartSizeInBytes(config.get(PART_UPLOAD_MIN_SIZE))

Review Comment:
   There is no validation that `crtTargetThroughputGbps > 0`. Other connection-related values (`maxConnections`, `s3minPartSize`, etc.) are guarded with `Preconditions.checkArgument` a few lines above. Passing `0` or a negative value to the CRT builder will produce an opaque SDK exception. Suggest adding:
   ```java
   Preconditions.checkArgument(
       config.get(CRT_TARGET_THROUGHPUT_GBPS) > 0,
       "'%s' must be positive, but was %s",
       CRT_TARGET_THROUGHPUT_GBPS.key(),
       config.get(CRT_TARGET_THROUGHPUT_GBPS));
   ```
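   The `> 0` comparison also rejects `NaN`, which a negated check such as `!(x <= 0)` would let through. A self-contained sketch of the same guard; `checkArgument` here is a local stand-in that mimics the `%s` message template of Flink's `Preconditions.checkArgument`, and the key string is a made-up placeholder for whatever `CRT_TARGET_THROUGHPUT_GBPS.key()` returns:

   ```java
   // Self-contained version of the suggested guard.
   final class CrtConfigValidation {
       // Made-up placeholder for CRT_TARGET_THROUGHPUT_GBPS.key().
       static final String EXAMPLE_KEY = "example.crt.target-throughput-gbps";

       // Local stand-in that mimics the "%s" template style of Flink's Preconditions.checkArgument.
       static void checkArgument(boolean condition, String template, Object... args) {
           if (!condition) {
               throw new IllegalArgumentException(String.format(template, args));
           }
       }

       static double validateTargetThroughput(double gbps) {
           // 'gbps > 0' is false for 0, negative values and NaN, so all three are rejected.
           checkArgument(gbps > 0, "'%s' must be positive, but was %s", EXAMPLE_KEY, gbps);
           return gbps;
       }
   }
   ```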



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java:
##########
@@ -203,6 +203,27 @@ void testEmptyProviderStringThrows() {
                 .hasMessageContaining("no valid provider class names");
     }
 
+    @Test
+    void testCrtDisabledByDefault() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+        assertThat(provider.isUseCrt()).isFalse();
+    }
+
+    @Test
+    void testCrtFlagIsRecorded() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder()
+                        .endpoint(DUMMY_ENDPOINT)
+                        .region(DUMMY_REGION)
+                        .useCrt(true)
+                        .crtTargetThroughputGbps(20.0)
+                        .build();
+
+        assertThat(provider.isUseCrt()).isTrue();
+        assertThat(provider.getCrtTargetThroughputGbps()).isEqualTo(20.0);

Review Comment:
   This test only asserts that the builder stored the flag values; it does not verify that the CRT branch was actually taken during client construction. The `if (useCrt)` block could be silently deleted and this test would still pass. Consider adding a type-level assertion, e.g. via reflection on the `s3Client` or `asyncClient` field, to confirm that the constructed clients are actually CRT-backed when `useCrt=true`.
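   A minimal sketch of the reflective check. `FakeProvider`, `FakeAsyncClient`, and `FakeCrtAsyncClient` are hypothetical stand-ins; in the real test the target would be the `S3ClientProvider` and its `asyncClient` field, and because the concrete CRT client type is an SDK implementation detail, asserting on its class name may be more robust than `instanceof`:

   ```java
   // Reads a (possibly private) field by name, searching superclasses as well.
   final class FieldPeek {
       static Object readField(Object target, String fieldName) {
           for (Class<?> type = target.getClass(); type != null; type = type.getSuperclass()) {
               try {
                   java.lang.reflect.Field field = type.getDeclaredField(fieldName);
                   field.setAccessible(true);
                   return field.get(target);
               } catch (NoSuchFieldException e) {
                   // Not declared on this class; keep walking up the hierarchy.
               } catch (IllegalAccessException e) {
                   throw new IllegalStateException(e);
               }
           }
           throw new IllegalArgumentException(
                   "No field '" + fieldName + "' on " + target.getClass().getName());
       }
   }

   // Hypothetical stand-ins for the provider and the two async client flavours.
   class FakeAsyncClient {}

   final class FakeCrtAsyncClient extends FakeAsyncClient {}

   final class FakeProvider {
       private final FakeAsyncClient asyncClient;

       FakeProvider(boolean useCrt) {
           this.asyncClient = useCrt ? new FakeCrtAsyncClient() : new FakeAsyncClient();
       }
   }
   ```

   A package-private accessor marked `@VisibleForTesting` would avoid reflection entirely; the sketch only follows the approach proposed above.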



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                             .credentialsProvider(credentialsProvider)
                             .region(awsRegion)
                             .serviceConfiguration(s3Config)
-                            .httpClientBuilder(httpClientBuilder)
                             .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }
             if (endpointUri != null) {
                 clientBuilder.endpointOverride(endpointUri);
             }
             S3Client s3Client = clientBuilder.build();
 
-            S3AsyncClientBuilder asyncClientBuilder =
-                    S3AsyncClient.builder()
-                            .credentialsProvider(credentialsProvider)
-                            .region(awsRegion)
-                            .serviceConfiguration(s3Config)
-                            .httpClientBuilder(
-                                    NettyNioAsyncHttpClient.builder()
-                                            .maxConcurrency(maxConnections)
-                                            .connectionTimeout(connectionTimeout)
-                                            .readTimeout(socketTimeout)
-                                            .connectionAcquisitionTimeout(connectionTimeout))
-                            .overrideConfiguration(overrideConfig);
-            if (endpointUri != null) {
-                asyncClientBuilder.endpointOverride(endpointUri);
+            S3AsyncClient asyncClient;
+            if (useCrt) {
+                S3CrtAsyncClientBuilder crtAsyncBuilder =
+                        S3AsyncClient.crtBuilder()
+                                .credentialsProvider(credentialsProvider)
+                                .region(awsRegion)
+                                .forcePathStyle(pathStyleAccess)
+                                .checksumValidationEnabled(checksumValidation)
+                                .retryConfiguration(
+                                        S3CrtRetryConfiguration.builder()
+                                                .numRetries(maxRetries)
+                                                .build())
+                                .maxConcurrency(maxConnections)
+                                .targetThroughputInGbps((double) crtTargetThroughputGbps)
+                                .minimumPartSizeInBytes(crtMinPartSizeInBytes);
+                if (endpointUri != null) {
+                    crtAsyncBuilder.endpointOverride(endpointUri);
+                }
+                asyncClient = crtAsyncBuilder.build();

Review Comment:
   The non-CRT async path applies `s3Config` (which carries `chunkedEncodingEnabled(chunkedEncoding)`), but `S3CrtAsyncClientBuilder` has no equivalent setter, so the `chunkedEncoding` setting is silently dropped here. If CRT always manages wire encoding internally and the option is intentionally inapplicable, it is worth a short comment (or a note in the `s3.chunked-encoding.enabled` config-option description) so operators are not surprised when that setting has no effect in CRT mode.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                             .credentialsProvider(credentialsProvider)
                             .region(awsRegion)
                             .serviceConfiguration(s3Config)
-                            .httpClientBuilder(httpClientBuilder)
                             .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }

Review Comment:
   Two related issues with the `build()` method:
   
   1. `ApacheHttpClient.Builder httpClientBuilder` (a few lines above) is always constructed, even in the CRT path where it is never used. It should move inside the `else` branch.
   
   2. `build()` is now ~120 lines, with credential setup, two interleaved CRT/non-CRT transport branches, and object wiring all mixed together. Consider extracting two private helpers:
   ```java
   private S3Client buildSyncClient(...) { /* CRT vs Apache */ }
   private S3AsyncClient buildAsyncClient(...) { /* CRT vs Netty */ }
   ```
   This would bring `build()` down to ~50 lines of orchestration and make each transport path independently readable and testable.



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3ClientProviderTest.java:
##########
@@ -203,6 +203,27 @@ void testEmptyProviderStringThrows() {
                 .hasMessageContaining("no valid provider class names");
     }
 
+    @Test
+    void testCrtDisabledByDefault() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder().endpoint(DUMMY_ENDPOINT).region(DUMMY_REGION).build();
+        assertThat(provider.isUseCrt()).isFalse();
+    }
+
+    @Test
+    void testCrtFlagIsRecorded() {
+        S3ClientProvider provider =
+                S3ClientProvider.builder()
+                        .endpoint(DUMMY_ENDPOINT)
+                        .region(DUMMY_REGION)
+                        .useCrt(true)
+                        .crtTargetThroughputGbps(20.0)
+                        .build();
+
+        assertThat(provider.isUseCrt()).isTrue();
+        assertThat(provider.getCrtTargetThroughputGbps()).isEqualTo(20.0);
+    }
+
     @SuppressWarnings("unchecked")
     private static List<AwsCredentialsProvider> extractChain(AwsCredentialsProvider provider)

Review Comment:
   There is no test covering the case where `s3.crt.enabled=true` is configured but the CRT JARs are absent from the classpath. Given the manual installation requirement, this is one of the most likely operator mistakes, and without a test the friendly error path (if added) has no coverage. Consider a test that hides or mocks the CRT classes in the class loader and asserts that an `IllegalStateException` with an actionable message is thrown.
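   One way to simulate the missing JARs is a class loader that refuses to load the CRT packages. A minimal sketch with a hypothetical class name; in a real test the hidden prefixes would be the CRT packages, e.g. `software.amazon.awssdk.http.crt.` (where `AwsCrtHttpClient` lives) and `software.amazon.awssdk.crt.` (the `aws-crt` runtime). Caveat: hiding only affects classes that are themselves defined by this loader, so `S3ClientProvider` would also have to be loaded child-first through it (for example via a `URLClassLoader` over the test classpath), which this sketch does not do; classes already defined by the application class loader keep resolving their references through that loader.

   ```java
   // Class loader that pretends the given package prefixes are not on the classpath.
   final class HidingClassLoader extends ClassLoader {
       private final java.util.List<String> hiddenPrefixes;

       HidingClassLoader(ClassLoader parent, java.util.List<String> hiddenPrefixes) {
           super(parent);
           this.hiddenPrefixes = java.util.List.copyOf(hiddenPrefixes);
       }

       @Override
       protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
           for (String prefix : hiddenPrefixes) {
               if (name.startsWith(prefix)) {
                   // A genuinely missing JAR produces the same exception on an explicit lookup.
                   throw new ClassNotFoundException(name + " (hidden for test)");
               }
           }
           // Everything else uses normal parent-first delegation.
           return super.loadClass(name, resolve);
       }
   }
   ```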



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                             .credentialsProvider(credentialsProvider)
                             .region(awsRegion)
                             .serviceConfiguration(s3Config)
-                            .httpClientBuilder(httpClientBuilder)
                             .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }

Review Comment:
   If `s3.crt.enabled: true` is set but the CRT JARs are missing from the plugin directory, the failure will be a raw `NoClassDefFoundError` for `AwsCrtHttpClient` or `S3CrtAsyncClientBuilder`, which is very hard to diagnose given the manual installation requirement.
   
   Consider guarding the CRT branches with a try/catch and re-throwing a clear error:
   ```java
   try {
       clientBuilder.httpClientBuilder(AwsCrtHttpClient.builder()...);
   } catch (NoClassDefFoundError e) {
       throw new IllegalStateException(
           "CRT transport requested (s3.crt.enabled=true) but aws-crt-client and aws-crt JARs "
           + "are not on the classpath. Place them in $FLINK_HOME/plugins/s3-fs-native/.", e);
   }
   ```
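   An alternative to catching `NoClassDefFoundError` around individual builder calls is to probe once, before any CRT type is touched. A minimal sketch; `ClasspathProbe` and `requireClass` are hypothetical names, and probing one class from each JAR (for example `software.amazon.awssdk.http.crt.AwsCrtHttpClient` from `aws-crt-client` and `software.amazon.awssdk.crt.CRT` from `aws-crt`) with the plugin's class loader is an assumption about how it would be wired in:

   ```java
   // Fails fast with an actionable message if a class cannot be loaded.
   final class ClasspathProbe {
       static void requireClass(String className, ClassLoader loader, String hint) {
           try {
               // initialize=false: check availability only; static initializers do not run.
               Class.forName(className, false, loader);
           } catch (ClassNotFoundException | LinkageError e) {
               throw new IllegalStateException(
                       "Required class " + className + " is not available. " + hint, e);
           }
       }
   }
   ```

   Compared with a try/catch around each builder call, a single probe keeps the CRT branches free of error handling and yields the same message for both the sync and async paths.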



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -476,30 +515,60 @@ S3ClientProvider build() {
                             .credentialsProvider(credentialsProvider)
                             .region(awsRegion)
                             .serviceConfiguration(s3Config)
-                            .httpClientBuilder(httpClientBuilder)
                             .overrideConfiguration(overrideConfig);
+            if (useCrt) {
+                clientBuilder.httpClientBuilder(
+                        AwsCrtHttpClient.builder()
+                                .maxConcurrency(maxConnections)
+                                .connectionTimeout(connectionTimeout)
+                                .connectionMaxIdleTime(connectionMaxIdleTime));
+            } else {
+                clientBuilder.httpClientBuilder(httpClientBuilder);
+            }
             if (endpointUri != null) {
                 clientBuilder.endpointOverride(endpointUri);
             }
             S3Client s3Client = clientBuilder.build();
 
-            S3AsyncClientBuilder asyncClientBuilder =
-                    S3AsyncClient.builder()
-                            .credentialsProvider(credentialsProvider)
-                            .region(awsRegion)
-                            .serviceConfiguration(s3Config)
-                            .httpClientBuilder(
-                                    NettyNioAsyncHttpClient.builder()
-                                            .maxConcurrency(maxConnections)
-                                            .connectionTimeout(connectionTimeout)
-                                            .readTimeout(socketTimeout)
-                                            .connectionAcquisitionTimeout(connectionTimeout))
-                            .overrideConfiguration(overrideConfig);
-            if (endpointUri != null) {
-                asyncClientBuilder.endpointOverride(endpointUri);
+            S3AsyncClient asyncClient;
+            if (useCrt) {
+                S3CrtAsyncClientBuilder crtAsyncBuilder =
+                        S3AsyncClient.crtBuilder()
+                                .credentialsProvider(credentialsProvider)
+                                .region(awsRegion)
+                                .forcePathStyle(pathStyleAccess)
+                                .checksumValidationEnabled(checksumValidation)
+                                .retryConfiguration(
+                                        S3CrtRetryConfiguration.builder()
+                                                .numRetries(maxRetries)
+                                                .build())
+                                .maxConcurrency(maxConnections)
+                                .targetThroughputInGbps((double) crtTargetThroughputGbps)

Review Comment:
   Redundant cast: `crtTargetThroughputGbps` is already declared as `double`, so `(double)` is a no-op. Just pass the field directly:
   ```java
   .targetThroughputInGbps(crtTargetThroughputGbps)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
