gaborgsomogyi commented on code in PR #27187:
URL: https://github.com/apache/flink/pull/27187#discussion_r2876855459


##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -372,14 +382,129 @@ public S3ClientProvider build() {
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
-            // The token provider returns null if credentials are not 
available,
-            // allowing the chain to proceed to the next provider (fallback).
+            if (credentialsProviderClasses != null
+                    && !credentialsProviderClasses.trim().isEmpty()) {
+                return 
buildCustomCredentialsProvider(credentialsProviderClasses);
+            }
+            // Default chain: delegation tokens -> static credentials (if 
configured)
+            // -> DefaultCredentialsProvider.
             return AwsCredentialsProviderChain.builder()
                     .credentialsProviders(
                             new DynamicTemporaryAWSCredentialsProvider(), 
buildFallbackProvider())
                     .build();
         }
 
+        private AwsCredentialsProvider buildCustomCredentialsProvider(String 
classNames) {
+            String[] names = classNames.split(",");
+            List<AwsCredentialsProvider> providers = new ArrayList<>();
+            for (String name : names) {
+                String trimmed = name.trim();
+                if (trimmed.isEmpty()) {
+                    continue;
+                }
+                providers.add(instantiateCredentialsProvider(trimmed));
+            }
+            if (providers.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "fs.s3.aws.credentials.provider is set but contains no 
valid provider class names");
+            }
+            LOG.info(
+                    "Using custom credentials provider chain with {} 
provider(s): {}",
+                    providers.size(),
+                    classNames);
+            if (providers.size() == 1) {
+                return providers.get(0);
+            }
+            return 
AwsCredentialsProviderChain.builder().credentialsProviders(providers).build();
+        }
+
+        /**
+         * Instantiates an {@link AwsCredentialsProvider} from a class name.
+         *
+         * <p>Supports:
+         *
+         * <ul>
+         *   <li>Fully-qualified AWS SDK v2 class names (e.g. {@code
+         *       
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider})
+         *   <li>Simple class names resolved from the {@code
+         *       software.amazon.awssdk.auth.credentials} package (e.g. {@code
+         *       AnonymousCredentialsProvider})
+         *   <li>Well-known Hadoop / AWS SDK v1 names mapped to their SDK v2 
equivalents
+         * </ul>
+         */
+        private AwsCredentialsProvider instantiateCredentialsProvider(String 
className) {
+            String resolvedClassName = resolveProviderClassName(className);
+            try {
+                Class<?> clazz = Class.forName(resolvedClassName);
+                if (!AwsCredentialsProvider.class.isAssignableFrom(clazz)) {
+                    throw new IllegalArgumentException(
+                            "Class "
+                                    + resolvedClassName
+                                    + " does not implement 
AwsCredentialsProvider");
+                }
+
+                // SDK v2 providers use a static create() factory method by 
convention
+                try {
+                    java.lang.reflect.Method createMethod = 
clazz.getMethod("create");
+                    if 
(java.lang.reflect.Modifier.isStatic(createMethod.getModifiers())
+                            && AwsCredentialsProvider.class.isAssignableFrom(
+                                    createMethod.getReturnType())) {
+                        return (AwsCredentialsProvider) 
createMethod.invoke(null);
+                    }
+                } catch (NoSuchMethodException ignored) {
+                }
+
+                return (AwsCredentialsProvider) 
clazz.getDeclaredConstructor().newInstance();
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        "Credentials provider class not found: "
+                                + resolvedClassName
+                                + " (original: "
+                                + className
+                                + ")",
+                        e);
+            } catch (IllegalArgumentException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        "Failed to instantiate credentials provider: " + 
resolvedClassName, e);
+            }
+        }
+
+        private static final java.util.Map<String, String> 
HADOOP_TO_SDK_V2_PROVIDERS =
+                java.util.Map.of(
+                        "com.amazonaws.auth.AnonymousAWSCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider",
+                        
"org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider",
+                        
"com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider",
+                        
"com.amazonaws.auth.InstanceProfileCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider",
+                        
"com.amazonaws.auth.profile.ProfileCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider",
+                        "com.amazonaws.auth.ContainerCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider");

Review Comment:
   Since the whole point of this connector is to eliminate Hadoop, I wouldn't 
add such code. It will stay forever.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -372,14 +384,103 @@ public S3ClientProvider build() {
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
-            // The token provider returns null if credentials are not 
available,
-            // allowing the chain to proceed to the next provider (fallback).
+            if (credentialsProviderClasses != null
+                    && !credentialsProviderClasses.trim().isEmpty()) {
+                return 
buildCustomCredentialsProvider(credentialsProviderClasses);
+            }
+            // Default chain:
+            // 1. delegation tokens
+            // 2. static credentials (if configured)
+            // 3. DefaultCredentialsProvider.
             return AwsCredentialsProviderChain.builder()
                     .credentialsProviders(
                             new DynamicTemporaryAWSCredentialsProvider(), 
buildFallbackProvider())
                     .build();
         }

Review Comment:
   Why not always create a chain and let the SDK authenticate with the first 
provider that has credentials? I mean, if I were a user, I would just want the 
providers I've configured to be in the chain. As it stands, everyone will be 
surprised that tokens are not working just because somebody accidentally added 
a custom provider that does nothing.



##########
flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactoryTest.java:
##########
@@ -417,6 +417,96 @@ void testS3AInheritsAllS3Configuration() throws Exception {
         assertThat(fs).isInstanceOf(NativeS3FileSystem.class);
     }
 
+    @Test
+    void testCreateFileSystemWithAnonymousCredentialsProvider() throws 
Exception {
+        NativeS3FileSystemFactory factory = new NativeS3FileSystemFactory();
+        Configuration config = new Configuration();
+        config.setString(
+                "fs.s3.aws.credentials.provider",
+                
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider");
+        config.setString("s3.region", "us-east-1");
+        config.setString("io.tmp.dirs", System.getProperty("java.io.tmpdir"));
+
+        factory.configure(config);
+
+        URI fsUri = URI.create("s3://test-bucket/");
+        FileSystem fs = factory.create(fsUri);
+
+        assertThat(fs).isNotNull();
+        assertThat(fs).isInstanceOf(NativeS3FileSystem.class);

Review Comment:
   These kinds of tests are not very thorough: if we deleted the 
`fs.s3.aws.credentials.provider` config, they would still pass.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -372,14 +382,129 @@ public S3ClientProvider build() {
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
-            // The token provider returns null if credentials are not 
available,
-            // allowing the chain to proceed to the next provider (fallback).
+            if (credentialsProviderClasses != null
+                    && !credentialsProviderClasses.trim().isEmpty()) {
+                return 
buildCustomCredentialsProvider(credentialsProviderClasses);
+            }
+            // Default chain: delegation tokens -> static credentials (if 
configured)
+            // -> DefaultCredentialsProvider.
             return AwsCredentialsProviderChain.builder()
                     .credentialsProviders(
                             new DynamicTemporaryAWSCredentialsProvider(), 
buildFallbackProvider())
                     .build();
         }
 
+        private AwsCredentialsProvider buildCustomCredentialsProvider(String 
classNames) {
+            String[] names = classNames.split(",");
+            List<AwsCredentialsProvider> providers = new ArrayList<>();
+            for (String name : names) {
+                String trimmed = name.trim();
+                if (trimmed.isEmpty()) {
+                    continue;
+                }
+                providers.add(instantiateCredentialsProvider(trimmed));
+            }
+            if (providers.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "fs.s3.aws.credentials.provider is set but contains no 
valid provider class names");
+            }
+            LOG.info(
+                    "Using custom credentials provider chain with {} 
provider(s): {}",
+                    providers.size(),
+                    classNames);
+            if (providers.size() == 1) {
+                return providers.get(0);
+            }
+            return 
AwsCredentialsProviderChain.builder().credentialsProviders(providers).build();
+        }
+
+        /**
+         * Instantiates an {@link AwsCredentialsProvider} from a class name.
+         *
+         * <p>Supports:
+         *
+         * <ul>
+         *   <li>Fully-qualified AWS SDK v2 class names (e.g. {@code
+         *       
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider})
+         *   <li>Simple class names resolved from the {@code
+         *       software.amazon.awssdk.auth.credentials} package (e.g. {@code
+         *       AnonymousCredentialsProvider})
+         *   <li>Well-known Hadoop / AWS SDK v1 names mapped to their SDK v2 
equivalents
+         * </ul>
+         */
+        private AwsCredentialsProvider instantiateCredentialsProvider(String 
className) {
+            String resolvedClassName = resolveProviderClassName(className);
+            try {
+                Class<?> clazz = Class.forName(resolvedClassName);
+                if (!AwsCredentialsProvider.class.isAssignableFrom(clazz)) {
+                    throw new IllegalArgumentException(
+                            "Class "
+                                    + resolvedClassName
+                                    + " does not implement 
AwsCredentialsProvider");
+                }
+
+                // SDK v2 providers use a static create() factory method by 
convention
+                try {
+                    java.lang.reflect.Method createMethod = 
clazz.getMethod("create");
+                    if 
(java.lang.reflect.Modifier.isStatic(createMethod.getModifiers())
+                            && AwsCredentialsProvider.class.isAssignableFrom(
+                                    createMethod.getReturnType())) {
+                        return (AwsCredentialsProvider) 
createMethod.invoke(null);
+                    }
+                } catch (NoSuchMethodException ignored) {
+                }
+
+                return (AwsCredentialsProvider) 
clazz.getDeclaredConstructor().newInstance();
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        "Credentials provider class not found: "
+                                + resolvedClassName
+                                + " (original: "
+                                + className
+                                + ")",
+                        e);
+            } catch (IllegalArgumentException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        "Failed to instantiate credentials provider: " + 
resolvedClassName, e);
+            }

Review Comment:
   I'm puzzled why we need 3 different behaviors, where in one case we're not 
giving any extra info.



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -372,14 +382,129 @@ public S3ClientProvider build() {
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
-            // The token provider returns null if credentials are not 
available,
-            // allowing the chain to proceed to the next provider (fallback).
+            if (credentialsProviderClasses != null
+                    && !credentialsProviderClasses.trim().isEmpty()) {
+                return 
buildCustomCredentialsProvider(credentialsProviderClasses);
+            }
+            // Default chain: delegation tokens -> static credentials (if 
configured)
+            // -> DefaultCredentialsProvider.
             return AwsCredentialsProviderChain.builder()
                     .credentialsProviders(
                             new DynamicTemporaryAWSCredentialsProvider(), 
buildFallbackProvider())
                     .build();
         }
 
+        private AwsCredentialsProvider buildCustomCredentialsProvider(String 
classNames) {
+            String[] names = classNames.split(",");
+            List<AwsCredentialsProvider> providers = new ArrayList<>();
+            for (String name : names) {
+                String trimmed = name.trim();
+                if (trimmed.isEmpty()) {
+                    continue;
+                }
+                providers.add(instantiateCredentialsProvider(trimmed));
+            }
+            if (providers.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "fs.s3.aws.credentials.provider is set but contains no 
valid provider class names");
+            }
+            LOG.info(
+                    "Using custom credentials provider chain with {} 
provider(s): {}",
+                    providers.size(),
+                    classNames);
+            if (providers.size() == 1) {
+                return providers.get(0);
+            }
+            return 
AwsCredentialsProviderChain.builder().credentialsProviders(providers).build();
+        }
+
+        /**
+         * Instantiates an {@link AwsCredentialsProvider} from a class name.
+         *
+         * <p>Supports:
+         *
+         * <ul>
+         *   <li>Fully-qualified AWS SDK v2 class names (e.g. {@code
+         *       
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider})
+         *   <li>Simple class names resolved from the {@code
+         *       software.amazon.awssdk.auth.credentials} package (e.g. {@code
+         *       AnonymousCredentialsProvider})
+         *   <li>Well-known Hadoop / AWS SDK v1 names mapped to their SDK v2 
equivalents
+         * </ul>
+         */
+        private AwsCredentialsProvider instantiateCredentialsProvider(String 
className) {
+            String resolvedClassName = resolveProviderClassName(className);
+            try {
+                Class<?> clazz = Class.forName(resolvedClassName);
+                if (!AwsCredentialsProvider.class.isAssignableFrom(clazz)) {
+                    throw new IllegalArgumentException(
+                            "Class "
+                                    + resolvedClassName
+                                    + " does not implement 
AwsCredentialsProvider");
+                }
+
+                // SDK v2 providers use a static create() factory method by 
convention
+                try {
+                    java.lang.reflect.Method createMethod = 
clazz.getMethod("create");
+                    if 
(java.lang.reflect.Modifier.isStatic(createMethod.getModifiers())
+                            && AwsCredentialsProvider.class.isAssignableFrom(
+                                    createMethod.getReturnType())) {
+                        return (AwsCredentialsProvider) 
createMethod.invoke(null);
+                    }
+                } catch (NoSuchMethodException ignored) {

Review Comment:
   > // SDK v2 providers use a static create() factory method by convention
   
   Then why do we ignore `NoSuchMethodException`?



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -372,14 +382,129 @@ public S3ClientProvider build() {
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
-            // The token provider returns null if credentials are not 
available,
-            // allowing the chain to proceed to the next provider (fallback).
+            if (credentialsProviderClasses != null
+                    && !credentialsProviderClasses.trim().isEmpty()) {
+                return 
buildCustomCredentialsProvider(credentialsProviderClasses);
+            }
+            // Default chain: delegation tokens -> static credentials (if 
configured)
+            // -> DefaultCredentialsProvider.
             return AwsCredentialsProviderChain.builder()
                     .credentialsProviders(
                             new DynamicTemporaryAWSCredentialsProvider(), 
buildFallbackProvider())
                     .build();
         }
 
+        private AwsCredentialsProvider buildCustomCredentialsProvider(String 
classNames) {
+            String[] names = classNames.split(",");
+            List<AwsCredentialsProvider> providers = new ArrayList<>();
+            for (String name : names) {
+                String trimmed = name.trim();
+                if (trimmed.isEmpty()) {
+                    continue;
+                }
+                providers.add(instantiateCredentialsProvider(trimmed));
+            }
+            if (providers.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "fs.s3.aws.credentials.provider is set but contains no 
valid provider class names");
+            }
+            LOG.info(
+                    "Using custom credentials provider chain with {} 
provider(s): {}",
+                    providers.size(),
+                    classNames);
+            if (providers.size() == 1) {
+                return providers.get(0);
+            }
+            return 
AwsCredentialsProviderChain.builder().credentialsProviders(providers).build();
+        }
+
+        /**
+         * Instantiates an {@link AwsCredentialsProvider} from a class name.
+         *
+         * <p>Supports:
+         *
+         * <ul>
+         *   <li>Fully-qualified AWS SDK v2 class names (e.g. {@code
+         *       
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider})
+         *   <li>Simple class names resolved from the {@code
+         *       software.amazon.awssdk.auth.credentials} package (e.g. {@code
+         *       AnonymousCredentialsProvider})
+         *   <li>Well-known Hadoop / AWS SDK v1 names mapped to their SDK v2 
equivalents
+         * </ul>
+         */
+        private AwsCredentialsProvider instantiateCredentialsProvider(String 
className) {
+            String resolvedClassName = resolveProviderClassName(className);
+            try {
+                Class<?> clazz = Class.forName(resolvedClassName);
+                if (!AwsCredentialsProvider.class.isAssignableFrom(clazz)) {
+                    throw new IllegalArgumentException(
+                            "Class "
+                                    + resolvedClassName
+                                    + " does not implement 
AwsCredentialsProvider");
+                }
+
+                // SDK v2 providers use a static create() factory method by 
convention
+                try {
+                    java.lang.reflect.Method createMethod = 
clazz.getMethod("create");
+                    if 
(java.lang.reflect.Modifier.isStatic(createMethod.getModifiers())

Review Comment:
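   Please import `java.lang.reflect.Method` and `java.lang.reflect.Modifier` 
instead of using fully-qualified names.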



##########
flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProvider.java:
##########
@@ -372,14 +382,129 @@ public S3ClientProvider build() {
         }
 
         private AwsCredentialsProvider buildBaseCredentialsProvider() {
-            // The token provider returns null if credentials are not 
available,
-            // allowing the chain to proceed to the next provider (fallback).
+            if (credentialsProviderClasses != null
+                    && !credentialsProviderClasses.trim().isEmpty()) {
+                return 
buildCustomCredentialsProvider(credentialsProviderClasses);
+            }
+            // Default chain: delegation tokens -> static credentials (if 
configured)
+            // -> DefaultCredentialsProvider.
             return AwsCredentialsProviderChain.builder()
                     .credentialsProviders(
                             new DynamicTemporaryAWSCredentialsProvider(), 
buildFallbackProvider())
                     .build();
         }
 
+        private AwsCredentialsProvider buildCustomCredentialsProvider(String 
classNames) {
+            String[] names = classNames.split(",");
+            List<AwsCredentialsProvider> providers = new ArrayList<>();
+            for (String name : names) {
+                String trimmed = name.trim();
+                if (trimmed.isEmpty()) {
+                    continue;
+                }
+                providers.add(instantiateCredentialsProvider(trimmed));
+            }
+            if (providers.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "fs.s3.aws.credentials.provider is set but contains no 
valid provider class names");
+            }
+            LOG.info(
+                    "Using custom credentials provider chain with {} 
provider(s): {}",
+                    providers.size(),
+                    classNames);
+            if (providers.size() == 1) {
+                return providers.get(0);
+            }
+            return 
AwsCredentialsProviderChain.builder().credentialsProviders(providers).build();
+        }
+
+        /**
+         * Instantiates an {@link AwsCredentialsProvider} from a class name.
+         *
+         * <p>Supports:
+         *
+         * <ul>
+         *   <li>Fully-qualified AWS SDK v2 class names (e.g. {@code
+         *       
software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider})
+         *   <li>Simple class names resolved from the {@code
+         *       software.amazon.awssdk.auth.credentials} package (e.g. {@code
+         *       AnonymousCredentialsProvider})
+         *   <li>Well-known Hadoop / AWS SDK v1 names mapped to their SDK v2 
equivalents
+         * </ul>
+         */
+        private AwsCredentialsProvider instantiateCredentialsProvider(String 
className) {
+            String resolvedClassName = resolveProviderClassName(className);
+            try {
+                Class<?> clazz = Class.forName(resolvedClassName);
+                if (!AwsCredentialsProvider.class.isAssignableFrom(clazz)) {
+                    throw new IllegalArgumentException(
+                            "Class "
+                                    + resolvedClassName
+                                    + " does not implement 
AwsCredentialsProvider");
+                }
+
+                // SDK v2 providers use a static create() factory method by 
convention
+                try {
+                    java.lang.reflect.Method createMethod = 
clazz.getMethod("create");
+                    if 
(java.lang.reflect.Modifier.isStatic(createMethod.getModifiers())
+                            && AwsCredentialsProvider.class.isAssignableFrom(
+                                    createMethod.getReturnType())) {
+                        return (AwsCredentialsProvider) 
createMethod.invoke(null);
+                    }
+                } catch (NoSuchMethodException ignored) {
+                }
+
+                return (AwsCredentialsProvider) 
clazz.getDeclaredConstructor().newInstance();
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        "Credentials provider class not found: "
+                                + resolvedClassName
+                                + " (original: "
+                                + className
+                                + ")",
+                        e);
+            } catch (IllegalArgumentException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new IllegalArgumentException(
+                        "Failed to instantiate credentials provider: " + 
resolvedClassName, e);
+            }
+        }
+
+        private static final java.util.Map<String, String> 
HADOOP_TO_SDK_V2_PROVIDERS =
+                java.util.Map.of(
+                        "com.amazonaws.auth.AnonymousAWSCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider",
+                        
"org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider",
+                        
"com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider",
+                        
"com.amazonaws.auth.InstanceProfileCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider",
+                        
"com.amazonaws.auth.profile.ProfileCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider",
+                        "com.amazonaws.auth.ContainerCredentialsProvider",
+                        
"software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider");

Review Comment:
   I was thinking about this in the meantime, and another reason from my side 
is that those providers match now, but there is no guarantee that they will 
stay that way. I'm pretty sure nobody will track this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to