FrankChen021 commented on code in PR #19317:
URL: https://github.com/apache/druid/pull/19317#discussion_r3161338202


##########
extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/ServerSideEncryptingAmazonS3.java:
##########
@@ -274,6 +301,161 @@ public void upload(String bucket, String key, File file, 
@Nullable Grant aclGran
     }
   }
 
+  public static ServerSideEncryptingAmazonS3.Builder builder(
+      AwsCredentialsProvider awsCredentialsProvider,
+      S3StorageConfig s3StorageConfig,
+      @Nullable AWSProxyConfig awsProxyConfig,
+      @Nullable AWSEndpointConfig awsEndpointConfig,
+      @Nullable AWSClientConfig awsClientConfig,
+      @Nullable S3InputSourceConfig s3InputSourceConfig,
+      @Nullable S3ExportStorageProvider s3ExportStorageProvider
+  )
+  {
+    if (s3InputSourceConfig != null && s3ExportStorageProvider != null) {
+      throw DruidException.defensive("Cannot set both s3InputSourceConfig and 
s3ExportStorageProvider!");
+    }
+    final String assumeRoleArn;
+    final String assumeRoleExternalId;
+    if (s3InputSourceConfig != null) {
+      assumeRoleArn = s3InputSourceConfig.getAssumeRoleArn();
+      assumeRoleExternalId = s3InputSourceConfig.getAssumeRoleExternalId();
+    } else if (s3ExportStorageProvider != null) {
+      assumeRoleArn = s3ExportStorageProvider.getAssumeRoleArn();
+      assumeRoleExternalId = s3ExportStorageProvider.getAssumeRoleExternalId();
+    } else {
+      assumeRoleArn = null;
+      assumeRoleExternalId = null;
+    }
+
+    // Build a custom S3Client with the provided configuration
+    S3ClientBuilder clientBuilder = S3Client.builder();
+    S3AsyncClientBuilder asyncClientBuilder = S3AsyncClient.builder();
+
+    // Configure endpoint and region
+    if (awsEndpointConfig != null) {
+      if (!Strings.isNullOrEmpty(awsEndpointConfig.getUrl())) {
+        String endpointUrl = awsEndpointConfig.getUrl();
+        // Ensure endpoint URL has a scheme
+        if (!endpointUrl.startsWith("http://";) && 
!endpointUrl.startsWith("https://";)) {
+          boolean useHttps = S3Utils.useHttps(awsClientConfig, 
awsEndpointConfig);
+          endpointUrl = S3Utils.ensureEndpointHasScheme(endpointUrl, useHttps);
+        }
+        URI endpointOverride = URI.create(endpointUrl);
+        clientBuilder.endpointOverride(endpointOverride);
+        asyncClientBuilder.endpointOverride(endpointOverride);
+      }
+      if (!Strings.isNullOrEmpty(awsEndpointConfig.getSigningRegion())) {
+        Region region = Region.of(awsEndpointConfig.getSigningRegion());
+        clientBuilder.region(region);
+        asyncClientBuilder.region(region);
+      }
+    }
+
+    ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder();
+
+    // Configure S3-specific settings
+    if (awsClientConfig != null) {
+      
httpClientBuilder.connectionTimeout(Duration.ofMillis(awsClientConfig.getConnectionTimeoutMillis()))
+                       
.socketTimeout(Duration.ofMillis(awsClientConfig.getSocketTimeoutMillis()))
+                       .maxConnections(awsClientConfig.getMaxConnections());
+      S3Configuration s3Config = S3Configuration.builder()
+                                                
.chunkedEncodingEnabled(!awsClientConfig.isDisableChunkedEncoding())
+                                                .build();
+      clientBuilder.serviceConfiguration(s3Config)
+                   .forcePathStyle(awsClientConfig.isEnablePathStyleAccess())
+                   
.crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled());
+      
asyncClientBuilder.forcePathStyle(awsClientConfig.isEnablePathStyleAccess())
+                        
.crossRegionAccessEnabled(awsClientConfig.isCrossRegionAccessEnabled())
+                        
.httpClientBuilder(AsyncHttpClientType.fromString(s3StorageConfig.getS3TransferConfig().getAsyncHttpClientType()).buildBuilder(awsClientConfig))
+                        .multipartEnabled(true);
+    }
+
+    // Configure HTTP client with proxy if needed
+    if (awsProxyConfig != null) {
+      ProxyConfiguration proxyConfig = 
S3Utils.buildProxyConfiguration(awsProxyConfig);
+      if (proxyConfig != null) {
+        httpClientBuilder.proxyConfiguration(proxyConfig);
+      }
+    }
+    clientBuilder.httpClientBuilder(httpClientBuilder);
+
+    // Configure credentials
+    AwsCredentialsProvider credentialsProvider;
+    if (s3InputSourceConfig != null && 
s3InputSourceConfig.isCredentialsConfigured()) {
+      credentialsProvider = 
createStaticCredentialsProvider(s3InputSourceConfig);
+    } else {
+      credentialsProvider = awsCredentialsProvider;
+    }
+
+    // Apply assume role if configured
+    if (!Strings.isNullOrEmpty(assumeRoleArn)) {
+      credentialsProvider = createAssumeRoleCredentialsProvider(
+          assumeRoleArn,
+          assumeRoleExternalId,
+          awsEndpointConfig,
+          credentialsProvider
+      );
+    }
+
+    clientBuilder.credentialsProvider(credentialsProvider);
+    asyncClientBuilder.credentialsProvider(credentialsProvider);
+
+    // Build and wrap in ServerSideEncryptingAmazonS3
+    return ServerSideEncryptingAmazonS3.builder()
+                                       
.setS3ClientSupplier(clientBuilder::build)
+                                       
.setS3AsyncClientSupplier(asyncClientBuilder::build)
+                                       .setS3StorageConfig(s3StorageConfig);
+  }
+
+  @Nonnull
+  private static StaticCredentialsProvider 
createStaticCredentialsProvider(S3InputSourceConfig s3InputSourceConfig)
+  {
+    if (s3InputSourceConfig.getSessionToken() != null) {
+      AwsSessionCredentials sessionCredentials = AwsSessionCredentials.create(
+          s3InputSourceConfig.getAccessKeyId().getPassword(),
+          s3InputSourceConfig.getSecretAccessKey().getPassword(),
+          s3InputSourceConfig.getSessionToken().getPassword()
+      );
+      return StaticCredentialsProvider.create(sessionCredentials);
+    } else {
+      return StaticCredentialsProvider.create(
+          AwsBasicCredentials.create(
+              s3InputSourceConfig.getAccessKeyId().getPassword(),
+              s3InputSourceConfig.getSecretAccessKey().getPassword()
+          )
+      );
+    }
+  }
+
+  public static AwsCredentialsProvider createAssumeRoleCredentialsProvider(
+      String assumeRoleArn,
+      @Nullable String assumeRoleExternalId,
+      @Nullable AWSEndpointConfig awsEndpointConfig,
+      AwsCredentialsProvider baseCredentialsProvider
+  )
+  {
+    String roleSessionName = StringUtils.format("druid-s3-%s", 
UUID.randomUUID().toString());
+
+    StsClientBuilder stsBuilder = 
StsClient.builder().credentialsProvider(baseCredentialsProvider);
+    // If we have endpoint config, use its region for STS too
+    if (awsEndpointConfig != null && awsEndpointConfig.getSigningRegion() != 
null) {
+      stsBuilder.region(Region.of(awsEndpointConfig.getSigningRegion()));
+    }
+
+    AssumeRoleRequest.Builder assumeRoleRequestBuilder =
+        
AssumeRoleRequest.builder().roleArn(assumeRoleArn).roleSessionName(roleSessionName).durationSeconds(3600);
+    if (assumeRoleExternalId != null) {
+      assumeRoleRequestBuilder.externalId(assumeRoleExternalId);
+    }
+
+    return StsAssumeRoleCredentialsProvider.builder()
+                                           .stsClient(stsBuilder.build())
+                                           
.refreshRequest(assumeRoleRequestBuilder.build())
+                                           .asyncCredentialUpdateEnabled(true)

Review Comment:
   [P2] Avoid leaking assume-role refresh resources
   
   When an MSQ export specifies assumeRoleArn,
S3ExportStorageProvider.createStorageConnector builds a fresh
ServerSideEncryptingAmazonS3 every time it creates a connector. That path creates an
StsAssumeRoleCredentialsProvider with asyncCredentialUpdateEnabled(true) plus a
new STS client. None of the provider, the STS client, or the S3 clients is
lifecycle-managed or closed once the export connector is done. Because an export
creates connectors for the empty-location check, for worker writes, and for
manifest writing, every export that specifies an assume-role ARN can leave
background credential-refresh resources and open clients behind. Either reuse a
lifecycle-managed, role-specific client/provider, or make the connector own and
close the resources it creates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to