Izeren commented on code in PR #27788:
URL: https://github.com/apache/flink/pull/27788#discussion_r3021195610


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {

Review Comment:
   nit: it is better to declare variables `final` whenever they are not 
reassigned (especially local variables and parameters in methods longer than 
5-10 lines). When something is not `final`, I assume that it is reassigned 
somewhere. This applies not only to this line but throughout the PR.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.

Review Comment:
   Is this a performance optimization? Why not sort the properties during 
parsing, so that the longest-match (LPM) rule is explicit? 
   
   Currently, I don't see a test that would prevent someone from adding a 
property that violates the ordering. 
   
   For readability, I would prefer lexicographical order (easier for a 
developer to eyeball the right property).
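
   A minimal sketch of that idea, assuming the property list is declared 
alphabetically and the length ordering is derived in code rather than by hand. 
The class and member names (`LongestSuffixMatch`, `BY_DESCENDING_LENGTH`, 
`matchProperty`) are hypothetical and not part of the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class LongestSuffixMatch {

    /** Declared alphabetically for readability; this order carries no meaning. */
    static final List<String> KNOWN_PROPERTIES =
            Collections.unmodifiableList(
                    Arrays.asList(
                            "access-key",
                            "assume-role.arn",
                            "assume-role.external-id",
                            "assume-role.session-duration",
                            "assume-role.session-name",
                            "credentials.provider",
                            "endpoint",
                            "path-style-access",
                            "region",
                            "secret-key",
                            "sse.kms-key-id",
                            "sse.type"));

    /** Derived once, so the longest-match order cannot be broken by a manual edit. */
    static final List<String> BY_DESCENDING_LENGTH = sortByDescendingLength(KNOWN_PROPERTIES);

    static List<String> sortByDescendingLength(final List<String> props) {
        final List<String> sorted = new ArrayList<>(props);
        sorted.sort(Comparator.<String>comparingInt(String::length).reversed());
        return Collections.unmodifiableList(sorted);
    }

    /** Returns the longest known property that terminates the given key suffix, if any. */
    static Optional<String> matchProperty(final String suffix) {
        for (final String prop : BY_DESCENDING_LENGTH) {
            if (suffix.endsWith("." + prop)) {
                return Optional.of(prop);
            }
        }
        return Optional.empty();
    }
}
```

   With this shape, a new property can be added anywhere in the alphabetical 
list without affecting which match wins.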
   



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            String value = flinkConfig.getString(key, null);
+            if (value == null) {
+                continue;
+            }
+
+            for (String prop : KNOWN_PROPERTIES) {
+                if (suffix.endsWith("." + prop)) {
+                    String bucketName = suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (!bucketName.isEmpty()) {
+                        rawConfigs
+                                .computeIfAbsent(bucketName, k -> new 
HashMap<>())
+                                .put(prop, value);
+                    }
+                    break;
+                }
+            }
+        }
+
+        Map<String, S3BucketConfig> result = new HashMap<>();
+        for (Map.Entry<String, Map<String, String>> entry : 
rawConfigs.entrySet()) {
+            String bucketName = entry.getKey();
+            Map<String, String> props = entry.getValue();
+
+            S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
+            if (bucketConfig.hasAnyOverride()) {
+                result.put(bucketName, bucketConfig);
+                LOG.info(
+                        "Registered bucket-specific configuration for bucket 
'{}': {}",
+                        bucketName,
+                        bucketConfig);
+            }
+        }
+
+        return result;
+    }
+
+    private static S3BucketConfig buildBucketConfig(String bucketName, 
Map<String, String> props) {
+        S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);
+
+        applyIfPresent(props, "region", builder::region);

Review Comment:
   This is very error-prone; it should rather be done in a loop over 
KNOWN_PROPERTIES. Otherwise you can add another known property but forget to 
apply it here. For properties that need special handling, you could add 
branching in the loop itself. But if the default is `applyIfPresent`, it 
should be respected. 
   
   I suggest you add a map from property name to the builder method and assert 
in tests that its size matches the number of known properties.
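
   Roughly what I have in mind, as a sketch only. The nested `Builder` below 
is a hypothetical stand-in for `S3BucketConfig.Builder` with just three 
setters, and `PropertyAppliers`/`APPLIERS` are made-up names:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

final class PropertyAppliers {

    /** Hypothetical stand-in for S3BucketConfig.Builder; only three setters are shown. */
    static final class Builder {
        String region;
        String endpoint;
        Boolean pathStyleAccess;

        Builder region(final String v) { this.region = v; return this; }
        Builder endpoint(final String v) { this.endpoint = v; return this; }
        Builder pathStyleAccess(final boolean v) { this.pathStyleAccess = v; return this; }
    }

    /** Subset of the known properties, kept short for the example. */
    static final String[] KNOWN_PROPERTIES = {"endpoint", "path-style-access", "region"};

    /** One entry per known property; a property without an entry is never applied. */
    static final Map<String, BiConsumer<Builder, String>> APPLIERS = new LinkedHashMap<>();

    static {
        APPLIERS.put("endpoint", Builder::endpoint);
        APPLIERS.put("region", Builder::region);
        // Properties that need conversion get a lambda instead of a method reference.
        APPLIERS.put("path-style-access", (b, v) -> b.pathStyleAccess(Boolean.parseBoolean(v)));
    }

    /** Applies every configured property through its registered setter. */
    static Builder apply(final Map<String, String> props) {
        final Builder builder = new Builder();
        for (final Map.Entry<String, BiConsumer<Builder, String>> e : APPLIERS.entrySet()) {
            final String value = props.get(e.getKey());
            if (value != null) {
                e.getValue().accept(builder, value);
            }
        }
        return builder;
    }
}
```

   A test can then compare `APPLIERS.keySet()` with the set of known 
properties, which fails as soon as a property is added in one place only.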



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BucketConfigProvider}. */
+class BucketConfigProviderTest {
+
+    @Test
+    void testParsesSingleBucketConfig() {
+        Configuration config = new Configuration();
+        config.setString("s3.bucket.my-bucket.endpoint", 
"https://s3.us-west-2.amazonaws.com");
+        config.setString("s3.bucket.my-bucket.path-style-access", "true");
+        config.setString("s3.bucket.my-bucket.region", "us-west-2");

Review Comment:
   We should test other AWS partitions too, at least `aws-cn`.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {

Review Comment:
   Just wondering: what if someone adds another config option under the 
`s3.bucket.` prefix for something other than this bucket config? Will it be 
interpreted as a bucket name (and probably skipped, assuming that none of the 
known properties match)?
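
   To make the question concrete, here is a small sketch that mirrors the 
matching logic from the diff above using only two of the known properties. 
`BucketKeyProbe` and `bucketFor` are hypothetical names, and the example keys 
in the note below are invented for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class BucketKeyProbe {

    static final String PREFIX = "s3.bucket.";

    /** Subset of the known properties, enough to show the behaviour. */
    static final List<String> KNOWN = Arrays.asList("endpoint", "region");

    /** Returns the bucket name that the given key would be attributed to, if any. */
    static Optional<String> bucketFor(final String key) {
        if (!key.startsWith(PREFIX)) {
            return Optional.empty();
        }
        final String suffix = key.substring(PREFIX.length());
        for (final String prop : KNOWN) {
            if (suffix.endsWith("." + prop)) {
                final String bucket =
                        suffix.substring(0, suffix.length() - prop.length() - 1);
                // An empty bucket name is silently dropped, as in the diff.
                return bucket.isEmpty() ? Optional.empty() : Optional.of(bucket);
            }
        }
        // No known property terminates the key, so it is ignored.
        return Optional.empty();
    }
}
```

   Under this logic a key such as `s3.bucket.multipart.threshold` would be 
skipped, but a key such as `s3.bucket.cache.region` would be read as the 
`region` of a bucket named `cache`.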



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            String value = flinkConfig.getString(key, null);
+            if (value == null) {
+                continue;
+            }
+
+            for (String prop : KNOWN_PROPERTIES) {
+                if (suffix.endsWith("." + prop)) {
+                    String bucketName = suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (!bucketName.isEmpty()) {
+                        rawConfigs
+                                .computeIfAbsent(bucketName, k -> new 
HashMap<>())
+                                .put(prop, value);
+                    }
+                    break;
+                }
+            }
+        }
+
+        Map<String, S3BucketConfig> result = new HashMap<>();
+        for (Map.Entry<String, Map<String, String>> entry : 
rawConfigs.entrySet()) {
+            String bucketName = entry.getKey();
+            Map<String, String> props = entry.getValue();
+
+            S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
+            if (bucketConfig.hasAnyOverride()) {
+                result.put(bucketName, bucketConfig);
+                LOG.info(
+                        "Registered bucket-specific configuration for bucket 
'{}': {}",
+                        bucketName,
+                        bucketConfig);
+            }
+        }
+
+        return result;
+    }
+
+    private static S3BucketConfig buildBucketConfig(String bucketName, 
Map<String, String> props) {
+        S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);
+
+        applyIfPresent(props, "region", builder::region);
+        applyIfPresent(props, "endpoint", builder::endpoint);
+        applyIfPresent(props, "access-key", builder::accessKey);
+        applyIfPresent(props, "secret-key", builder::secretKey);
+        applyIfPresent(props, "sse.type", builder::sseType);
+        applyIfPresent(props, "sse.kms-key-id", builder::sseKmsKeyId);
+        applyIfPresent(props, "assume-role.arn", builder::assumeRoleArn);
+        applyIfPresent(props, "assume-role.external-id", 
builder::assumeRoleExternalId);
+        applyIfPresent(props, "assume-role.session-name", 
builder::assumeRoleSessionName);
+        applyIfPresent(props, "credentials.provider", 
builder::credentialsProvider);
+
+        String pathStyleStr = props.get("path-style-access");
+        if (pathStyleStr != null) {
+            builder.pathStyleAccess(Boolean.parseBoolean(pathStyleStr));
+        }
+
+        String durationStr = props.get("assume-role.session-duration");
+        if (durationStr != null) {
+            try {
+                
builder.assumeRoleSessionDurationSeconds(Integer.parseInt(durationStr));

Review Comment:
   Can we configure this property as a Duration, or are we limited by the 
prefix matching, where it has to be provided as an int converted to a string?
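
   Since the raw value is a string either way, something along these lines 
should be possible. This is a sketch using only `java.time.Duration`; the 
class name `SessionDurationParser` is hypothetical, and in the real code 
Flink's own duration parsing (e.g. `TimeUtils.parseDuration`) may be the 
better fit:

```java
import java.time.Duration;
import java.time.format.DateTimeParseException;

final class SessionDurationParser {

    /**
     * Accepts either a plain integer number of seconds (the format used in the diff)
     * or an ISO-8601 duration such as "PT1H".
     */
    static Duration parse(final String raw) {
        final String trimmed = raw.trim();
        try {
            return Duration.ofSeconds(Long.parseLong(trimmed));
        } catch (NumberFormatException notPlainSeconds) {
            // Not a plain integer; try the ISO-8601 form next.
        }
        try {
            return Duration.parse(trimmed);
        } catch (DateTimeParseException e) {
            throw new IllegalArgumentException("Invalid session duration: '" + raw + "'", e);
        }
    }
}
```

   Keeping the plain-seconds form first means existing integer values keep 
working unchanged.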



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            String value = flinkConfig.getString(key, null);
+            if (value == null) {
+                continue;
+            }
+
+            for (String prop : KNOWN_PROPERTIES) {
+                if (suffix.endsWith("." + prop)) {
+                    String bucketName = suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (!bucketName.isEmpty()) {

Review Comment:
   Should we check that the bucket name consists of valid characters? Also, 
does this mean that we disallow configuring properties for "all" buckets? Do 
we need to explicitly configure each property for every bucket?
   
   Another question: is an empty bucket name ever legitimate? If not, should 
we log a warning to highlight the mistake?
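
   For the character check, a sketch of what I mean. It covers only part of 
the S3 general-purpose bucket naming rules (3-63 characters; lowercase 
letters, digits, dots and hyphens; starts and ends with a letter or digit; no 
adjacent dots) and deliberately ignores the rest, such as the IP-address and 
reserved-prefix rules. `BucketNameCheck` is a hypothetical name:

```java
import java.util.regex.Pattern;

final class BucketNameCheck {

    /** 3-63 chars, [a-z0-9.-] only, first and last char must be a letter or digit. */
    private static final Pattern SHAPE = Pattern.compile("[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]");

    /** Partial validity check; returns false for null or empty names as well. */
    static boolean isPlausibleBucketName(final String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        return SHAPE.matcher(name).matches() && !name.contains("..");
    }
}
```

   The parser could log a warning and skip the key when this returns false, 
which would also surface the empty-name case.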



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -247,13 +267,54 @@ public FileSystem create(URI fsUri) throws IOException {
         String region = config.get(REGION);
         String endpoint = config.get(ENDPOINT);
         boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
-

Review Comment:
   Why do we set the access key/secret key at the bucket level, but not at 
the global level? Is it an explicit opt-out?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            String value = flinkConfig.getString(key, null);
+            if (value == null) {
+                continue;
+            }
+
+            for (String prop : KNOWN_PROPERTIES) {
+                if (suffix.endsWith("." + prop)) {
+                    String bucketName = suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (!bucketName.isEmpty()) {
+                        rawConfigs
+                                .computeIfAbsent(bucketName, k -> new 
HashMap<>())
+                                .put(prop, value);
+                    }
+                    break;
+                }
+            }
+        }
+
+        Map<String, S3BucketConfig> result = new HashMap<>();
+        for (Map.Entry<String, Map<String, String>> entry : 
rawConfigs.entrySet()) {
+            String bucketName = entry.getKey();
+            Map<String, String> props = entry.getValue();
+
+            S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
+            if (bucketConfig.hasAnyOverride()) {
+                result.put(bucketName, bucketConfig);
+                LOG.info(
+                        "Registered bucket-specific configuration for bucket 
'{}': {}",
+                        bucketName,
+                        bucketConfig);
+            }
+        }
+
+        return result;
+    }
+
+    private static S3BucketConfig buildBucketConfig(String bucketName, 
Map<String, String> props) {
+        S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);
+
+        applyIfPresent(props, "region", builder::region);
+        applyIfPresent(props, "endpoint", builder::endpoint);
+        applyIfPresent(props, "access-key", builder::accessKey);
+        applyIfPresent(props, "secret-key", builder::secretKey);
+        applyIfPresent(props, "sse.type", builder::sseType);
+        applyIfPresent(props, "sse.kms-key-id", builder::sseKmsKeyId);
+        applyIfPresent(props, "assume-role.arn", builder::assumeRoleArn);
+        applyIfPresent(props, "assume-role.external-id", 
builder::assumeRoleExternalId);
+        applyIfPresent(props, "assume-role.session-name", 
builder::assumeRoleSessionName);
+        applyIfPresent(props, "credentials.provider", 
builder::credentialsProvider);
+
+        String pathStyleStr = props.get("path-style-access");
+        if (pathStyleStr != null) {
+            builder.pathStyleAccess(Boolean.parseBoolean(pathStyleStr));
+        }
+
+        String durationStr = props.get("assume-role.session-duration");
+        if (durationStr != null) {
+            try {
+                
builder.assumeRoleSessionDurationSeconds(Integer.parseInt(durationStr));
+            } catch (NumberFormatException e) {
+                throw new IllegalConfigurationException(

Review Comment:
   Why do we validate only this property and not the rest? We never check 
that the ARN looks like an ARN or that the KMS key ID is well-formed. 
We also don't check that access-key and secret-key are provided together; 
what happens if only one of them is set?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -218,13 +232,13 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                                     + "static credentials (if configured) -> 
DefaultCredentialsProvider.");
 
     private Configuration flinkConfig;

Review Comment:
   Why is this not volatile, like bucketConfigProvider?
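
   One possible alternative to marking both fields volatile, sketched under assumptions: publish everything configure() sets through a single volatile reference to an immutable snapshot, so readers never see one field updated without the other. `FactoryStateSketch` and `State` are hypothetical names, and the `String`/`int` fields are stand-ins for `Configuration` and `BucketConfigProvider`.

```java
/** Hypothetical sketch; stand-in types replace the Flink classes. */
final class FactoryStateSketch {

    /** Immutable snapshot of everything configure() sets. */
    static final class State {
        final String flinkConfig; // stand-in for Configuration
        final int bucketOverrides; // stand-in for BucketConfigProvider

        State(String flinkConfig, int bucketOverrides) {
            this.flinkConfig = flinkConfig;
            this.bucketOverrides = bucketOverrides;
        }
    }

    // A single volatile write publishes both values together.
    private volatile State state;

    void configure(String config, int bucketOverrides) {
        this.state = new State(config, bucketOverrides);
    }

    State current() {
        State snapshot = state; // read the volatile field once
        if (snapshot == null) {
            throw new IllegalStateException("configure() has not been called");
        }
        return snapshot;
    }
}
```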



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -164,14 +180,12 @@ public class NativeS3FileSystemFactory implements 
FileSystemFactory {
                                     + "Example: 
'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' "
                                     + "or 'alias/my-s3-key'");
 
-    // IAM Assume Role Configuration
     public static final ConfigOption<String> ASSUME_ROLE_ARN =
             ConfigOptions.key("s3.assume-role.arn")
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
                             "ARN of the IAM role to assume for S3 access. "
-                                    + "Enables cross-account access or 
temporary elevated permissions. "
                                     + "Example: 
'arn:aws:iam::123456789012:role/S3AccessRole'");

Review Comment:
   I would also add an explicit example for another partition, e.g. aws-cn.
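
   For reference, the aws-cn form of the role ARN from the description would be 'arn:aws-cn:iam::123456789012:role/S3AccessRole'. If we ever add a format check, a deliberately permissive pattern could accept any partition beginning with "aws". The sketch below is an assumption on my side (`RoleArnExamples` and `looksLikeRoleArn` are hypothetical names), not something the PR contains.

```java
import java.util.regex.Pattern;

/** Hypothetical sketch of a partition-agnostic IAM role ARN check. */
final class RoleArnExamples {

    // Permissive on purpose: matches "aws", "aws-cn", "aws-us-gov" and any other
    // partition name that starts with "aws", followed by a 12-digit account ID.
    private static final Pattern ROLE_ARN =
            Pattern.compile("^arn:aws[a-z-]*:iam::\\d{12}:role/.+$");

    private RoleArnExamples() {}

    static boolean looksLikeRoleArn(String value) {
        return value != null && ROLE_ARN.matcher(value).matches();
    }
}
```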



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            String value = flinkConfig.getString(key, null);
+            if (value == null) {
+                continue;
+            }
+
+            for (String prop : KNOWN_PROPERTIES) {
+                if (suffix.endsWith("." + prop)) {
+                    String bucketName = suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (!bucketName.isEmpty()) {
+                        rawConfigs
+                                .computeIfAbsent(bucketName, k -> new 
HashMap<>())
+                                .put(prop, value);
+                    }
+                    break;
+                }
+            }
+        }
+
+        Map<String, S3BucketConfig> result = new HashMap<>();
+        for (Map.Entry<String, Map<String, String>> entry : 
rawConfigs.entrySet()) {
+            String bucketName = entry.getKey();
+            Map<String, String> props = entry.getValue();
+
+            S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
+            if (bucketConfig.hasAnyOverride()) {
+                result.put(bucketName, bucketConfig);
+                LOG.info(

Review Comment:
   Noting that we don't log credentials here because toString is overridden 
in bucketConfig.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java:
##########
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Immutable, bucket-specific S3 configuration overrides.
+ *
+ * <p>Null values indicate inheritance from global configuration. Only 
explicitly configured values
+ * are non-null. Configuration format: {@code 
s3.bucket.<bucket-name>.<property>}
+ *
+ * <p>Validates that credentials (access-key/secret-key) are either both set 
or both absent.
+ */
+@Internal
+final class S3BucketConfig {
+
+    private final String bucketName;
+    @Nullable private final String region;
+    @Nullable private final String endpoint;
+    @Nullable private final Boolean pathStyleAccess;
+    @Nullable private final String accessKey;
+    @Nullable private final String secretKey;
+    @Nullable private final String sseType;
+    @Nullable private final String sseKmsKeyId;
+    @Nullable private final String assumeRoleArn;
+    @Nullable private final String assumeRoleExternalId;
+    @Nullable private final String assumeRoleSessionName;
+    @Nullable private final Integer assumeRoleSessionDurationSeconds;
+    @Nullable private final String credentialsProvider;
+
+    private S3BucketConfig(Builder builder) {
+        this.bucketName = builder.bucketName;
+        this.region = builder.region;
+        this.endpoint = builder.endpoint;
+        this.pathStyleAccess = builder.pathStyleAccess;
+        this.accessKey = builder.accessKey;
+        this.secretKey = builder.secretKey;
+        this.sseType = builder.sseType;
+        this.sseKmsKeyId = builder.sseKmsKeyId;
+        this.assumeRoleArn = builder.assumeRoleArn;
+        this.assumeRoleExternalId = builder.assumeRoleExternalId;
+        this.assumeRoleSessionName = builder.assumeRoleSessionName;
+        this.assumeRoleSessionDurationSeconds = 
builder.assumeRoleSessionDurationSeconds;
+        this.credentialsProvider = builder.credentialsProvider;
+    }
+
+    String getBucketName() {
+        return bucketName;
+    }
+
+    @Nullable
+    String getRegion() {
+        return region;
+    }
+
+    @Nullable
+    String getEndpoint() {
+        return endpoint;
+    }
+
+    @Nullable
+    Boolean getPathStyleAccess() {
+        return pathStyleAccess;
+    }
+
+    @Nullable
+    String getAccessKey() {
+        return accessKey;
+    }
+
+    @Nullable
+    String getSecretKey() {
+        return secretKey;
+    }
+
+    @Nullable
+    String getSseType() {
+        return sseType;
+    }
+
+    @Nullable
+    String getSseKmsKeyId() {
+        return sseKmsKeyId;
+    }
+
+    @Nullable
+    String getAssumeRoleArn() {
+        return assumeRoleArn;
+    }
+
+    @Nullable
+    String getAssumeRoleExternalId() {
+        return assumeRoleExternalId;
+    }
+
+    @Nullable
+    String getAssumeRoleSessionName() {
+        return assumeRoleSessionName;
+    }
+
+    @Nullable
+    Integer getAssumeRoleSessionDurationSeconds() {
+        return assumeRoleSessionDurationSeconds;
+    }
+
+    @Nullable
+    String getCredentialsProvider() {
+        return credentialsProvider;
+    }
+
+    boolean hasAnyOverride() {
+        return region != null
+                || endpoint != null
+                || pathStyleAccess != null
+                || accessKey != null
+                || secretKey != null
+                || sseType != null
+                || sseKmsKeyId != null
+                || assumeRoleArn != null
+                || assumeRoleExternalId != null
+                || assumeRoleSessionName != null
+                || assumeRoleSessionDurationSeconds != null
+                || credentialsProvider != null;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        S3BucketConfig that = (S3BucketConfig) o;
+        return Objects.equals(bucketName, that.bucketName)
+                && Objects.equals(region, that.region)
+                && Objects.equals(endpoint, that.endpoint)
+                && Objects.equals(pathStyleAccess, that.pathStyleAccess)
+                && Objects.equals(accessKey, that.accessKey)
+                && Objects.equals(secretKey, that.secretKey)
+                && Objects.equals(sseType, that.sseType)
+                && Objects.equals(sseKmsKeyId, that.sseKmsKeyId)
+                && Objects.equals(assumeRoleArn, that.assumeRoleArn)
+                && Objects.equals(assumeRoleExternalId, 
that.assumeRoleExternalId)
+                && Objects.equals(assumeRoleSessionName, 
that.assumeRoleSessionName)
+                && Objects.equals(
+                        assumeRoleSessionDurationSeconds, 
that.assumeRoleSessionDurationSeconds)
+                && Objects.equals(credentialsProvider, 
that.credentialsProvider);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                bucketName,
+                region,
+                endpoint,
+                pathStyleAccess,
+                accessKey,
+                secretKey,
+                sseType,
+                sseKmsKeyId,
+                assumeRoleArn,
+                assumeRoleExternalId,
+                assumeRoleSessionName,
+                assumeRoleSessionDurationSeconds,
+                credentialsProvider);
+    }
+
+    @Override
+    public String toString() {

Review Comment:
   nit: quote handling here is a bit messy. You can end up with unclosed quotes 
if not all configurations are provided.
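
   A sketch of one way to keep the quotes balanced, assuming we only want to print fields that are set: build the string with `java.util.StringJoiner` and emit each `name='value'` pair as a single unit. The class below is a cut-down, hypothetical stand-in for S3BucketConfig with only a few fields, and the `***` masking is my assumption about how credentials should appear.

```java
import java.util.StringJoiner;

/** Hypothetical, reduced stand-in for S3BucketConfig to illustrate toString. */
final class BucketConfigToStringSketch {

    private final String bucketName;
    private final String region; // nullable
    private final String endpoint; // nullable
    private final String secretKey; // nullable

    BucketConfigToStringSketch(
            String bucketName, String region, String endpoint, String secretKey) {
        this.bucketName = bucketName;
        this.region = region;
        this.endpoint = endpoint;
        this.secretKey = secretKey;
    }

    @Override
    public String toString() {
        StringJoiner joiner = new StringJoiner(", ", "S3BucketConfig{", "}");
        addQuoted(joiner, "bucket", bucketName);
        addQuoted(joiner, "region", region);
        addQuoted(joiner, "endpoint", endpoint);
        if (secretKey != null) {
            joiner.add("secretKey=***"); // never print the value itself
        }
        return joiner.toString();
    }

    /** Adds name='value' as one element, so a quote is never left open. */
    private static void addQuoted(StringJoiner joiner, String name, String value) {
        if (value != null) {
            joiner.add(name + "='" + value + "'");
        }
    }
}
```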



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java:
##########
@@ -233,6 +247,12 @@ public int getPriority() {
     @Override
     public void configure(Configuration config) {
         this.flinkConfig = config;
+        this.bucketConfigProvider = new BucketConfigProvider(config);
+        if (bucketConfigProvider.size() > 0) {
+            LOG.info(

Review Comment:
   Does this add value on top of the parser already logging every 
configuration?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code 
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and 
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest 
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(BucketConfigProvider.class);
+
+    static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+
+    /**
+     * Known bucket-level properties, sorted by descending length so that the 
longest match wins.
+     */
+    private static final String[] KNOWN_PROPERTIES =
+            new String[] {
+                "assume-role.session-duration",
+                "assume-role.session-name",
+                "assume-role.external-id",
+                "credentials.provider",
+                "path-style-access",
+                "sse.kms-key-id",
+                "assume-role.arn",
+                "access-key",
+                "secret-key",
+                "sse.type",
+                "endpoint",
+                "region"
+            };
+
+    private final Map<String, S3BucketConfig> bucketConfigs;
+
+    BucketConfigProvider(Configuration flinkConfig) {
+        this.bucketConfigs = 
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+    }
+
+    @Nullable
+    S3BucketConfig getBucketConfig(String bucketName) {
+        return bucketConfigs.get(bucketName);
+    }
+
+    @VisibleForTesting
+    boolean hasBucketConfig(String bucketName) {
+        return bucketConfigs.containsKey(bucketName);
+    }
+
+    @VisibleForTesting
+    int size() {
+        return bucketConfigs.size();
+    }
+
+    private static Map<String, S3BucketConfig> 
parseBucketConfigs(Configuration flinkConfig) {
+        Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+        for (String key : flinkConfig.keySet()) {
+            if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+                continue;
+            }
+            String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+            String value = flinkConfig.getString(key, null);
+            if (value == null) {
+                continue;
+            }
+
+            for (String prop : KNOWN_PROPERTIES) {
+                if (suffix.endsWith("." + prop)) {
+                    String bucketName = suffix.substring(0, suffix.length() - 
prop.length() - 1);
+                    if (!bucketName.isEmpty()) {
+                        rawConfigs
+                                .computeIfAbsent(bucketName, k -> new 
HashMap<>())
+                                .put(prop, value);
+                    }
+                    break;
+                }
+            }
+        }
+
+        Map<String, S3BucketConfig> result = new HashMap<>();
+        for (Map.Entry<String, Map<String, String>> entry : 
rawConfigs.entrySet()) {
+            String bucketName = entry.getKey();
+            Map<String, String> props = entry.getValue();
+
+            S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props);
+            if (bucketConfig.hasAnyOverride()) {
+                result.put(bucketName, bucketConfig);
+                LOG.info(
+                        "Registered bucket-specific configuration for bucket 
'{}': {}",
+                        bucketName,
+                        bucketConfig);
+            }
+        }
+
+        return result;
+    }
+
+    private static S3BucketConfig buildBucketConfig(String bucketName, 
Map<String, String> props) {
+        S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);
+
+        applyIfPresent(props, "region", builder::region);
+        applyIfPresent(props, "endpoint", builder::endpoint);
+        applyIfPresent(props, "access-key", builder::accessKey);
+        applyIfPresent(props, "secret-key", builder::secretKey);

Review Comment:
   Hmm, could you please double-check that this is covered by 
GlobalConfiguration.isSensitive()? We must ensure that at least the FlinkConfigMap 
response on the JM does not show secrets in plaintext in the Flink UI.
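
   To illustrate the concern: as far as I recall, the sensitivity check is a case-insensitive substring match on the key, which is worth confirming against the actual GlobalConfiguration source. The sketch below does not call Flink; `SensitiveKeySketch` is a hypothetical name and the fragment list is an assumption for illustration, not the real list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

/** Hypothetical sketch of a substring-based sensitivity check. */
final class SensitiveKeySketch {

    // Assumed fragments for illustration only; the authoritative list lives in Flink.
    private static final List<String> ASSUMED_FRAGMENTS = Arrays.asList("password", "secret");

    private SensitiveKeySketch() {}

    static boolean looksSensitive(String key) {
        String lower = key.toLowerCase(Locale.ROOT);
        for (String fragment : ASSUMED_FRAGMENTS) {
            if (lower.contains(fragment)) {
                return true;
            }
        }
        return false;
    }
}
```

   Under that assumption, `s3.bucket.<bucket-name>.secret-key` would be masked regardless of the bucket name, while `s3.bucket.<bucket-name>.access-key` would not be. A test that feeds the real per-bucket keys through the real method would settle it.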



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
