This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 961be46be93 [FLINK-39431][s3] Add .value suffix key aliases in 
S3FileSystemFactory to support Flink v2 standard YAML
961be46be93 is described below

commit 961be46be932ff0c0ef13f7c174c7f26f5616b54
Author: Gabor Somogyi <[email protected]>
AuthorDate: Mon Apr 13 19:49:16 2026 +0200

    [FLINK-39431][s3] Add .value suffix key aliases in S3FileSystemFactory to 
support Flink v2 standard YAML
---
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     | 11 +++++-
 .../flink/fs/s3hadoop/HadoopS3FileSystemTest.java  | 45 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 1 deletion(-)

diff --git 
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
 
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index cab089361ea..37f984f630a 100644
--- 
a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ 
b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -43,7 +43,16 @@ public class S3FileSystemFactory extends 
AbstractS3FileSystemFactory {
     private static final String[][] MIRRORED_CONFIG_KEYS = {
         {"fs.s3a.access-key", "fs.s3a.access.key"},
         {"fs.s3a.secret-key", "fs.s3a.secret.key"},
-        {"fs.s3a.path-style-access", "fs.s3a.path.style.access"}
+        {"fs.s3a.path-style-access", "fs.s3a.path.style.access"},
+        // Flink v2 uses standard YAML which cannot represent a key that is 
both a scalar value and
+        // a parent of other keys (e.g. fs.s3a.endpoint and 
fs.s3a.endpoint.region conflict).
+        // These aliases allow users to set the scalar key via a ".value" 
suffix to avoid the
+        // collision, e.g. fs.s3a.endpoint.value maps to fs.s3a.endpoint in 
Hadoop config.
+        {"fs.s3a.endpoint.value", "fs.s3a.endpoint"},
+        {"fs.s3a.assumed.role.sts.endpoint.value", 
"fs.s3a.assumed.role.sts.endpoint"},
+        {"fs.s3a.fast.upload.value", "fs.s3a.fast.upload"},
+        {"fs.s3a.multipart.purge.value", "fs.s3a.multipart.purge"},
+        {"fs.s3a.s3guard.ddb.table.value", "fs.s3a.s3guard.ddb.table"},
     };
 
     public S3FileSystemFactory() {
diff --git 
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
index af82892cb1c..8114cd4ee84 100644
--- 
a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
+++ 
b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java
@@ -80,6 +80,51 @@ class HadoopS3FileSystemTest {
         checkHadoopAccessKeys(conf, "clé d'accès", "clef secrète");
     }
 
+    /**
+     * Test that keys with the ".value" suffix are mapped to their Hadoop 
equivalents, allowing
+     * users to avoid YAML key prefix collisions in Flink v2's standard 
config.yaml format.
+     *
+     * <p>For example, "fs.s3a.endpoint" and "fs.s3a.endpoint.region" cannot 
coexist in standard
+     * YAML. Users can instead set "fs.s3a.endpoint.value" which is remapped 
to "fs.s3a.endpoint".
+     */
+    @Test
+    void testValueSuffixMappingForYamlCollisionAvoidance() {
+        Configuration conf = new Configuration();
+        conf.setString("fs.s3a.endpoint.value", 
"https://s3.eu-west-1.amazonaws.com";);
+        conf.setString("fs.s3a.endpoint.region", "eu-west-1");
+        conf.setString(
+                "fs.s3a.assumed.role.sts.endpoint.value", 
"https://sts.eu-west-1.amazonaws.com";);
+        conf.setString("fs.s3a.assumed.role.sts.endpoint.region", "eu-west-1");
+        conf.setString("fs.s3a.fast.upload.value", "true");
+        conf.setString("fs.s3a.fast.upload.buffer", "disk");
+        conf.setString("fs.s3a.multipart.purge.value", "true");
+        conf.setString("fs.s3a.multipart.purge.age", "86400");
+        conf.setString("fs.s3a.s3guard.ddb.table.value", "my-table");
+        conf.setString("fs.s3a.s3guard.ddb.table.capacity.read", "10");
+
+        HadoopConfigLoader configLoader = 
S3FileSystemFactory.createHadoopConfigLoader();
+        configLoader.setFlinkConfig(conf);
+
+        org.apache.hadoop.conf.Configuration hadoopConf = 
configLoader.getOrLoadHadoopConfig();
+
+        // ".value" keys must be remapped to their Hadoop equivalents
+        assertThat(hadoopConf.get("fs.s3a.endpoint", null))
+                .isEqualTo("https://s3.eu-west-1.amazonaws.com";);
+        assertThat(hadoopConf.get("fs.s3a.assumed.role.sts.endpoint", null))
+                .isEqualTo("https://sts.eu-west-1.amazonaws.com";);
+        assertThat(hadoopConf.get("fs.s3a.fast.upload", 
null)).isEqualTo("true");
+        assertThat(hadoopConf.get("fs.s3a.multipart.purge", 
null)).isEqualTo("true");
+        assertThat(hadoopConf.get("fs.s3a.s3guard.ddb.table", 
null)).isEqualTo("my-table");
+
+        // sibling keys must pass through unchanged
+        assertThat(hadoopConf.get("fs.s3a.endpoint.region", 
null)).isEqualTo("eu-west-1");
+        assertThat(hadoopConf.get("fs.s3a.assumed.role.sts.endpoint.region", 
null))
+                .isEqualTo("eu-west-1");
+        assertThat(hadoopConf.get("fs.s3a.fast.upload.buffer", 
null)).isEqualTo("disk");
+        assertThat(hadoopConf.get("fs.s3a.multipart.purge.age", 
null)).isEqualTo("86400");
+        assertThat(hadoopConf.get("fs.s3a.s3guard.ddb.table.capacity.read", 
null)).isEqualTo("10");
+    }
+
     private static void checkHadoopAccessKeys(
             Configuration flinkConf, String accessKey, String secretKey) {
         HadoopConfigLoader configLoader = 
S3FileSystemFactory.createHadoopConfigLoader();

Reply via email to