Samrat002 commented on code in PR #27788:
URL: https://github.com/apache/flink/pull/27788#discussion_r3214472310


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -359,35 +413,51 @@ public FileSystem create(URI fsUri) throws IOException {
                         .socketTimeout(config.get(SOCKET_TIMEOUT))
                         
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
                         .clientCloseTimeout(config.get(CLIENT_CLOSE_TIMEOUT))
-                        .assumeRoleArn(config.get(ASSUME_ROLE_ARN))
-                        
.assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID))
-                        
.assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME))
-                        .assumeRoleSessionDurationSeconds(
-                                
config.get(ASSUME_ROLE_SESSION_DURATION_SECONDS))
+                        .assumeRoleArn(assumeRoleArn)
+                        .assumeRoleExternalId(assumeRoleExternalId)
+                        .assumeRoleSessionName(assumeRoleSessionName)
+                        
.assumeRoleSessionDurationSeconds(assumeRoleSessionDuration)
                         .maxRetries(config.get(MAX_RETRIES))
-                        
.credentialsProviderClasses(config.get(AWS_CREDENTIALS_PROVIDER))
+                        .credentialsProviderClasses(credentialsProviderClasses)
                         .encryptionConfig(encryptionConfig)
                         .build();
 
-        NativeS3BulkCopyHelper bulkCopyHelper = null;

Review Comment:
   The `try-catch` block and the `bulkCopyMaxConcurrent` restructuring were not 
a bugfix — they were unnecessary complexity introduced during the initial 
draft. Both have been reverted. The current code uses the same 
`Preconditions.checkArgument` pattern as master, with `bulkCopyMaxConcurrent` 
validated inside the `if (config.get(BULK_COPY_ENABLED))` block exactly as it 
was before.



##########
flink-filesystems/flink-s3-fs-native/README.md:
##########
@@ -89,6 +89,29 @@ input.sinkTo(FileSink.forRowFormat(new 
Path("s3://my-bucket/output"),
 | s3.assume-role.session-name | flink-s3-session | Session name for the 
assumed role |
 | s3.assume-role.session-duration | 3600 | Session duration in seconds 
(900-43200) |
 
+## Bucket-Level Configuration
+
+The Native S3 FileSystem supports per-bucket configuration overrides, allowing 
different S3 buckets to use different connection settings within the same Flink 
cluster. This enables scenarios like:
+
+- **Checkpointing to one bucket** with specific credentials
+- **Savepoints to another bucket** with different region/endpoint

Review Comment:
   Absolutely right — savepoints and checkpoints can be (and often are) stored 
in the same bucket under different prefixes. The README now uses generic 
example scenarios (different credentials per bucket, different 
regions/endpoints, bucket-specific encryption) rather than implying any 
particular bucket topology is required.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to