This is an automated email from the ASF dual-hosted git repository.

mmack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ec1945ec5c Support usage of custom profileName with AWS 
ProfileCredentialsProvider (closes #23206) (#23553)
1ec1945ec5c is described below

commit 1ec1945ec5c1d29dbc5efe574712733922ced07d
Author: Moritz Mack <mm...@talend.com>
AuthorDate: Mon Oct 24 09:19:14 2022 +0200

    Support usage of custom profileName with AWS ProfileCredentialsProvider 
(closes #23206) (#23553)
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |   1 +
 sdks/java/io/amazon-web-services2/build.gradle     |   1 +
 .../apache/beam/sdk/io/aws2/options/AwsModule.java |  25 +++-
 .../beam/sdk/io/aws2/options/AwsOptions.java       |  29 +++-
 .../beam/sdk/io/aws2/options/AwsModuleTest.java    | 158 +++++++++++++++++++--
 .../sdk/io/aws2/options/SerializationTestUtil.java |  15 +-
 6 files changed, 209 insertions(+), 20 deletions(-)

diff --git 
a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy 
b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index 99700b097bc..44c1011a7cf 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -546,6 +546,7 @@ class BeamModulePlugin implements Plugin<Project> {
         aws_java_sdk2_http_client_spi               : 
"software.amazon.awssdk:http-client-spi:$aws_java_sdk2_version",
         aws_java_sdk2_regions                       : 
"software.amazon.awssdk:regions:$aws_java_sdk2_version",
         aws_java_sdk2_utils                         : 
"software.amazon.awssdk:utils:$aws_java_sdk2_version",
+        aws_java_sdk2_profiles                      : 
"software.amazon.awssdk:profiles:$aws_java_sdk2_version",
         bigdataoss_gcsio                            : 
"com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version",
         bigdataoss_util                             : 
"com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version",
         byte_buddy                                  : 
"net.bytebuddy:byte-buddy:1.12.14",
diff --git a/sdks/java/io/amazon-web-services2/build.gradle 
b/sdks/java/io/amazon-web-services2/build.gradle
index 1c5d3dc8268..5b25cde8f0e 100644
--- a/sdks/java/io/amazon-web-services2/build.gradle
+++ b/sdks/java/io/amazon-web-services2/build.gradle
@@ -48,6 +48,7 @@ dependencies {
   implementation library.java.aws_java_sdk2_auth, excludeNetty
   implementation library.java.aws_java_sdk2_regions, excludeNetty
   implementation library.java.aws_java_sdk2_utils, excludeNetty
+  implementation library.java.aws_java_sdk2_profiles, excludeNetty
   implementation library.java.aws_java_sdk2_http_client_spi, excludeNetty
   implementation library.java.aws_java_sdk2_apache_client, excludeNetty
   implementation library.java.aws_java_sdk2_netty_client, excludeNetty
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
index 0f8b138d0b9..d814b395950 100644
--- 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java
@@ -49,6 +49,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.checkerframework.checker.nullness.qual.NonNull;
+import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -60,6 +61,7 @@ import 
software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
 import 
software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider;
 import software.amazon.awssdk.http.apache.ProxyConfiguration;
+import software.amazon.awssdk.profiles.ProfileFileSystemSetting;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.sts.StsClient;
 import 
software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
@@ -75,6 +77,7 @@ public class AwsModule extends SimpleModule {
   private static final String ACCESS_KEY_ID = "accessKeyId";
   private static final String SECRET_ACCESS_KEY = "secretAccessKey";
   private static final String SESSION_TOKEN = "sessionToken";
+  private static final String PROFILE_NAME = "profileName";
 
   public AwsModule() {
     super("AwsModule");
@@ -160,7 +163,9 @@ public class AwsModule extends SimpleModule {
       } else if (hasName(SystemPropertyCredentialsProvider.class, typeName)) {
         return SystemPropertyCredentialsProvider.create();
       } else if (hasName(ProfileCredentialsProvider.class, typeName)) {
-        return ProfileCredentialsProvider.create();
+        return json.has(PROFILE_NAME)
+            ? ProfileCredentialsProvider.create(getNotNull(json, PROFILE_NAME, 
typeName))
+            : ProfileCredentialsProvider.create();
       } else if (hasName(ContainerCredentialsProvider.class, typeName)) {
         return ContainerCredentialsProvider.builder().build();
       } else if 
(typeName.equals(StsAssumeRoleCredentialsProvider.class.getSimpleName())) {
@@ -195,7 +200,6 @@ public class AwsModule extends SimpleModule {
             DefaultCredentialsProvider.class,
             EnvironmentVariableCredentialsProvider.class,
             SystemPropertyCredentialsProvider.class,
-            ProfileCredentialsProvider.class,
             ContainerCredentialsProvider.class);
 
     @Override
@@ -228,6 +232,23 @@ public class AwsModule extends SimpleModule {
           jsonGenerator.writeStringField(ACCESS_KEY_ID, 
credentials.accessKeyId());
           jsonGenerator.writeStringField(SECRET_ACCESS_KEY, 
credentials.secretAccessKey());
         }
+      } else if (providerClass.equals(ProfileCredentialsProvider.class)) {
+        String profileName = (String) readField(credentialsProvider, 
PROFILE_NAME);
+        String envProfileName = 
ProfileFileSystemSetting.AWS_PROFILE.getStringValueOrThrow();
+        if (profileName != null && !profileName.equals(envProfileName)) {
+          jsonGenerator.writeStringField(PROFILE_NAME, profileName);
+        }
+        try {
+          Exception exception = (Exception) readField(credentialsProvider, 
"loadException");
+          if (exception != null) {
+            LoggerFactory.getLogger(AwsModule.class)
+                .warn("Serialized ProfileCredentialsProvider in faulty 
state.", exception);
+          }
+        } catch (RuntimeException e) {
+          LoggerFactory.getLogger(AwsModule.class)
+              .warn("Failed to check ProfileCredentialsProvider for 
loadException.", e);
+        }
+
       } else if (providerClass.equals(StsAssumeRoleCredentialsProvider.class)) 
{
         Supplier<AssumeRoleRequest> reqSupplier =
             (Supplier<AssumeRoleRequest>)
diff --git 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java
 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java
index d2e02217d15..ae86c27b78e 100644
--- 
a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java
+++ 
b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java
@@ -78,21 +78,20 @@ public interface AwsOptions extends PipelineOptions {
    * <p>The class name of the provider must be set in the {@code @type} field. 
Note: Not all
    * available providers are supported and some configuration options might be 
ignored.
    *
-   * <p>Most providers rely on system's environment to follow AWS conventions, 
there's no further
-   * configuration:
+   * <p>Most providers must use the system environment following AWS 
conventions. Programmatic
+   * configuration for these providers is NOT supported:
    * <li>{@link DefaultCredentialsProvider}
    * <li>{@link EnvironmentVariableCredentialsProvider}
    * <li>{@link SystemPropertyCredentialsProvider}
-   * <li>{@link ProfileCredentialsProvider}
    * <li>{@link ContainerCredentialsProvider}
    *
    *     <p>Example:
    *
-   *     <pre>{@code --awsCredentialsProvider={"@type": 
"ProfileCredentialsProvider"}}</pre>
+   *     <pre>{@code --awsCredentialsProvider={"@type": 
"EnvironmentVariableCredentialsProvider"}}
+   *     </pre>
    *
-   *     <p>Some other providers require additional configuration:
+   *     <p>Some other providers support additional configuration:
    * <li>{@link StaticCredentialsProvider}
-   * <li>{@link StsAssumeRoleCredentialsProvider}
    *
    *     <p>Examples:
    *
@@ -107,9 +106,27 @@ public interface AwsOptions extends PipelineOptions {
    *   "awsAccessKeyId": "key_id_value",
    *   "awsSecretKey": "secret_value",
    *   "sessionToken": "token_value"
+   * }}</pre>
+   *
+   * <li>{@link ProfileCredentialsProvider}
+   *
+   *     <p>{@code profileName} is optional, if not set the environment 
default is used. Be careful
+   *     if using this provider programmatically, it can behave unexpectedly.
+   *
+   *     <p>Examples:
+   *
+   *     <pre>{@code --awsCredentialsProvider={
+   *   "@type": "ProfileCredentialsProvider"
    * }
    *
    * --awsCredentialsProvider={
+   *   "@type": "ProfileCredentialsProvider",
+   *   "profileName": "my_profile"
+   * }}</pre>
+   *
+   * <li>{@link StsAssumeRoleCredentialsProvider}
+   *
+   *     <pre>{@code --awsCredentialsProvider={
    *   "@type": "StsAssumeRoleCredentialsProvider",
    *   "roleArn": "role_arn_Value",
    *   "roleSessionName": "session_name_value",
diff --git 
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java
 
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java
index e5962812e64..17e6f528f96 100644
--- 
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java
+++ 
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.aws2.options;
 
 import static 
org.apache.beam.repackaged.core.org.apache.commons.lang3.reflect.FieldUtils.readField;
+import static 
org.apache.beam.sdk.io.aws2.options.SerializationTestUtil.serialize;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
@@ -25,21 +26,32 @@ import static org.junit.Assert.assertEquals;
 import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ACCESS_KEY_ID;
 import static software.amazon.awssdk.core.SdkSystemSetting.AWS_REGION;
 import static 
software.amazon.awssdk.core.SdkSystemSetting.AWS_SECRET_ACCESS_KEY;
+import static 
software.amazon.awssdk.profiles.ProfileFileSystemSetting.AWS_CONFIG_FILE;
+import static 
software.amazon.awssdk.profiles.ProfileFileSystemSetting.AWS_PROFILE;
 
 import com.amazonaws.regions.Regions;
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.function.Supplier;
+import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.util.ThrowingSupplier;
 import org.apache.beam.sdk.util.common.ReflectHelpers;
 import org.hamcrest.MatcherAssert;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExternalResource;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentials;
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider;
@@ -57,6 +69,24 @@ import 
software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
 @RunWith(JUnit4.class)
 public class AwsModuleTest {
 
+  @ClassRule
+  public static final ProfileFile PROFILE =
+      new ProfileFile(
+          "[default]",
+          "aws_access_key_id=defaultkey",
+          "aws_secret_access_key=123",
+          "[profile other]",
+          "aws_access_key_id=otherkey",
+          "aws_secret_access_key=abc");
+
+  private static final AwsCredentials DEFAULT_CREDENTIALS =
+      AwsBasicCredentials.create("defaultkey", "123");
+
+  private static final AwsCredentials OTHER_CREDENTIALS =
+      AwsBasicCredentials.create("otherkey", "abc");
+
+  @Rule public final ExpectedLogs logs = ExpectedLogs.none(AwsModule.class);
+
   @Test
   public void testObjectMapperIsAbleToFindModule() {
     List<Module> modules = 
ObjectMapper.findModules(ReflectHelpers.findClassLoader());
@@ -68,7 +98,7 @@ public class AwsModuleTest {
   }
 
   @Test
-  public void testStaticCredentialsProviderSerializationDeserialization() {
+  public void testStaticCredentialsProviderSerDe() {
     AwsCredentialsProvider provider =
         StaticCredentialsProvider.create(AwsBasicCredentials.create("key", 
"secret"));
 
@@ -84,7 +114,7 @@ public class AwsModuleTest {
   }
 
   @Test
-  public void testAwsCredentialsProviderSerializationDeserialization() {
+  public void testAwsCredentialsProviderSerDe() {
     AwsCredentialsProvider provider = DefaultCredentialsProvider.create();
     AwsCredentialsProvider deserializedProvider = 
serializeAndDeserialize(provider);
     assertEquals(provider.getClass(), deserializedProvider.getClass());
@@ -97,17 +127,90 @@ public class AwsModuleTest {
     deserializedProvider = serializeAndDeserialize(provider);
     assertEquals(provider.getClass(), deserializedProvider.getClass());
 
-    provider = ProfileCredentialsProvider.create();
-    deserializedProvider = serializeAndDeserialize(provider);
-    assertEquals(provider.getClass(), deserializedProvider.getClass());
-
     provider = ContainerCredentialsProvider.builder().build();
     deserializedProvider = serializeAndDeserialize(provider);
     assertEquals(provider.getClass(), deserializedProvider.getClass());
   }
 
   @Test
-  public void 
testStsAssumeRoleCredentialsProviderSerializationDeserialization() throws 
Exception {
+  public void testProfileCredentialsProviderSerDeWithDefaultProfile() throws 
Exception {
+    withSystemProperties(
+        PROFILE.properties("default"),
+        () -> {
+          AwsCredentialsProvider provider = 
ProfileCredentialsProvider.create();
+          String serializedProvider = serialize(provider);
+
+          
assertThat(serializedProvider).isEqualTo("{\"@type\":\"ProfileCredentialsProvider\"}");
+
+          AwsCredentialsProvider actual = deserialize(serializedProvider);
+          assertThat(actual.resolveCredentials())
+              .isEqualToComparingFieldByField(DEFAULT_CREDENTIALS);
+          return assertThat(actual)
+              .isExactlyInstanceOf(ProfileCredentialsProvider.class)
+              .isEqualToComparingFieldByFieldRecursively(provider);
+        });
+  }
+
+  @Test
+  public void testProfileCredentialsProviderSerDeWithCustomProfile() throws 
Exception {
+    withSystemProperties(
+        PROFILE.properties("default"),
+        () -> {
+          AwsCredentialsProvider provider = 
ProfileCredentialsProvider.create("other");
+          String serializedProvider = serialize(provider);
+
+          assertThat(serializedProvider)
+              
.isEqualTo("{\"@type\":\"ProfileCredentialsProvider\",\"profileName\":\"other\"}");
+
+          AwsCredentialsProvider actual = deserialize(serializedProvider);
+          
assertThat(actual.resolveCredentials()).isEqualToComparingFieldByField(OTHER_CREDENTIALS);
+          return assertThat(actual)
+              .isExactlyInstanceOf(ProfileCredentialsProvider.class)
+              .isEqualToComparingFieldByFieldRecursively(provider);
+        });
+  }
+
+  @Test
+  public void testProfileCredentialsProviderSerDeWithCustomDefaultProfile() 
throws Exception {
+    withSystemProperties(
+        PROFILE.properties("other"),
+        () -> {
+          AwsCredentialsProvider provider = 
ProfileCredentialsProvider.create("other");
+          String serializedProvider = serialize(provider);
+
+          
assertThat(serializedProvider).isEqualTo("{\"@type\":\"ProfileCredentialsProvider\"}");
+
+          AwsCredentialsProvider actual = deserialize(serializedProvider);
+          assertThat(actual.resolveCredentials())
+              .isEqualToComparingFieldByFieldRecursively(OTHER_CREDENTIALS);
+          return assertThat(actual)
+              .isExactlyInstanceOf(ProfileCredentialsProvider.class)
+              .isEqualToComparingFieldByFieldRecursively(provider);
+        });
+  }
+
+  @Test
+  public void testProfileCredentialsProviderSerDeWithUnknownProfile() throws 
Exception {
+    withSystemProperties(
+        PROFILE.properties("default"),
+        () -> {
+          AwsCredentialsProvider provider = 
ProfileCredentialsProvider.create("unknown");
+          String serializedProvider = serialize(provider);
+
+          // ProfileCredentialsProvider SILENTLY drops unknown profiles
+          
assertThat(serializedProvider).isEqualTo("{\"@type\":\"ProfileCredentialsProvider\"}");
+
+          AwsCredentialsProvider actual = deserialize(serializedProvider);
+          // NOTE: This documents the unexpected behavior in case a faulty 
provider is serialized
+          return assertThat(actual.resolveCredentials())
+              .isEqualToComparingFieldByField(DEFAULT_CREDENTIALS);
+        });
+
+    logs.verifyWarn("Serialized ProfileCredentialsProvider in faulty state.");
+  }
+
+  @Test
+  public void testStsAssumeRoleCredentialsProviderSerDe() throws Exception {
     AssumeRoleRequest req = 
AssumeRoleRequest.builder().roleArn("roleArn").policy("policy").build();
     Supplier<AwsCredentialsProvider> provider =
         () ->
@@ -123,7 +226,7 @@ public class AwsModuleTest {
 
     // Region and credentials for STS client are resolved using default 
providers
     AwsCredentialsProvider deserializedProvider =
-        withSystemPropertyOverrides(overrides, () -> 
serializeAndDeserialize(provider.get()));
+        withSystemProperties(overrides, () -> 
serializeAndDeserialize(provider.get()));
 
     Supplier<AssumeRoleRequest> requestSupplier =
         (Supplier<AssumeRoleRequest>)
@@ -132,7 +235,7 @@ public class AwsModuleTest {
   }
 
   @Test
-  public void testProxyConfigurationSerializationDeserialization() {
+  public void testProxyConfigurationSerDe() {
     ProxyConfiguration proxyConfiguration =
         ProxyConfiguration.builder()
             .endpoint(URI.create("http://localhost:8080";))
@@ -147,7 +250,7 @@ public class AwsModuleTest {
     assertEquals("password", deserializedProxyConfiguration.password());
   }
 
-  private <T> T withSystemPropertyOverrides(Properties overrides, 
ThrowingSupplier<T> fun)
+  private <T> T withSystemProperties(Properties overrides, ThrowingSupplier<T> 
fun)
       throws Exception {
     Properties systemProps = System.getProperties();
 
@@ -164,4 +267,39 @@ public class AwsModuleTest {
       previousProps.forEach(systemProps::put);
     }
   }
+
+  private static AwsCredentialsProvider deserialize(String provider) {
+    return SerializationTestUtil.deserialize(provider, 
AwsCredentialsProvider.class);
+  }
+
+  static class ProfileFile extends ExternalResource {
+    private String[] lines;
+    private Path path;
+
+    public ProfileFile(String... lines) {
+      this.lines = lines;
+    }
+
+    public Properties properties(String defaultProfile) {
+      Properties props = new Properties();
+      props.setProperty(AWS_CONFIG_FILE.property(), path.toString());
+      props.setProperty(AWS_PROFILE.property(), defaultProfile);
+      return props;
+    }
+
+    @Override
+    protected void before() throws Throwable {
+      path = Files.createTempFile("profile", ".conf");
+      Files.write(path, Arrays.asList(lines));
+    }
+
+    @Override
+    protected void after() {
+      try {
+        Files.delete(path);
+      } catch (IOException e) {
+        // ignore
+      }
+    }
+  }
 }
diff --git 
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java
 
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java
index 0f5daf0bc92..6cf79c95809 100644
--- 
a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java
+++ 
b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java
@@ -28,11 +28,22 @@ public class SerializationTestUtil {
           
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
 
   public static <T> T serializeDeserialize(Class<T> clazz, T obj) {
+    return deserialize(serialize(obj), clazz);
+  }
+
+  public static <T> String serialize(T obj) {
+    try {
+      return MAPPER.writeValueAsString(obj);
+    } catch (JsonProcessingException e) {
+      throw new RuntimeException("Failed to serialize " + 
obj.getClass().getSimpleName(), e);
+    }
+  }
+
+  public static <T> T deserialize(String jsonString, Class<T> clazz) {
     try {
-      String jsonString = MAPPER.writeValueAsString(obj);
       return MAPPER.readValue(jsonString, clazz);
     } catch (JsonProcessingException e) {
-      throw new RuntimeException("Failed to serialize/deserialize " + 
clazz.getSimpleName(), e);
+      throw new RuntimeException("Failed to deserialize " + 
clazz.getSimpleName(), e);
     }
   }
 }

Reply via email to