Izeren commented on code in PR #27788: URL: https://github.com/apache/flink/pull/27788#discussion_r3021195610
########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. 
+ */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. + */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { Review Comment: !nit, it is better to put final everywhere when variable doesn't change (especially for variables and arguments in methods that are longer than 5-10 lines). When something is not final, I assume that it is reassigned somewhere. This applies not only to this line but throughout ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. + */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. Review Comment: Is it a performance optimization? Why not sort them during parsing to be explicit in LPM? 
Currently, I don't see a test that would prevent someone from adding a property that violates the ordering. For readability, I would prefer a lexicographical sort (easier for a developer to eyeball the right property) ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction.
+ */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. + */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { + continue; + } + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." 
+ prop)) { + String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { + rawConfigs + .computeIfAbsent(bucketName, k -> new HashMap<>()) + .put(prop, value); + } + break; + } + } + } + + Map<String, S3BucketConfig> result = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) { + String bucketName = entry.getKey(); + Map<String, String> props = entry.getValue(); + + S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props); + if (bucketConfig.hasAnyOverride()) { + result.put(bucketName, bucketConfig); + LOG.info( + "Registered bucket-specific configuration for bucket '{}': {}", + bucketName, + bucketConfig); + } + } + + return result; + } + + private static S3BucketConfig buildBucketConfig(String bucketName, Map<String, String> props) { + S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName); + + applyIfPresent(props, "region", builder::region); Review Comment: This is very error prone, it should rather be done in a loop over KNOWN_PROPERTIES. Otherwise you can add another known property but forget to apply it here. For properties that need exceptional handling, you could add branching in the loop itself. But if default is to "applyIfPresent" it should be respected. I suggest you add a map from property name to the builder method and assert in tests that it's size matches number of known properties ########## flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link BucketConfigProvider}. */ +class BucketConfigProviderTest { + + @Test + void testParsesSingleBucketConfig() { + Configuration config = new Configuration(); + config.setString("s3.bucket.my-bucket.endpoint", "https://s3.us-west-2.amazonaws.com"); + config.setString("s3.bucket.my-bucket.path-style-access", "true"); + config.setString("s3.bucket.my-bucket.region", "us-west-2"); Review Comment: We should test for other partitions too. At least for `aws-cn` ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. + */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. 
+ */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { Review Comment: Just wondering if someone adds another config with `s3.bucket` for anything else than this bucket config. Will it be interpreted as a bucket name? (and probably skipped assuming that none of other properties match) ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. + */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. 
+ */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { + continue; + } + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." 
+ prop)) { + String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { + rawConfigs + .computeIfAbsent(bucketName, k -> new HashMap<>()) + .put(prop, value); + } + break; + } + } + } + + Map<String, S3BucketConfig> result = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) { + String bucketName = entry.getKey(); + Map<String, String> props = entry.getValue(); + + S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props); + if (bucketConfig.hasAnyOverride()) { + result.put(bucketName, bucketConfig); + LOG.info( + "Registered bucket-specific configuration for bucket '{}': {}", + bucketName, + bucketConfig); + } + } + + return result; + } + + private static S3BucketConfig buildBucketConfig(String bucketName, Map<String, String> props) { + S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName); + + applyIfPresent(props, "region", builder::region); + applyIfPresent(props, "endpoint", builder::endpoint); + applyIfPresent(props, "access-key", builder::accessKey); + applyIfPresent(props, "secret-key", builder::secretKey); + applyIfPresent(props, "sse.type", builder::sseType); + applyIfPresent(props, "sse.kms-key-id", builder::sseKmsKeyId); + applyIfPresent(props, "assume-role.arn", builder::assumeRoleArn); + applyIfPresent(props, "assume-role.external-id", builder::assumeRoleExternalId); + applyIfPresent(props, "assume-role.session-name", builder::assumeRoleSessionName); + applyIfPresent(props, "credentials.provider", builder::credentialsProvider); + + String pathStyleStr = props.get("path-style-access"); + if (pathStyleStr != null) { + builder.pathStyleAccess(Boolean.parseBoolean(pathStyleStr)); + } + + String durationStr = props.get("assume-role.session-duration"); + if (durationStr != null) { + try { + builder.assumeRoleSessionDurationSeconds(Integer.parseInt(durationStr)); Review Comment: Can we configure this property as Duration or are we limited by 
prefix matching where it has to be provided as int converted to string? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. 
+ */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. + */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { + continue; + } + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." + prop)) { + String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { Review Comment: Should we check that bucket name consists of valid characters? Also does it mean that we disallow configuring properties for "all" buckets? Do we need explicitly configure each property for every bucket? 
Another question. Is it generally a legitimate situation to have an empty bucket name? Should we log a warning to highlight the mistake if not? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java: ########## @@ -247,13 +267,54 @@ public FileSystem create(URI fsUri) throws IOException { String region = config.get(REGION); String endpoint = config.get(ENDPOINT); boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS); - Review Comment: Why do we set the access key/secret key at the bucket level, but not at the global level? Is it an explicit opt-out? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. + */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. 
+ */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { + continue; + } + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." 
+ prop)) { + String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { + rawConfigs + .computeIfAbsent(bucketName, k -> new HashMap<>()) + .put(prop, value); + } + break; + } + } + } + + Map<String, S3BucketConfig> result = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) { + String bucketName = entry.getKey(); + Map<String, String> props = entry.getValue(); + + S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props); + if (bucketConfig.hasAnyOverride()) { + result.put(bucketName, bucketConfig); + LOG.info( + "Registered bucket-specific configuration for bucket '{}': {}", + bucketName, + bucketConfig); + } + } + + return result; + } + + private static S3BucketConfig buildBucketConfig(String bucketName, Map<String, String> props) { + S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName); + + applyIfPresent(props, "region", builder::region); + applyIfPresent(props, "endpoint", builder::endpoint); + applyIfPresent(props, "access-key", builder::accessKey); + applyIfPresent(props, "secret-key", builder::secretKey); + applyIfPresent(props, "sse.type", builder::sseType); + applyIfPresent(props, "sse.kms-key-id", builder::sseKmsKeyId); + applyIfPresent(props, "assume-role.arn", builder::assumeRoleArn); + applyIfPresent(props, "assume-role.external-id", builder::assumeRoleExternalId); + applyIfPresent(props, "assume-role.session-name", builder::assumeRoleSessionName); + applyIfPresent(props, "credentials.provider", builder::credentialsProvider); + + String pathStyleStr = props.get("path-style-access"); + if (pathStyleStr != null) { + builder.pathStyleAccess(Boolean.parseBoolean(pathStyleStr)); + } + + String durationStr = props.get("assume-role.session-duration"); + if (durationStr != null) { + try { + builder.assumeRoleSessionDurationSeconds(Integer.parseInt(durationStr)); + } catch (NumberFormatException e) { + throw new IllegalConfigurationException( 
Review Comment: Why do we extract property validation only for this property and not for the rest? We never validate that the ARN looks like an ARN or that the KMS key ID looks correct. Also, we don't validate that both access-key and secret-key have been provided; what if only one of them was? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java: ########## @@ -218,13 +232,13 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "static credentials (if configured) -> DefaultCredentialsProvider."); private Configuration flinkConfig; Review Comment: Why is this not volatile, like bucketConfigProvider? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java: ########## @@ -164,14 +180,12 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "Example: 'arn:aws:kms:us-east-1:123456789:key/12345678-1234-1234-1234-123456789abc' " + "or 'alias/my-s3-key'"); - // IAM Assume Role Configuration public static final ConfigOption<String> ASSUME_ROLE_ARN = ConfigOptions.key("s3.assume-role.arn") .stringType() .noDefaultValue() .withDescription( "ARN of the IAM role to assume for S3 access. " - + "Enables cross-account access or temporary elevated permissions. " + "Example: 'arn:aws:iam::123456789012:role/S3AccessRole'"); Review Comment: I would also add an explicit example for some other partition, e.g. aws-cn ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. + */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. 
+ */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { + continue; + } + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." 
+ prop)) { + String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { + rawConfigs + .computeIfAbsent(bucketName, k -> new HashMap<>()) + .put(prop, value); + } + break; + } + } + } + + Map<String, S3BucketConfig> result = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) { + String bucketName = entry.getKey(); + Map<String, String> props = entry.getValue(); + + S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props); + if (bucketConfig.hasAnyOverride()) { + result.put(bucketName, bucketConfig); + LOG.info( Review Comment: Noting, that we don't log credentials here, because toString is overridden in bucketConfig. ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java: ########## @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.IllegalConfigurationException; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** + * Immutable, bucket-specific S3 configuration overrides. 
+ * + * <p>Null values indicate inheritance from global configuration. Only explicitly configured values + * are non-null. Configuration format: {@code s3.bucket.<bucket-name>.<property>} + * + * <p>Validates that credentials (access-key/secret-key) are either both set or both absent. + */ +@Internal +final class S3BucketConfig { + + private final String bucketName; + @Nullable private final String region; + @Nullable private final String endpoint; + @Nullable private final Boolean pathStyleAccess; + @Nullable private final String accessKey; + @Nullable private final String secretKey; + @Nullable private final String sseType; + @Nullable private final String sseKmsKeyId; + @Nullable private final String assumeRoleArn; + @Nullable private final String assumeRoleExternalId; + @Nullable private final String assumeRoleSessionName; + @Nullable private final Integer assumeRoleSessionDurationSeconds; + @Nullable private final String credentialsProvider; + + private S3BucketConfig(Builder builder) { + this.bucketName = builder.bucketName; + this.region = builder.region; + this.endpoint = builder.endpoint; + this.pathStyleAccess = builder.pathStyleAccess; + this.accessKey = builder.accessKey; + this.secretKey = builder.secretKey; + this.sseType = builder.sseType; + this.sseKmsKeyId = builder.sseKmsKeyId; + this.assumeRoleArn = builder.assumeRoleArn; + this.assumeRoleExternalId = builder.assumeRoleExternalId; + this.assumeRoleSessionName = builder.assumeRoleSessionName; + this.assumeRoleSessionDurationSeconds = builder.assumeRoleSessionDurationSeconds; + this.credentialsProvider = builder.credentialsProvider; + } + + String getBucketName() { + return bucketName; + } + + @Nullable + String getRegion() { + return region; + } + + @Nullable + String getEndpoint() { + return endpoint; + } + + @Nullable + Boolean getPathStyleAccess() { + return pathStyleAccess; + } + + @Nullable + String getAccessKey() { + return accessKey; + } + + @Nullable + String getSecretKey() { + return 
secretKey; + } + + @Nullable + String getSseType() { + return sseType; + } + + @Nullable + String getSseKmsKeyId() { + return sseKmsKeyId; + } + + @Nullable + String getAssumeRoleArn() { + return assumeRoleArn; + } + + @Nullable + String getAssumeRoleExternalId() { + return assumeRoleExternalId; + } + + @Nullable + String getAssumeRoleSessionName() { + return assumeRoleSessionName; + } + + @Nullable + Integer getAssumeRoleSessionDurationSeconds() { + return assumeRoleSessionDurationSeconds; + } + + @Nullable + String getCredentialsProvider() { + return credentialsProvider; + } + + boolean hasAnyOverride() { + return region != null + || endpoint != null + || pathStyleAccess != null + || accessKey != null + || secretKey != null + || sseType != null + || sseKmsKeyId != null + || assumeRoleArn != null + || assumeRoleExternalId != null + || assumeRoleSessionName != null + || assumeRoleSessionDurationSeconds != null + || credentialsProvider != null; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + S3BucketConfig that = (S3BucketConfig) o; + return Objects.equals(bucketName, that.bucketName) + && Objects.equals(region, that.region) + && Objects.equals(endpoint, that.endpoint) + && Objects.equals(pathStyleAccess, that.pathStyleAccess) + && Objects.equals(accessKey, that.accessKey) + && Objects.equals(secretKey, that.secretKey) + && Objects.equals(sseType, that.sseType) + && Objects.equals(sseKmsKeyId, that.sseKmsKeyId) + && Objects.equals(assumeRoleArn, that.assumeRoleArn) + && Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId) + && Objects.equals(assumeRoleSessionName, that.assumeRoleSessionName) + && Objects.equals( + assumeRoleSessionDurationSeconds, that.assumeRoleSessionDurationSeconds) + && Objects.equals(credentialsProvider, that.credentialsProvider); + } + + @Override + public int hashCode() { + return Objects.hash( + bucketName, + region, 
+ endpoint, + pathStyleAccess, + accessKey, + secretKey, + sseType, + sseKmsKeyId, + assumeRoleArn, + assumeRoleExternalId, + assumeRoleSessionName, + assumeRoleSessionDurationSeconds, + credentialsProvider); + } + + @Override + public String toString() { Review Comment: !nit, the quote handling here is a bit messy. You can end up with unclosed quotes if not all configurations are provided. ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java: ########## @@ -233,6 +247,12 @@ public int getPriority() { @Override public void configure(Configuration config) { this.flinkConfig = config; + this.bucketConfigProvider = new BucketConfigProvider(config); + if (bucketConfigProvider.size() > 0) { + LOG.info( Review Comment: Does this bring any value on top of the parser already logging all configurations? ########## flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Parses bucket-specific S3 configuration using format {@code s3.bucket.<bucket-name>.<property>}. + * + * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and IAM roles. Bucket + * names containing dots are supported; properties are matched by longest suffix first. + * + * <p>Immutable and thread-safe after construction. + */ +@Internal +final class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + /** + * Known bucket-level properties, sorted by descending length so that the longest match wins. 
+ */ + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.session-duration", + "assume-role.session-name", + "assume-role.external-id", + "credentials.provider", + "path-style-access", + "sse.kms-key-id", + "assume-role.arn", + "access-key", + "secret-key", + "sse.type", + "endpoint", + "region" + }; + + private final Map<String, S3BucketConfig> bucketConfigs; + + BucketConfigProvider(Configuration flinkConfig) { + this.bucketConfigs = Collections.unmodifiableMap(parseBucketConfigs(flinkConfig)); + } + + @Nullable + S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + @VisibleForTesting + boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } + + @VisibleForTesting + int size() { + return bucketConfigs.size(); + } + + private static Map<String, S3BucketConfig> parseBucketConfigs(Configuration flinkConfig) { + Map<String, Map<String, String>> rawConfigs = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (!key.startsWith(BUCKET_CONFIG_PREFIX)) { + continue; + } + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." 
+ prop)) { + String bucketName = suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { + rawConfigs + .computeIfAbsent(bucketName, k -> new HashMap<>()) + .put(prop, value); + } + break; + } + } + } + + Map<String, S3BucketConfig> result = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : rawConfigs.entrySet()) { + String bucketName = entry.getKey(); + Map<String, String> props = entry.getValue(); + + S3BucketConfig bucketConfig = buildBucketConfig(bucketName, props); + if (bucketConfig.hasAnyOverride()) { + result.put(bucketName, bucketConfig); + LOG.info( + "Registered bucket-specific configuration for bucket '{}': {}", + bucketName, + bucketConfig); + } + } + + return result; + } + + private static S3BucketConfig buildBucketConfig(String bucketName, Map<String, String> props) { + S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName); + + applyIfPresent(props, "region", builder::region); + applyIfPresent(props, "endpoint", builder::endpoint); + applyIfPresent(props, "access-key", builder::accessKey); + applyIfPresent(props, "secret-key", builder::secretKey); Review Comment: Hmm, could you please double-check that this is covered by GlobalConfiguration.isSensitive()? We must ensure that at least FlinkConfigMap response on JM is not going to show secrets in plaintext in flink UI -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
