This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 961be46be93 [FLINK-39431][s3] Add .value suffix key aliases in
S3FileSystemFactory to support Flink v2 standard YAML
961be46be93 is described below
commit 961be46be932ff0c0ef13f7c174c7f26f5616b54
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Apr 13 19:49:16 2026 +0200
[FLINK-39431][s3] Add .value suffix key aliases in S3FileSystemFactory to
support Flink v2 standard YAML
---
.../flink/fs/s3hadoop/S3FileSystemFactory.java | 11 +++++-
.../flink/fs/s3hadoop/HadoopS3FileSystemTest.java | 45 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index cab089361ea..37f984f630a 100644
---
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -43,7 +43,16 @@ public class S3FileSystemFactory extends
AbstractS3FileSystemFactory {
private static final String[][] MIRRORED_CONFIG_KEYS = {
{"fs.s3a.access-key", "fs.s3a.access.key"},
{"fs.s3a.secret-key", "fs.s3a.secret.key"},
- {"fs.s3a.path-style-access", "fs.s3a.path.style.access"}
+ {"fs.s3a.path-style-access", "fs.s3a.path.style.access"},
+ // Flink v2 uses standard YAML which cannot represent a key that is
both a scalar value and
+ // a parent of other keys (e.g. fs.s3a.endpoint and
fs.s3a.endpoint.region conflict).
+ // These aliases allow users to set the scalar key via a ".value"
suffix to avoid the
+ // collision, e.g. fs.s3a.endpoint.value maps to fs.s3a.endpoint in
Hadoop config.
+ {"fs.s3a.endpoint.value", "fs.s3a.endpoint"},
+ {"fs.s3a.assumed.role.sts.endpoint.value",
"fs.s3a.assumed.role.sts.endpoint"},
+ {"fs.s3a.fast.upload.value", "fs.s3a.fast.upload"},
+ {"fs.s3a.multipart.purge.value", "fs.s3a.multipart.purge"},
+ {"fs.s3a.s3guard.ddb.table.value", "fs.s3a.s3guard.ddb.table"},
};
public S3FileSystemFactory() {
diff --git
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index af82892cb1c..8114cd4ee84 100644
---
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -80,6 +80,51 @@ class HadoopS3FileSystemTest {
checkHadoopAccessKeys(conf, "clé d'accès", "clef secrète");
}
+ /**
+ * Test that keys with the ".value" suffix are mapped to their Hadoop
equivalents, allowing
+ * users to avoid YAML key prefix collisions in Flink v2's standard
config.yaml format.
+ *
+ * <p>For example, "fs.s3a.endpoint" and "fs.s3a.endpoint.region" cannot
coexist in standard
+ * YAML. Users can instead set "fs.s3a.endpoint.value" which is remapped
to "fs.s3a.endpoint".
+ */
+ @Test
+ void testValueSuffixMappingForYamlCollisionAvoidance() {
+ Configuration conf = new Configuration();
+ conf.setString("fs.s3a.endpoint.value",
"https://s3.eu-west-1.amazonaws.com");
+ conf.setString("fs.s3a.endpoint.region", "eu-west-1");
+ conf.setString(
+ "fs.s3a.assumed.role.sts.endpoint.value",
"https://sts.eu-west-1.amazonaws.com");
+ conf.setString("fs.s3a.assumed.role.sts.endpoint.region", "eu-west-1");
+ conf.setString("fs.s3a.fast.upload.value", "true");
+ conf.setString("fs.s3a.fast.upload.buffer", "disk");
+ conf.setString("fs.s3a.multipart.purge.value", "true");
+ conf.setString("fs.s3a.multipart.purge.age", "86400");
+ conf.setString("fs.s3a.s3guard.ddb.table.value", "my-table");
+ conf.setString("fs.s3a.s3guard.ddb.table.capacity.read", "10");
+
+ HadoopConfigLoader configLoader =
S3FileSystemFactory.createHadoopConfigLoader();
+ configLoader.setFlinkConfig(conf);
+
+ org.apache.hadoop.conf.Configuration hadoopConf =
configLoader.getOrLoadHadoopConfig();
+
+ // ".value" keys must be remapped to their Hadoop equivalents
+ assertThat(hadoopConf.get("fs.s3a.endpoint", null))
+ .isEqualTo("https://s3.eu-west-1.amazonaws.com");
+ assertThat(hadoopConf.get("fs.s3a.assumed.role.sts.endpoint", null))
+ .isEqualTo("https://sts.eu-west-1.amazonaws.com");
+ assertThat(hadoopConf.get("fs.s3a.fast.upload",
null)).isEqualTo("true");
+ assertThat(hadoopConf.get("fs.s3a.multipart.purge",
null)).isEqualTo("true");
+ assertThat(hadoopConf.get("fs.s3a.s3guard.ddb.table",
null)).isEqualTo("my-table");
+
+ // sibling keys must pass through unchanged
+ assertThat(hadoopConf.get("fs.s3a.endpoint.region",
null)).isEqualTo("eu-west-1");
+ assertThat(hadoopConf.get("fs.s3a.assumed.role.sts.endpoint.region",
null))
+ .isEqualTo("eu-west-1");
+ assertThat(hadoopConf.get("fs.s3a.fast.upload.buffer",
null)).isEqualTo("disk");
+ assertThat(hadoopConf.get("fs.s3a.multipart.purge.age",
null)).isEqualTo("86400");
+ assertThat(hadoopConf.get("fs.s3a.s3guard.ddb.table.capacity.read",
null)).isEqualTo("10");
+ }
+
private static void checkHadoopAccessKeys(
Configuration flinkConf, String accessKey, String secretKey) {
HadoopConfigLoader configLoader =
S3FileSystemFactory.createHadoopConfigLoader();