gaborgsomogyi commented on code in PR #28136:
URL: https://github.com/apache/flink/pull/28136#discussion_r3287274216
##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -463,27 +534,143 @@ S3ClientProvider build() {
.retryPolicy(RetryPolicy.builder().numRetries(maxRetries).build())
.build();
- ApacheHttpClient.Builder httpClientBuilder =
- ApacheHttpClient.builder()
- .maxConnections(maxConnections)
- .connectionTimeout(connectionTimeout)
- .socketTimeout(socketTimeout)
- .tcpKeepAlive(true)
- .connectionMaxIdleTime(connectionMaxIdleTime);
+ if (useCrt) {
+ LOG.info(
+ "AWS CRT transport enabled (s3.crt.enabled=true) with
target throughput {}",
+ crtTargetThroughputGbps != null
+ ? crtTargetThroughputGbps + " Gbps"
+ : "(CRT runtime default)");
+ }
+
+ S3Client s3Client =
+ buildSyncClient(
+ credentialsProvider, awsRegion, s3Config,
overrideConfig, endpointUri);
+ S3AsyncClient asyncClient =
+ buildAsyncClient(
+ credentialsProvider, awsRegion, s3Config,
overrideConfig, endpointUri);
+ S3TransferManager transferManager =
+ S3TransferManager.builder().s3Client(asyncClient).build();
+
+ return new S3ClientProvider(
+ s3Client,
+ asyncClient,
+ transferManager,
+ encryptionConfig,
+ credentialsProvider,
+ stsClient,
+ clientCloseTimeout,
+ connectionTimeout,
+ socketTimeout,
+ connectionMaxIdleTime,
+ pathStyleAccess,
+ chunkedEncoding,
+ checksumValidation,
+ maxConnections,
+ maxRetries,
+ region,
+ endpoint,
+ assumeRoleArn,
+ assumeRoleExternalId,
+ assumeRoleSessionName,
+ assumeRoleSessionDurationSeconds,
+ useCrt,
+ crtTargetThroughputGbps);
+ }
+
+ /**
+ * Builds the synchronous {@link S3Client}, choosing between the
Apache HTTP transport and
+ * the AWS CRT transport based on {@link #useCrt}.
+ */
+ private S3Client buildSyncClient(
+ AwsCredentialsProvider credentialsProvider,
+ Region awsRegion,
+ S3Configuration s3Config,
+ ClientOverrideConfiguration overrideConfig,
+ @Nullable URI endpointUri) {
S3ClientBuilder clientBuilder =
S3Client.builder()
.credentialsProvider(credentialsProvider)
.region(awsRegion)
.serviceConfiguration(s3Config)
- .httpClientBuilder(httpClientBuilder)
.overrideConfiguration(overrideConfig);
+
+ if (useCrt) {
+ try {
+ // Note: AwsCrtHttpClient.Builder does not expose a
`readTimeout(Duration)`
+ // equivalent of the Apache client's socket timeout — the
CRT runtime relies
+ // on `ConnectionHealthConfiguration` for stalled-read
detection instead, so
+ // the `s3.socket.timeout` setting is silently ignored in
CRT mode.
+ clientBuilder.httpClientBuilder(
+ AwsCrtHttpClient.builder()
+ .maxConcurrency(maxConnections)
+ .connectionTimeout(connectionTimeout)
+
.connectionMaxIdleTime(connectionMaxIdleTime));
+ } catch (NoClassDefFoundError e) {
+ throw new IllegalStateException(crtMissingJarsMessage(),
e);
Review Comment:
`catch (NoClassDefFoundError)` is incomplete. When `aws-crt.jar` is entirely
absent, `NoClassDefFoundError` is thrown for the missing CRT bridge classes —
correct. But if the JAR is present and the native library fails to load (e.g.
`.so` extraction fails), `UnsatisfiedLinkError` is thrown instead, which is not
caught here and will surface as an unhandled error with no actionable message.
Catching the broader `LinkageError` would be too wide — it also covers
binary-incompatibility errors like `NoSuchMethodError`, `AbstractMethodError`,
`IncompatibleClassChangeError` etc., for which telling the user to "place the
JAR in the plugin directory" is actively misleading.
Catch exactly the two errors that mean the CRT is missing or cannot be loaded:
```java
} catch (NoClassDefFoundError | UnsatisfiedLinkError e) {
throw new IllegalStateException(crtMissingJarsMessage(), e);
}
```
The same change is needed in `buildAsyncClient`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]