This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 92886f2e975 [FLINK-39166][s3] Add bucket-level configuration support
to Native S3 FileSystem
92886f2e975 is described below
commit 92886f2e975fe28709c94d141fcff936a50ba079
Author: Samrat <[email protected]>
AuthorDate: Sat May 16 13:38:08 2026 +0530
[FLINK-39166][s3] Add bucket-level configuration support to Native S3
FileSystem
---
flink-filesystems/flink-s3-fs-native/README.md | 25 ++
.../flink/fs/s3native/BucketConfigProvider.java | 202 ++++++++++++
.../fs/s3native/NativeS3FileSystemFactory.java | 77 ++++-
.../apache/flink/fs/s3native/S3BucketConfig.java | 342 +++++++++++++++++++++
.../apache/flink/fs/s3native/S3ClientProvider.java | 63 +++-
.../fs/s3native/BucketConfigProviderTest.java | 259 ++++++++++++++++
.../fs/s3native/NativeS3FileSystemFactoryTest.java | 188 ++++++++++-
.../flink/fs/s3native/S3BucketConfigTest.java | 170 ++++++++++
8 files changed, 1299 insertions(+), 27 deletions(-)
diff --git a/flink-filesystems/flink-s3-fs-native/README.md
b/flink-filesystems/flink-s3-fs-native/README.md
index 6dcf2c74ff0..df9f58823fb 100644
--- a/flink-filesystems/flink-s3-fs-native/README.md
+++ b/flink-filesystems/flink-s3-fs-native/README.md
@@ -90,6 +90,31 @@ input.sinkTo(FileSink.forRowFormat(new
Path("s3://my-bucket/output"),
| s3.assume-role.session-name | flink-s3-session | Session name for the
assumed role |
| s3.assume-role.session-duration | 3600 | Session duration in seconds
(900-43200) |
+## Bucket-Level Configuration
+
+The Native S3 FileSystem supports per-bucket configuration overrides, allowing
different S3 buckets to use different connection settings within the same Flink
cluster. This enables scenarios like:
+
+- **Different credentials per bucket** (e.g., cross-account access for a data
sink bucket)
+- **Different regions or endpoints** (e.g., checkpoints in `us-east-1`,
archive bucket in `eu-west-1`)
+- **Bucket-specific encryption** (e.g., SSE-KMS for sensitive data, SSE-S3 for
logs)
+
+### Format
+
+Bucket-level configuration uses the format:
`s3.bucket.<bucket-name>.<property>`
+
+Bucket names containing dots (e.g., `my.company.data`) are fully supported
through longest-suffix matching.
+
+> **Note:** AWS recommends avoiding periods (`.`) in bucket names. Buckets
with dots cannot use virtual-hosted-style addressing over HTTPS without custom
certificate validation. If you use dotted bucket names, enable
`path-style-access` for that bucket (see [AWS
documentation](https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html)).
+
+### Supported Properties
+
+All global S3 configuration properties can be overridden at the bucket level:
+
+- **Connection:** `region`, `endpoint`, `path-style-access`
+- **Credentials:** `access-key`, `secret-key`, `aws.credentials.provider`
+- **Encryption:** `sse.type`, `sse.kms.key-id`
+- **IAM Assume Role:** `assume-role.arn`, `assume-role.external-id`,
`assume-role.session-name`, `assume-role.session-duration`
+
## Server-Side Encryption (SSE)
The filesystem supports server-side encryption for data at rest:
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java
new file mode 100644
index 00000000000..05355f8d4a0
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+/**
+ * Parses bucket-specific S3 configuration using format {@code
s3.bucket.<bucket-name>.<property>}.
+ *
+ * <p>Enables per-bucket overrides for endpoints, credentials, encryption, and
IAM roles. Bucket
+ * names containing dots are supported; properties are matched by longest
suffix first.
+ *
+ * <p>Immutable and thread-safe after construction.
+ */
+@Internal
+final class BucketConfigProvider {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BucketConfigProvider.class);
+ static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
+ static final Map<String, BiConsumer<S3BucketConfig.Builder, String>>
PROPERTY_APPLICATORS;
+ static final List<String> KNOWN_PROPERTIES_BY_LENGTH;
+
+ static {
+ final Map<String, BiConsumer<S3BucketConfig.Builder, String>>
applicators =
+ new LinkedHashMap<>();
+ applicators.put("access-key", S3BucketConfig.Builder::accessKey);
+ applicators.put("assume-role.arn",
S3BucketConfig.Builder::assumeRoleArn);
+ applicators.put("assume-role.external-id",
S3BucketConfig.Builder::assumeRoleExternalId);
+ applicators.put(
+ "assume-role.session-duration",
+ (b, v) -> {
+ try {
+
b.assumeRoleSessionDurationSeconds(Integer.parseInt(v));
+ } catch (NumberFormatException e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid assume-role.session-duration
'%s' for bucket '%s'. "
+ + "Must be a valid integer
(e.g., 3600)",
+ v, b.getBucketName()),
+ e);
+ }
+ });
+ applicators.put("assume-role.session-name",
S3BucketConfig.Builder::assumeRoleSessionName);
+ applicators.put("aws.credentials.provider",
S3BucketConfig.Builder::credentialsProvider);
+ applicators.put("endpoint", S3BucketConfig.Builder::endpoint);
+ applicators.put(
+ "path-style-access",
+ (b, v) -> {
+ if (!"true".equalsIgnoreCase(v) &&
!"false".equalsIgnoreCase(v)) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid path-style-access '%s' for
bucket '%s'. "
+ + "Must be 'true' or 'false'",
+ v, b.getBucketName()));
+ }
+ b.pathStyleAccess(Boolean.parseBoolean(v));
+ });
+ applicators.put("region", S3BucketConfig.Builder::region);
+ applicators.put("secret-key", S3BucketConfig.Builder::secretKey);
+ applicators.put("sse.kms.key-id", S3BucketConfig.Builder::sseKmsKeyId);
+ applicators.put("sse.type", S3BucketConfig.Builder::sseType);
+ PROPERTY_APPLICATORS = Collections.unmodifiableMap(applicators);
+
+ KNOWN_PROPERTIES_BY_LENGTH =
+ applicators.keySet().stream()
+
.sorted(Comparator.comparingInt(String::length).reversed())
+ .collect(Collectors.toList());
+ }
+
+ private final Map<String, S3BucketConfig> bucketConfigs;
+
+ BucketConfigProvider(Configuration flinkConfig) {
+ this.bucketConfigs =
Collections.unmodifiableMap(parseBucketConfigs(flinkConfig));
+ }
+
+ @Nullable
+ S3BucketConfig getBucketConfig(String bucketName) {
+ return bucketConfigs.get(bucketName);
+ }
+
+ @VisibleForTesting
+ boolean hasBucketConfig(String bucketName) {
+ return bucketConfigs.containsKey(bucketName);
+ }
+
+ @VisibleForTesting
+ int size() {
+ return bucketConfigs.size();
+ }
+
+ private static Map<String, S3BucketConfig>
parseBucketConfigs(Configuration flinkConfig) {
+ final Map<String, Map<String, String>> rawConfigs = new HashMap<>();
+
+ for (final String key : flinkConfig.keySet()) {
+ if (!key.startsWith(BUCKET_CONFIG_PREFIX)) {
+ continue;
+ }
+ final String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
+ if (StringUtils.isNullOrWhitespaceOnly(suffix)) {
+ continue;
+ }
+ final String value = flinkConfig.getString(key, null);
+ if (StringUtils.isNullOrWhitespaceOnly(value)) {
+ continue;
+ }
+
+ boolean matched = false;
+ for (final String prop : KNOWN_PROPERTIES_BY_LENGTH) {
+ if (suffix.endsWith("." + prop)) {
+ final String bucketName =
+ suffix.substring(0, suffix.length() -
prop.length() - 1);
+ if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
+ LOG.warn(
+ "Ignoring bucket config key '{}': "
+ + "resolved bucket name is empty
(missing bucket name between "
+ + "'s3.bucket.' prefix and '.{}'
property?).",
+ key,
+ prop);
+ } else {
+ rawConfigs
+ .computeIfAbsent(bucketName, k -> new
HashMap<>())
+ .put(prop, value);
+ }
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ LOG.warn(
+ "Ignoring unrecognized bucket config key '{}'. "
+ + "Known bucket-level properties: {}",
+ key,
+ PROPERTY_APPLICATORS.keySet());
+ }
+ }
+
+ final Map<String, S3BucketConfig> result = new HashMap<>();
+ for (final Map.Entry<String, Map<String, String>> entry :
rawConfigs.entrySet()) {
+ final String bucketName = entry.getKey();
+ final Map<String, String> props = entry.getValue();
+
+ final S3BucketConfig bucketConfig = buildBucketConfig(bucketName,
props);
+ if (bucketConfig.hasAnyOverride()) {
+ result.put(bucketName, bucketConfig);
+ LOG.info(
+ "Registered bucket-specific configuration for bucket
'{}': {}",
+ bucketName,
+ bucketConfig);
+ }
+ }
+
+ return result;
+ }
+
+ private static S3BucketConfig buildBucketConfig(String bucketName,
Map<String, String> props) {
+ final S3BucketConfig.Builder builder =
S3BucketConfig.builder(bucketName);
+
+ for (final Map.Entry<String, BiConsumer<S3BucketConfig.Builder,
String>> entry :
+ PROPERTY_APPLICATORS.entrySet()) {
+ final String value = props.get(entry.getKey());
+ if (value != null) {
+ entry.getValue().accept(builder, value);
+ }
+ }
+
+ return builder.build();
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
index c54ed86f8bf..50c0145be93 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java
@@ -27,6 +27,7 @@ import
org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,11 +41,14 @@ import java.util.Collections;
import java.util.Map;
/**
- * Factory for creating Native S3 FileSystem instances.
+ * Factory for creating Native S3 FileSystem instances using AWS SDK v2.
*
- * <p>This factory creates {@link NativeS3FileSystem} instances for accessing
Amazon S3 buckets
- * using AWS SDK v2. The Native S3 FileSystem provides a drop-in replacement
for Presto and Hadoop
- * S3 implementations with minimal external dependencies.
+ * <h3>Bucket-Level Configuration</h3>
+ *
+ * <p>Supports per-bucket configuration overrides using the format {@code
+ * s3.bucket.<bucket-name>.<property>}. Bucket-level settings override global
settings; unset
+ * properties inherit global values. See {@link
BucketConfigProvider#PROPERTY_APPLICATORS} for the
+ * complete list of supported bucket-level properties.
*
* @see NativeS3FileSystem
* @see org.apache.flink.core.fs.FileSystemFactory
@@ -295,6 +299,7 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
+ "static credentials (if configured) ->
DefaultCredentialsProvider.");
@Nullable private Configuration flinkConfig;
+ @Nullable private BucketConfigProvider bucketConfigProvider;
@Override
public String getScheme() {
@@ -310,6 +315,7 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
@Override
public void configure(Configuration config) {
this.flinkConfig = config;
+ this.bucketConfigProvider = new BucketConfigProvider(config);
}
@Override
@@ -324,13 +330,56 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
String region = config.get(REGION);
String endpoint = config.get(ENDPOINT);
boolean pathStyleAccess = config.get(PATH_STYLE_ACCESS);
+ String sseType = config.get(SSE_TYPE);
+ String sseKmsKeyId = config.get(SSE_KMS_KEY_ID);
+ String assumeRoleArn = config.get(ASSUME_ROLE_ARN);
+ String assumeRoleExternalId = config.get(ASSUME_ROLE_EXTERNAL_ID);
+ String assumeRoleSessionName = config.get(ASSUME_ROLE_SESSION_NAME);
+ int assumeRoleSessionDuration =
config.get(ASSUME_ROLE_SESSION_DURATION_SECONDS);
+ String credentialsProviderClasses =
config.get(AWS_CREDENTIALS_PROVIDER);
+
+ // Apply bucket-specific overrides
+ String bucketName = fsUri.getHost();
+ if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
+ throw new IOException("Invalid S3 URI: missing or empty bucket
name in URI: " + fsUri);
+ }
+ if (bucketConfigProvider != null) {
+ S3BucketConfig overrides =
bucketConfigProvider.getBucketConfig(bucketName);
+ if (overrides != null) {
+ LOG.debug(
+ "Applying bucket-specific configuration for bucket
'{}': {}",
+ bucketName,
+ overrides);
+ accessKey = firstNonNull(overrides.getAccessKey(), accessKey);
+ secretKey = firstNonNull(overrides.getSecretKey(), secretKey);
+ region = firstNonNull(overrides.getRegion(), region);
+ endpoint = firstNonNull(overrides.getEndpoint(), endpoint);
+ sseType = firstNonNull(overrides.getSseType(), sseType);
+ sseKmsKeyId = firstNonNull(overrides.getSseKmsKeyId(),
sseKmsKeyId);
+ assumeRoleArn = firstNonNull(overrides.getAssumeRoleArn(),
assumeRoleArn);
+ assumeRoleExternalId =
+ firstNonNull(overrides.getAssumeRoleExternalId(),
assumeRoleExternalId);
+ assumeRoleSessionName =
+ firstNonNull(overrides.getAssumeRoleSessionName(),
assumeRoleSessionName);
+ credentialsProviderClasses =
+ firstNonNull(
+ overrides.getCredentialsProvider(),
credentialsProviderClasses);
+ if (overrides.getPathStyleAccess() != null) {
+ pathStyleAccess = overrides.getPathStyleAccess();
+ }
+ if (overrides.getAssumeRoleSessionDurationSeconds() != null) {
+ assumeRoleSessionDuration =
overrides.getAssumeRoleSessionDurationSeconds();
+ }
+ }
+ }
S3EncryptionConfig encryptionConfig =
S3EncryptionConfig.fromConfig(
- config.get(SSE_TYPE),
- config.get(SSE_KMS_KEY_ID),
+ sseType,
+ sseKmsKeyId,
config.getOptional(SSE_KMS_ENCRYPTION_CONTEXT)
.orElse(Collections.emptyMap()));
+
String entropyInjectionKey = config.get(ENTROPY_INJECT_KEY_OPTION);
int numEntropyChars = -1;
if (entropyInjectionKey != null) {
@@ -404,13 +453,12 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
.socketTimeout(config.get(SOCKET_TIMEOUT))
.connectionMaxIdleTime(config.get(CONNECTION_MAX_IDLE_TIME))
.clientCloseTimeout(config.get(CLIENT_CLOSE_TIMEOUT))
- .assumeRoleArn(config.get(ASSUME_ROLE_ARN))
-
.assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID))
-
.assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME))
- .assumeRoleSessionDurationSeconds(
-
config.get(ASSUME_ROLE_SESSION_DURATION_SECONDS))
+ .assumeRoleArn(assumeRoleArn)
+ .assumeRoleExternalId(assumeRoleExternalId)
+ .assumeRoleSessionName(assumeRoleSessionName)
+
.assumeRoleSessionDurationSeconds(assumeRoleSessionDuration)
.maxRetries(config.get(MAX_RETRIES))
-
.credentialsProviderClasses(config.get(AWS_CREDENTIALS_PROVIDER))
+ .credentialsProviderClasses(credentialsProviderClasses)
.encryptionConfig(encryptionConfig)
.build();
@@ -442,4 +490,9 @@ public class NativeS3FileSystemFactory implements
FileSystemFactory {
readBufferSize,
config.get(FS_CLOSE_TIMEOUT));
}
+
+ @Nullable
+ private static <T> T firstNonNull(@Nullable T override, @Nullable T base) {
+ return override != null ? override : base;
+ }
}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java
new file mode 100644
index 00000000000..0625254cd33
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * Immutable, bucket-specific S3 configuration overrides.
+ *
+ * <p>Null values indicate inheritance from global configuration. Only
explicitly configured values
+ * are non-null. Configuration format: {@code
s3.bucket.<bucket-name>.<property>}
+ *
+ * <p>Validates that credentials (access-key/secret-key) are either both set
or both absent.
+ */
+@Internal
+final class S3BucketConfig {
+
+ private final String bucketName;
+ @Nullable private final String region;
+ @Nullable private final String endpoint;
+ @Nullable private final Boolean pathStyleAccess;
+ @Nullable private final String accessKey;
+ @Nullable private final String secretKey;
+ @Nullable private final String sseType;
+ @Nullable private final String sseKmsKeyId;
+ @Nullable private final String assumeRoleArn;
+ @Nullable private final String assumeRoleExternalId;
+ @Nullable private final String assumeRoleSessionName;
+ @Nullable private final Integer assumeRoleSessionDurationSeconds;
+ @Nullable private final String credentialsProvider;
+
+ private S3BucketConfig(Builder builder) {
+ this.bucketName = builder.bucketName;
+ this.region = builder.region;
+ this.endpoint = builder.endpoint;
+ this.pathStyleAccess = builder.pathStyleAccess;
+ this.accessKey = builder.accessKey;
+ this.secretKey = builder.secretKey;
+ this.sseType = builder.sseType;
+ this.sseKmsKeyId = builder.sseKmsKeyId;
+ this.assumeRoleArn = builder.assumeRoleArn;
+ this.assumeRoleExternalId = builder.assumeRoleExternalId;
+ this.assumeRoleSessionName = builder.assumeRoleSessionName;
+ this.assumeRoleSessionDurationSeconds =
builder.assumeRoleSessionDurationSeconds;
+ this.credentialsProvider = builder.credentialsProvider;
+ }
+
+ String getBucketName() {
+ return bucketName;
+ }
+
+ @Nullable
+ String getRegion() {
+ return region;
+ }
+
+ @Nullable
+ String getEndpoint() {
+ return endpoint;
+ }
+
+ @Nullable
+ Boolean getPathStyleAccess() {
+ return pathStyleAccess;
+ }
+
+ @Nullable
+ String getAccessKey() {
+ return accessKey;
+ }
+
+ @Nullable
+ String getSecretKey() {
+ return secretKey;
+ }
+
+ @Nullable
+ String getSseType() {
+ return sseType;
+ }
+
+ @Nullable
+ String getSseKmsKeyId() {
+ return sseKmsKeyId;
+ }
+
+ @Nullable
+ String getAssumeRoleArn() {
+ return assumeRoleArn;
+ }
+
+ @Nullable
+ String getAssumeRoleExternalId() {
+ return assumeRoleExternalId;
+ }
+
+ @Nullable
+ String getAssumeRoleSessionName() {
+ return assumeRoleSessionName;
+ }
+
+ @Nullable
+ Integer getAssumeRoleSessionDurationSeconds() {
+ return assumeRoleSessionDurationSeconds;
+ }
+
+ @Nullable
+ String getCredentialsProvider() {
+ return credentialsProvider;
+ }
+
+ boolean hasAnyOverride() {
+ return region != null
+ || endpoint != null
+ || pathStyleAccess != null
+ || accessKey != null
+ || secretKey != null
+ || sseType != null
+ || sseKmsKeyId != null
+ || assumeRoleArn != null
+ || assumeRoleExternalId != null
+ || assumeRoleSessionName != null
+ || assumeRoleSessionDurationSeconds != null
+ || credentialsProvider != null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ S3BucketConfig that = (S3BucketConfig) o;
+ return Objects.equals(bucketName, that.bucketName)
+ && Objects.equals(region, that.region)
+ && Objects.equals(endpoint, that.endpoint)
+ && Objects.equals(pathStyleAccess, that.pathStyleAccess)
+ && Objects.equals(accessKey, that.accessKey)
+ && Objects.equals(secretKey, that.secretKey)
+ && Objects.equals(sseType, that.sseType)
+ && Objects.equals(sseKmsKeyId, that.sseKmsKeyId)
+ && Objects.equals(assumeRoleArn, that.assumeRoleArn)
+ && Objects.equals(assumeRoleExternalId,
that.assumeRoleExternalId)
+ && Objects.equals(assumeRoleSessionName,
that.assumeRoleSessionName)
+ && Objects.equals(
+ assumeRoleSessionDurationSeconds,
that.assumeRoleSessionDurationSeconds)
+ && Objects.equals(credentialsProvider,
that.credentialsProvider);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ bucketName,
+ region,
+ endpoint,
+ pathStyleAccess,
+ accessKey,
+ secretKey,
+ sseType,
+ sseKmsKeyId,
+ assumeRoleArn,
+ assumeRoleExternalId,
+ assumeRoleSessionName,
+ assumeRoleSessionDurationSeconds,
+ credentialsProvider);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("S3BucketConfig{");
+ sb.append("bucket='").append(bucketName).append("'");
+ if (region != null) {
+ sb.append(", region='").append(region).append("'");
+ }
+ if (endpoint != null) {
+ sb.append(", endpoint='").append(endpoint).append("'");
+ }
+ if (pathStyleAccess != null) {
+ sb.append(", pathStyleAccess=").append(pathStyleAccess);
+ }
+ if (accessKey != null) {
+ sb.append(",
credentials=").append(GlobalConfiguration.HIDDEN_CONTENT);
+ }
+ if (sseType != null) {
+ sb.append(", sseType='").append(sseType).append("'");
+ }
+ if (sseKmsKeyId != null) {
+ sb.append(",
sseKmsKeyId=").append(GlobalConfiguration.HIDDEN_CONTENT);
+ }
+ if (assumeRoleArn != null) {
+ sb.append(", assumeRoleArn='").append(assumeRoleArn).append("'");
+ }
+ if (assumeRoleExternalId != null) {
+ sb.append(",
assumeRoleExternalId='").append(assumeRoleExternalId).append("'");
+ }
+ if (assumeRoleSessionName != null) {
+ sb.append(",
assumeRoleSessionName='").append(assumeRoleSessionName).append("'");
+ }
+ if (assumeRoleSessionDurationSeconds != null) {
+ sb.append(", assumeRoleSessionDurationSeconds=")
+ .append(assumeRoleSessionDurationSeconds);
+ }
+ if (credentialsProvider != null) {
+ sb.append(",
credentialsProvider='").append(credentialsProvider).append("'");
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ static Builder builder(String bucketName) {
+ if (StringUtils.isNullOrWhitespaceOnly(bucketName)) {
+ throw new IllegalArgumentException("Bucket name must not be null
or empty");
+ }
+ return new Builder(bucketName);
+ }
+
+ static final class Builder {
+ private final String bucketName;
+ @Nullable private String region;
+ @Nullable private String endpoint;
+ @Nullable private Boolean pathStyleAccess;
+ @Nullable private String accessKey;
+ @Nullable private String secretKey;
+ @Nullable private String sseType;
+ @Nullable private String sseKmsKeyId;
+ @Nullable private String assumeRoleArn;
+ @Nullable private String assumeRoleExternalId;
+ @Nullable private String assumeRoleSessionName;
+ @Nullable private Integer assumeRoleSessionDurationSeconds;
+ @Nullable private String credentialsProvider;
+
+ private Builder(String bucketName) {
+ this.bucketName = bucketName;
+ }
+
+ String getBucketName() {
+ return bucketName;
+ }
+
+ Builder region(String region) {
+ this.region = region;
+ return this;
+ }
+
+ Builder endpoint(String endpoint) {
+ this.endpoint = endpoint;
+ return this;
+ }
+
+ Builder pathStyleAccess(boolean pathStyleAccess) {
+ this.pathStyleAccess = pathStyleAccess;
+ return this;
+ }
+
+ Builder accessKey(String accessKey) {
+ this.accessKey = accessKey;
+ return this;
+ }
+
+ Builder secretKey(String secretKey) {
+ this.secretKey = secretKey;
+ return this;
+ }
+
+ Builder sseType(String sseType) {
+ this.sseType = sseType;
+ return this;
+ }
+
+ Builder sseKmsKeyId(String sseKmsKeyId) {
+ this.sseKmsKeyId = sseKmsKeyId;
+ return this;
+ }
+
+ Builder assumeRoleArn(String assumeRoleArn) {
+ this.assumeRoleArn = assumeRoleArn;
+ return this;
+ }
+
+ Builder assumeRoleExternalId(String assumeRoleExternalId) {
+ this.assumeRoleExternalId = assumeRoleExternalId;
+ return this;
+ }
+
+ Builder assumeRoleSessionName(String assumeRoleSessionName) {
+ this.assumeRoleSessionName = assumeRoleSessionName;
+ return this;
+ }
+
+ Builder assumeRoleSessionDurationSeconds(int
assumeRoleSessionDurationSeconds) {
+ this.assumeRoleSessionDurationSeconds =
assumeRoleSessionDurationSeconds;
+ return this;
+ }
+
+ Builder credentialsProvider(String credentialsProvider) {
+ this.credentialsProvider = credentialsProvider;
+ return this;
+ }
+
+ S3BucketConfig build() {
+ boolean hasAccessKey =
!StringUtils.isNullOrWhitespaceOnly(accessKey);
+ boolean hasSecretKey =
!StringUtils.isNullOrWhitespaceOnly(secretKey);
+ if (hasAccessKey != hasSecretKey) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Bucket '%s': both 's3.bucket.%s.access-key'
and "
+ + "'s3.bucket.%s.secret-key' must be
set together. "
+ + "Found only %s.",
+ bucketName,
+ bucketName,
+ bucketName,
+ hasAccessKey ? "access-key" : "secret-key"));
+ }
+ return new S3BucketConfig(this);
+ }
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
index 8f327ff54cf..cd1c918c486 100644
---
a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
+++
b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java
@@ -88,6 +88,12 @@ class S3ClientProvider implements AutoCloseableAsync {
private final boolean checksumValidation;
private final int maxConnections;
private final int maxRetries;
+ @Nullable private final String region;
+ @Nullable private final String endpoint;
+ @Nullable private final String assumeRoleArn;
+ @Nullable private final String assumeRoleExternalId;
+ @Nullable private final String assumeRoleSessionName;
+ private final int assumeRoleSessionDurationSeconds;
private final AtomicBoolean closed = new AtomicBoolean(false);
private S3ClientProvider(
@@ -104,7 +110,13 @@ class S3ClientProvider implements AutoCloseableAsync {
boolean chunkedEncoding,
boolean checksumValidation,
int maxConnections,
- int maxRetries) {
+ int maxRetries,
+ @Nullable String region,
+ @Nullable String endpoint,
+ @Nullable String assumeRoleArn,
+ @Nullable String assumeRoleExternalId,
+ @Nullable String assumeRoleSessionName,
+ int assumeRoleSessionDurationSeconds) {
this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client must
not be null");
this.transferManager =
Preconditions.checkNotNull(transferManager, "transferManager
must not be null");
@@ -129,6 +141,12 @@ class S3ClientProvider implements AutoCloseableAsync {
this.checksumValidation = checksumValidation;
this.maxConnections = maxConnections;
this.maxRetries = maxRetries;
+ this.region = region;
+ this.endpoint = endpoint;
+ this.assumeRoleArn = assumeRoleArn;
+ this.assumeRoleExternalId = assumeRoleExternalId;
+ this.assumeRoleSessionName = assumeRoleSessionName;
+ this.assumeRoleSessionDurationSeconds =
assumeRoleSessionDurationSeconds;
}
public S3Client getS3Client() {
@@ -196,6 +214,41 @@ class S3ClientProvider implements AutoCloseableAsync {
return maxRetries;
}
+ @VisibleForTesting
+ @Nullable
+ String getRegion() {
+ return region;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ String getEndpoint() {
+ return endpoint;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ String getAssumeRoleArn() {
+ return assumeRoleArn;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ String getAssumeRoleExternalId() {
+ return assumeRoleExternalId;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ String getAssumeRoleSessionName() {
+ return assumeRoleSessionName;
+ }
+
+ @VisibleForTesting
+ int getAssumeRoleSessionDurationSeconds() {
+ return assumeRoleSessionDurationSeconds;
+ }
+
@Override
public CompletableFuture<Void> closeAsync() {
if (!closed.compareAndSet(false, true)) {
@@ -462,7 +515,13 @@ class S3ClientProvider implements AutoCloseableAsync {
chunkedEncoding,
checksumValidation,
maxConnections,
- maxRetries);
+ maxRetries,
+ region,
+ endpoint,
+ assumeRoleArn,
+ assumeRoleExternalId,
+ assumeRoleSessionName,
+ assumeRoleSessionDurationSeconds);
}
private AwsCredentialsProvider buildBaseCredentialsProvider() {
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java
new file mode 100644
index 00000000000..97921ff3175
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link BucketConfigProvider}. */
+class BucketConfigProviderTest {
+
+ /** One test exercises all 11 known properties on a single bucket. */
+ @Test
+ void testParsesAllKnownPropertiesForSingleBucket() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my-bucket.region", "us-west-2");
+ config.setString("s3.bucket.my-bucket.endpoint",
"https://s3.us-west-2.amazonaws.com");
+ config.setString("s3.bucket.my-bucket.path-style-access", "true");
+ config.setString("s3.bucket.my-bucket.access-key",
"AKIAIOSFODNN7EXAMPLE");
+ config.setString(
+ "s3.bucket.my-bucket.secret-key",
"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+ config.setString("s3.bucket.my-bucket.sse.type", "sse-kms");
+ config.setString(
+ "s3.bucket.my-bucket.sse.kms.key-id",
+ "arn:aws:kms:us-east-1:123456789:key/12345678");
+ config.setString(
+ "s3.bucket.my-bucket.assume-role.arn",
+ "arn:aws:iam::123456789012:role/S3AccessRole");
+ config.setString("s3.bucket.my-bucket.assume-role.external-id",
"ext-id-abc");
+ config.setString("s3.bucket.my-bucket.assume-role.session-name",
"flink-job");
+ config.setString("s3.bucket.my-bucket.assume-role.session-duration",
"7200");
+ config.setString(
+ "s3.bucket.my-bucket.aws.credentials.provider",
+
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ assertThat(provider.size()).isEqualTo(1);
+ S3BucketConfig bucket = provider.getBucketConfig("my-bucket");
+ assertThat(bucket).isNotNull();
+ assertThat(bucket.getRegion()).isEqualTo("us-west-2");
+
assertThat(bucket.getEndpoint()).isEqualTo("https://s3.us-west-2.amazonaws.com");
+ assertThat(bucket.getPathStyleAccess()).isTrue();
+ assertThat(bucket.getAccessKey()).isEqualTo("AKIAIOSFODNN7EXAMPLE");
+
assertThat(bucket.getSecretKey()).isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+ assertThat(bucket.getSseType()).isEqualTo("sse-kms");
+ assertThat(bucket.getSseKmsKeyId())
+ .isEqualTo("arn:aws:kms:us-east-1:123456789:key/12345678");
+ assertThat(bucket.getAssumeRoleArn())
+ .isEqualTo("arn:aws:iam::123456789012:role/S3AccessRole");
+ assertThat(bucket.getAssumeRoleExternalId()).isEqualTo("ext-id-abc");
+ assertThat(bucket.getAssumeRoleSessionName()).isEqualTo("flink-job");
+
assertThat(bucket.getAssumeRoleSessionDurationSeconds()).isEqualTo(7200);
+ assertThat(bucket.getCredentialsProvider())
+
.isEqualTo("software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+ }
+
+ @Test
+ void testParsesMultipleBuckets() {
+ Configuration config = new Configuration();
+ config.setString(
+ "s3.bucket.checkpoint-bucket.endpoint",
"https://s3.us-east-1.amazonaws.com");
+ config.setString("s3.bucket.checkpoint-bucket.region", "us-east-1");
+ config.setString(
+ "s3.bucket.savepoint-bucket.endpoint",
"https://s3.eu-west-1.amazonaws.com");
+ config.setString("s3.bucket.savepoint-bucket.region", "eu-west-1");
+ config.setString("s3.bucket.savepoint-bucket.path-style-access",
"false");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ assertThat(provider.size()).isEqualTo(2);
+
+ S3BucketConfig cpConfig =
provider.getBucketConfig("checkpoint-bucket");
+ assertThat(cpConfig).isNotNull();
+
assertThat(cpConfig.getEndpoint()).isEqualTo("https://s3.us-east-1.amazonaws.com");
+ assertThat(cpConfig.getRegion()).isEqualTo("us-east-1");
+ assertThat(cpConfig.getPathStyleAccess()).isNull();
+
+ S3BucketConfig spConfig = provider.getBucketConfig("savepoint-bucket");
+ assertThat(spConfig).isNotNull();
+
assertThat(spConfig.getEndpoint()).isEqualTo("https://s3.eu-west-1.amazonaws.com");
+ assertThat(spConfig.getRegion()).isEqualTo("eu-west-1");
+ assertThat(spConfig.getPathStyleAccess()).isFalse();
+ }
+
+ /** Bucket names containing dots are fully supported via longest-suffix
matching. */
+ @Test
+ void testDottedBucketName() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my.company.data.endpoint",
"https://s3-custom.example.com");
+ config.setString("s3.bucket.my.company.data.region", "ap-southeast-1");
+ config.setString("s3.bucket.my.company.data.sse.type", "sse-s3");
+ config.setString("s3.bucket.my.company.data.sse.kms.key-id",
"key-123");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ assertThat(provider.hasBucketConfig("my.company.data")).isTrue();
+ S3BucketConfig bucket = provider.getBucketConfig("my.company.data");
+
assertThat(bucket.getEndpoint()).isEqualTo("https://s3-custom.example.com");
+ assertThat(bucket.getRegion()).isEqualTo("ap-southeast-1");
+ assertThat(bucket.getSseType()).isEqualTo("sse-s3");
+ assertThat(bucket.getSseKmsKeyId()).isEqualTo("key-123");
+ }
+
+ @Test
+ void testNonBucketConfigKeysIgnored() {
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "GLOBAL_KEY");
+ config.setString("s3.secret-key", "GLOBAL_SECRET");
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.bucket.my-bucket.region", "eu-west-1");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ assertThat(provider.size()).isEqualTo(1);
+ assertThat(provider.hasBucketConfig("my-bucket")).isTrue();
+ }
+
+ /** A key whose bucket segment is empty (e.g. {@code s3.bucket..region})
must be ignored. */
+ @Test
+ void testEmptyBucketSegmentInKeyIsIgnored() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket..region", "us-east-1");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ assertThat(provider.size()).isEqualTo(0);
+ }
+
+ /** A bucket whose keys only match unknown properties produces no
registered bucket config. */
+ @Test
+ void testBucketWithOnlyUnknownPropertiesProducesNoConfig() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my-bucket.unknown-property", "some-value");
+ config.setString("s3.bucket.my-bucket.another-unknown", "other-value");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ assertThat(provider.size()).isEqualTo(0);
+ assertThat(provider.getBucketConfig("my-bucket")).isNull();
+ }
+
+ @Test
+ void testUnknownPropertyMixedWithKnownIsIgnored() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my-bucket.unknown-property", "some-value");
+ config.setString("s3.bucket.my-bucket.region", "us-east-1");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ S3BucketConfig bucket = provider.getBucketConfig("my-bucket");
+ assertThat(bucket).isNotNull();
+ assertThat(bucket.getRegion()).isEqualTo("us-east-1");
+ }
+
+ @Test
+ void testNoBucketConfigReturnsNull() {
+ BucketConfigProvider provider = new BucketConfigProvider(new
Configuration());
+
+ assertThat(provider.getBucketConfig("non-existent-bucket")).isNull();
+ assertThat(provider.size()).isEqualTo(0);
+ }
+
+ @Test
+ void testEmptyConfigurationProducesNoEntries() {
+ assertThat(new BucketConfigProvider(new
Configuration()).size()).isEqualTo(0);
+ }
+
+ @Test
+ void testPartialCredentialsRejected() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.bad-bucket.access-key",
"AKIAIOSFODNN7EXAMPLE");
+
+ assertThatThrownBy(() -> new BucketConfigProvider(config))
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("must be set together");
+ }
+
+ @Test
+ void testInvalidSessionDurationThrowsException() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my-bucket.assume-role.session-duration",
"not-a-number");
+ config.setString("s3.bucket.my-bucket.region", "us-east-1");
+
+ assertThatThrownBy(() -> new BucketConfigProvider(config))
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("Invalid assume-role.session-duration");
+ }
+
+ @Test
+ void testInvalidPathStyleAccessThrowsException() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my-bucket.path-style-access", "treu");
+ config.setString("s3.bucket.my-bucket.region", "us-east-1");
+
+ assertThatThrownBy(() -> new BucketConfigProvider(config))
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("Invalid path-style-access");
+ }
+
+ @Test
+ void testUnrecognizedBucketPropertyIsIgnoredWithoutThrow() {
+ Configuration config = new Configuration();
+ config.setString("s3.bucket.my-bucket.typo-region", "us-east-1");
+ config.setString("s3.bucket.my-bucket.region", "eu-west-1");
+
+ BucketConfigProvider provider = new BucketConfigProvider(config);
+
+ S3BucketConfig bucket = provider.getBucketConfig("my-bucket");
+ assertThat(bucket).isNotNull();
+ assertThat(bucket.getRegion()).isEqualTo("eu-west-1");
+ }
+
+ @Test
+ void testPropertyApplicatorsCoverAllKnownProperties() {
+ assertThat(BucketConfigProvider.PROPERTY_APPLICATORS.size())
+ .as("PROPERTY_APPLICATORS must have an entry for every known
property")
+
.isEqualTo(BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.size());
+
+ assertThat(BucketConfigProvider.PROPERTY_APPLICATORS.keySet())
+ .containsExactlyInAnyOrderElementsOf(
+ BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH);
+ }
+
+ @Test
+ void testKnownPropertiesSortedByDescendingLength() {
+ for (int i = 1; i <
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.size(); i++) {
+
assertThat(BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i).length())
+ .as(
+ "Property at index %d ('%s') should not be longer
than property at index %d ('%s')",
+ i,
+
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i),
+ i - 1,
+
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i - 1))
+ .isLessThanOrEqualTo(
+
BucketConfigProvider.KNOWN_PROPERTIES_BY_LENGTH.get(i - 1).length());
+ }
+ }
+}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
index 82a35e3d3c4..e673af3a55e 100644
---
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
+import java.io.IOException;
import java.net.URI;
import java.time.Duration;
@@ -31,7 +33,6 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link NativeS3FileSystemFactory}. */
class NativeS3FileSystemFactoryTest {
-
private static Configuration baseConfig() {
Configuration config = new Configuration();
config.setString("s3.access-key", "test-access-key");
@@ -71,9 +72,12 @@ class NativeS3FileSystemFactoryTest {
@Test
void testCreateFileSystemWithCustomEndpoint() throws Exception {
+ // Global: endpoint A; bucket: endpoint B → bucket endpoint is used
Configuration config = baseConfig();
- config.setString("s3.endpoint", "http://localhost:9000");
- assertThat(createFs(config)).isNotNull();
+ config.setString("s3.endpoint", "http://global.s3:9000");
+ config.setString("s3.bucket.test-bucket.endpoint",
"http://bucket.s3:9000");
+ assertThat(createFs(config).getClientProvider().getEndpoint())
+ .isEqualTo("http://bucket.s3:9000");
}
@Test
@@ -84,9 +88,12 @@ class NativeS3FileSystemFactoryTest {
@Test
void testS3ACreateFileSystemWithCustomEndpoint() throws Exception {
+ // Global: endpoint A; bucket: endpoint B → bucket endpoint is used on
s3a scheme
Configuration config = baseConfig();
- config.setString("s3.endpoint", "http://localhost:9000");
- assertThat(createS3aFs(config)).isNotNull();
+ config.setString("s3.endpoint", "http://global.s3:9000");
+ config.setString("s3.bucket.test-bucket.endpoint",
"http://bucket.s3:9000");
+ assertThat(createS3aFs(config).getClientProvider().getEndpoint())
+ .isEqualTo("http://bucket.s3:9000");
}
// --- Path-style access ---
@@ -97,9 +104,11 @@ class NativeS3FileSystemFactoryTest {
}
@Test
- void testPathStyleAccessExplicitlyEnabled() throws Exception {
+ void testPathStyleAccessBucketOverridesGlobal() throws Exception {
+ // Global: false; bucket: true → bucket wins
Configuration config = baseConfig();
- config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, true);
+ config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false);
+ config.setString("s3.bucket.test-bucket.path-style-access", "true");
assertThat(createFs(config).getClientProvider().isPathStyleAccess()).isTrue();
}
@@ -313,10 +322,12 @@ class NativeS3FileSystemFactoryTest {
// --- Region ---
@Test
- void testExplicitRegionConfiguration() throws Exception {
+ void testRegionBucketOverridesGlobal() throws Exception {
+ // Global: us-east-1; bucket: eu-west-1 → bucket wins
Configuration config = baseConfig();
- config.setString("s3.region", "eu-west-1");
- assertThat(createFs(config)).isNotNull();
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.bucket.test-bucket.region", "eu-west-1");
+
assertThat(createFs(config).getClientProvider().getRegion()).isEqualTo("eu-west-1");
}
@Test
@@ -360,10 +371,17 @@ class NativeS3FileSystemFactoryTest {
// --- s3a scheme ---
@Test
- void testS3AWithSSEConfiguration() throws Exception {
+ void testS3AWithSSEBucketOverridesGlobal() throws Exception {
+ // Global: none; bucket: sse-s3 → bucket wins on the s3a scheme
Configuration config = baseConfig();
- config.setString("s3.sse.type", "sse-s3");
- assertThat(createS3aFs(config)).isNotNull();
+ config.setString("s3.sse.type", "none");
+ config.setString("s3.bucket.test-bucket.sse.type", "sse-s3");
+ assertThat(
+ createS3aFs(config)
+ .getClientProvider()
+ .getEncryptionConfig()
+ .getEncryptionType())
+ .isEqualTo(S3EncryptionConfig.EncryptionType.SSE_S3);
}
@Test
@@ -378,4 +396,148 @@ class NativeS3FileSystemFactoryTest {
assertThat(fs.getClientProvider().isChunkedEncoding()).isFalse();
assertThat(fs.getClientProvider().isChecksumValidation()).isFalse();
}
+
+ // ---- Bucket-level configuration tests ----
+
+ /**
+ * Validates that misconfigured per-bucket credentials surface as a
configuration error at
+ * {@code configure()} time, not as an opaque AWS SDK error at first
request. Override
+ * resolution itself (which wins between bucket and global) is
exhaustively covered by {@code
+ * BucketConfigProviderTest}; this test guards the factory-layer behaviour
that is unique to it:
+ * throwing on partial bucket credentials.
+ */
+ @Test
+ void testBucketSpecificPartialCredentialsThrows() {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ Configuration config = new Configuration();
+ config.setString("s3.access-key", "global-access-key");
+ config.setString("s3.secret-key", "global-secret-key");
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.bucket.bad-bucket.access-key", "only-access-key");
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+ assertThatThrownBy(() -> factory.configure(config))
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("must be set together");
+ }
+
+ @Test
+ void testBucketOverrideWinsForConnectionAndEncryptionFields() throws
Exception {
+ Configuration config = new Configuration();
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ // Global
+ config.setString("s3.access-key", "global-access");
+ config.setString("s3.secret-key", "global-secret");
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.endpoint", "http://global.s3:9000");
+ config.set(NativeS3FileSystemFactory.PATH_STYLE_ACCESS, false);
+ config.setString("s3.sse.type", "sse-s3");
+ config.setString("s3.sse.kms.key-id", "global-kms-key");
+ // Bucket
+ config.setString("s3.bucket.test-bucket.access-key", "bucket-access");
+ config.setString("s3.bucket.test-bucket.secret-key", "bucket-secret");
+ config.setString("s3.bucket.test-bucket.region", "eu-west-1");
+ config.setString("s3.bucket.test-bucket.endpoint",
"http://bucket.s3:9000");
+ config.setString("s3.bucket.test-bucket.path-style-access", "true");
+ config.setString("s3.bucket.test-bucket.sse.type", "sse-kms");
+ config.setString("s3.bucket.test-bucket.sse.kms.key-id",
"bucket-kms-key");
+
+ S3ClientProvider provider = createFs(config).getClientProvider();
+
+ // All bucket values win over global
+ assertThat(provider.getRegion()).isEqualTo("eu-west-1");
+ assertThat(provider.getEndpoint()).isEqualTo("http://bucket.s3:9000");
+ assertThat(provider.isPathStyleAccess()).isTrue();
+
assertThat(provider.getCredentialsProvider().resolveCredentials().accessKeyId())
+ .isEqualTo("bucket-access");
+
assertThat(provider.getCredentialsProvider().resolveCredentials().secretAccessKey())
+ .isEqualTo("bucket-secret");
+ assertThat(provider.getEncryptionConfig().getEncryptionType())
+ .isEqualTo(S3EncryptionConfig.EncryptionType.SSE_KMS);
+
assertThat(provider.getEncryptionConfig().getKmsKeyId()).isEqualTo("bucket-kms-key");
+ }
+
+ @Test
+ void testBucketOverrideWinsForAssumeRoleFields() throws Exception {
+ Configuration config = new Configuration();
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ config.setString("s3.access-key", "global-access");
+ config.setString("s3.secret-key", "global-secret");
+ config.setString("s3.region", "us-east-1");
+ // Global assume-role
+ config.setString("s3.assume-role.arn",
"arn:aws:iam::111111111111:role/GlobalRole");
+ config.setString("s3.assume-role.external-id", "global-ext-id");
+ config.setString("s3.assume-role.session-name", "global-session");
+
config.set(NativeS3FileSystemFactory.ASSUME_ROLE_SESSION_DURATION_SECONDS, 900);
+ // Bucket assume-role overrides
+ config.setString(
+ "s3.bucket.test-bucket.assume-role.arn",
+ "arn:aws:iam::222222222222:role/BucketRole");
+ config.setString("s3.bucket.test-bucket.assume-role.external-id",
"bucket-ext-id");
+ config.setString("s3.bucket.test-bucket.assume-role.session-name",
"bucket-session");
+ config.setString("s3.bucket.test-bucket.assume-role.session-duration",
"1800");
+
+ S3ClientProvider provider = createFs(config).getClientProvider();
+ assertThat(provider.getAssumeRoleArn())
+ .isEqualTo("arn:aws:iam::222222222222:role/BucketRole");
+
assertThat(provider.getAssumeRoleExternalId()).isEqualTo("bucket-ext-id");
+
assertThat(provider.getAssumeRoleSessionName()).isEqualTo("bucket-session");
+
assertThat(provider.getAssumeRoleSessionDurationSeconds()).isEqualTo(1800);
+ }
+
+ @Test
+ void testBucketOverrideWinsForCredentialsProvider() throws Exception {
+ Configuration config = new Configuration();
+ config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+ config.setString("s3.region", "us-east-1");
+ // Global: static credentials
+ config.setString("s3.access-key", "global-access");
+ config.setString("s3.secret-key", "global-secret");
+ // Bucket: AnonymousCredentialsProvider
+ config.setString(
+ "s3.bucket.test-bucket.aws.credentials.provider",
+
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+ AwsCredentials creds =
+
createFs(config).getClientProvider().getCredentialsProvider().resolveCredentials();
+
+ // Bucket anonymous provider wins; global static key "global-access"
must not be used
+ assertThat(creds.accessKeyId()).isNotEqualTo("global-access");
+ }
+
+ @Test
+ void testBucketOverrideIgnoredForDifferentBucket() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.endpoint", "http://global.s3:9000");
+ config.setString("s3.bucket.other-bucket.region", "ap-south-1");
+ config.setString("s3.bucket.other-bucket.endpoint",
"http://other.s3:9000");
+
+ // createFs uses URI s3://test-bucket/ — "other-bucket" overrides must
NOT apply
+ NativeS3FileSystem fs = createFs(config);
+ assertThat(fs.getClientProvider().getRegion()).isEqualTo("us-east-1");
+
assertThat(fs.getClientProvider().getEndpoint()).isEqualTo("http://global.s3:9000");
+ }
+
+ @Test
+ void testPartialBucketOverrideFallsBackToGlobal() throws Exception {
+ Configuration config = baseConfig();
+ config.setString("s3.region", "us-east-1");
+ config.setString("s3.endpoint", "http://global.s3:9000");
+ // Override only region for the bucket — endpoint falls back to global
+ config.setString("s3.bucket.test-bucket.region", "eu-central-1");
+
+ NativeS3FileSystem fs = createFs(config);
+
assertThat(fs.getClientProvider().getRegion()).isEqualTo("eu-central-1");
+
assertThat(fs.getClientProvider().getEndpoint()).isEqualTo("http://global.s3:9000");
+ }
+
+ @Test
+ void testMissingBucketNameInUriThrowsIOException() {
+ NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+ factory.configure(baseConfig());
+
+ assertThatThrownBy(() ->
factory.create(URI.create("s3:///path/to/file")))
+ .isInstanceOf(IOException.class)
+ .hasMessageContaining("bucket name");
+ }
}
diff --git
a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BucketConfigTest.java
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BucketConfigTest.java
new file mode 100644
index 00000000000..00543c0c9c8
--- /dev/null
+++
b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/S3BucketConfigTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.fs.s3native;
+
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link S3BucketConfig}. */
+class S3BucketConfigTest {
+
+ @Test
+ void testBuilderWithAllFields() {
+ S3BucketConfig config =
+ S3BucketConfig.builder("my-bucket")
+ .region("us-west-2")
+ .endpoint("https://custom.s3.endpoint")
+ .pathStyleAccess(true)
+ .accessKey("AKIAIOSFODNN7EXAMPLE")
+ .secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ .sseType("sse-kms")
+ .sseKmsKeyId("arn:aws:kms:us-east-1:123:key/abc")
+ .assumeRoleArn("arn:aws:iam::123:role/S3Role")
+ .assumeRoleExternalId("ext-id-123")
+ .assumeRoleSessionName("my-session")
+ .assumeRoleSessionDurationSeconds(7200)
+ .credentialsProvider("AnonymousCredentialsProvider")
+ .build();
+
+ assertThat(config.getBucketName()).isEqualTo("my-bucket");
+ assertThat(config.getRegion()).isEqualTo("us-west-2");
+
assertThat(config.getEndpoint()).isEqualTo("https://custom.s3.endpoint");
+ assertThat(config.getPathStyleAccess()).isTrue();
+ assertThat(config.getAccessKey()).isEqualTo("AKIAIOSFODNN7EXAMPLE");
+
assertThat(config.getSecretKey()).isEqualTo("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY");
+ assertThat(config.getSseType()).isEqualTo("sse-kms");
+
assertThat(config.getSseKmsKeyId()).isEqualTo("arn:aws:kms:us-east-1:123:key/abc");
+
assertThat(config.getAssumeRoleArn()).isEqualTo("arn:aws:iam::123:role/S3Role");
+ assertThat(config.getAssumeRoleExternalId()).isEqualTo("ext-id-123");
+ assertThat(config.getAssumeRoleSessionName()).isEqualTo("my-session");
+
assertThat(config.getAssumeRoleSessionDurationSeconds()).isEqualTo(7200);
+
assertThat(config.getCredentialsProvider()).isEqualTo("AnonymousCredentialsProvider");
+ assertThat(config.hasAnyOverride()).isTrue();
+ }
+
+ @Test
+ void testNoOverridesHasAnyOverrideFalse() {
+
assertThat(S3BucketConfig.builder("empty-bucket").build().hasAnyOverride()).isFalse();
+ }
+
+ /** Each field, when set alone, must trigger {@code hasAnyOverride()}. */
+ static Stream<S3BucketConfig> singleFieldConfigs() {
+ return Stream.of(
+ S3BucketConfig.builder("b").region("us-east-1").build(),
+
S3BucketConfig.builder("b").endpoint("http://localhost:9000").build(),
+ S3BucketConfig.builder("b").pathStyleAccess(true).build(),
+
S3BucketConfig.builder("b").accessKey("KEY").secretKey("SECRET").build(),
+ S3BucketConfig.builder("b").sseType("sse-s3").build(),
+ S3BucketConfig.builder("b").sseKmsKeyId("key-id").build(),
+
S3BucketConfig.builder("b").assumeRoleArn("arn:aws:iam::1:role/R").build(),
+
S3BucketConfig.builder("b").assumeRoleExternalId("ext-id").build(),
+
S3BucketConfig.builder("b").assumeRoleSessionName("session").build(),
+
S3BucketConfig.builder("b").assumeRoleSessionDurationSeconds(900).build(),
+ S3BucketConfig.builder("b")
+ .credentialsProvider("AnonymousCredentialsProvider")
+ .build());
+ }
+
+ @ParameterizedTest
+ @MethodSource("singleFieldConfigs")
+ void testEachFieldAloneTriggersHasAnyOverride(S3BucketConfig config) {
+ assertThat(config.hasAnyOverride())
+ .as("hasAnyOverride() must be true when any single field is
set")
+ .isTrue();
+ }
+
+ @Test
+ void testPartialCredentialsAccessKeyOnlyRejected() {
+ assertThatThrownBy(
+ () ->
S3BucketConfig.builder("b").accessKey("AKIAIOSFODNN7EXAMPLE").build())
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("access-key")
+ .hasMessageContaining("secret-key")
+ .hasMessageContaining("must be set together");
+ }
+
+ @Test
+ void testPartialCredentialsSecretKeyOnlyRejected() {
+ assertThatThrownBy(
+ () ->
+ S3BucketConfig.builder("b")
+
.secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ .build())
+ .isInstanceOf(IllegalConfigurationException.class)
+ .hasMessageContaining("access-key")
+ .hasMessageContaining("secret-key")
+ .hasMessageContaining("must be set together");
+ }
+
+ @Test
+ void testToStringRedactsCredentials() {
+ S3BucketConfig config =
+ S3BucketConfig.builder("secure-bucket")
+ .accessKey("AKIAIOSFODNN7EXAMPLE")
+ .secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ .region("us-east-1")
+ .build();
+
+ String str = config.toString();
+ assertThat(str).contains("credentials=" +
GlobalConfiguration.HIDDEN_CONTENT);
+ assertThat(str).doesNotContain("AKIAIOSFODNN7EXAMPLE");
+ assertThat(str).doesNotContain("wJalrXUtnFEMI");
+ }
+
+ @Test
+ void testToStringRedactsKmsKeyIdAndIncludesAllFields() {
+ S3BucketConfig config =
+ S3BucketConfig.builder("my-bucket")
+ .region("us-west-2")
+ .endpoint("https://s3.example.com")
+ .pathStyleAccess(true)
+ .accessKey("AKIAIOSFODNN7EXAMPLE")
+ .secretKey("wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
+ .sseType("sse-kms")
+ .sseKmsKeyId("arn:aws:kms:us-east-1:123:key/abc")
+ .assumeRoleArn("arn:aws:iam::123:role/R")
+ .assumeRoleExternalId("ext-id")
+ .assumeRoleSessionName("my-session")
+ .assumeRoleSessionDurationSeconds(3600)
+ .credentialsProvider("AnonymousCredentialsProvider")
+ .build();
+
+ String str = config.toString();
+ assertThat(str).contains("region='us-west-2'");
+ assertThat(str).contains("endpoint='https://s3.example.com'");
+ assertThat(str).contains("pathStyleAccess=true");
+ assertThat(str).contains("sseType='sse-kms'");
+ assertThat(str).contains("sseKmsKeyId=" +
GlobalConfiguration.HIDDEN_CONTENT);
+ assertThat(str).doesNotContain("arn:aws:kms:us-east-1:123:key/abc");
+ assertThat(str).contains("assumeRoleArn='arn:aws:iam::123:role/R'");
+ assertThat(str).contains("assumeRoleExternalId='ext-id'");
+ assertThat(str).contains("assumeRoleSessionName='my-session'");
+ assertThat(str).contains("assumeRoleSessionDurationSeconds=3600");
+
assertThat(str).contains("credentialsProvider='AnonymousCredentialsProvider'");
+ }
+}