This is an automated email from the ASF dual-hosted git repository. mmack pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 1ec1945ec5c Support usage of custom profileName with AWS ProfileCredentialsProvider (closes #23206) (#23553) 1ec1945ec5c is described below commit 1ec1945ec5c1d29dbc5efe574712733922ced07d Author: Moritz Mack <mm...@talend.com> AuthorDate: Mon Oct 24 09:19:14 2022 +0200 Support usage of custom profileName with AWS ProfileCredentialsProvider (closes #23206) (#23553) --- .../org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/io/amazon-web-services2/build.gradle | 1 + .../apache/beam/sdk/io/aws2/options/AwsModule.java | 25 +++- .../beam/sdk/io/aws2/options/AwsOptions.java | 29 +++- .../beam/sdk/io/aws2/options/AwsModuleTest.java | 158 +++++++++++++++++++-- .../sdk/io/aws2/options/SerializationTestUtil.java | 15 +- 6 files changed, 209 insertions(+), 20 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 99700b097bc..44c1011a7cf 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -546,6 +546,7 @@ class BeamModulePlugin implements Plugin<Project> { aws_java_sdk2_http_client_spi : "software.amazon.awssdk:http-client-spi:$aws_java_sdk2_version", aws_java_sdk2_regions : "software.amazon.awssdk:regions:$aws_java_sdk2_version", aws_java_sdk2_utils : "software.amazon.awssdk:utils:$aws_java_sdk2_version", + aws_java_sdk2_profiles : "software.amazon.awssdk:profiles:$aws_java_sdk2_version", bigdataoss_gcsio : "com.google.cloud.bigdataoss:gcsio:$google_cloud_bigdataoss_version", bigdataoss_util : "com.google.cloud.bigdataoss:util:$google_cloud_bigdataoss_version", byte_buddy : "net.bytebuddy:byte-buddy:1.12.14", diff --git a/sdks/java/io/amazon-web-services2/build.gradle b/sdks/java/io/amazon-web-services2/build.gradle index 1c5d3dc8268..5b25cde8f0e 100644 --- a/sdks/java/io/amazon-web-services2/build.gradle +++ b/sdks/java/io/amazon-web-services2/build.gradle @@ -48,6 +48,7 @@ dependencies { implementation library.java.aws_java_sdk2_auth, excludeNetty implementation library.java.aws_java_sdk2_regions, excludeNetty implementation library.java.aws_java_sdk2_utils, excludeNetty + implementation library.java.aws_java_sdk2_profiles, excludeNetty implementation library.java.aws_java_sdk2_http_client_spi, excludeNetty implementation library.java.aws_java_sdk2_apache_client, excludeNetty implementation library.java.aws_java_sdk2_netty_client, excludeNetty diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java index 0f8b138d0b9..d814b395950 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsModule.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.NonNull; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; @@ -60,6 +61,7 @@ import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.credentials.SystemPropertyCredentialsProvider; import software.amazon.awssdk.http.apache.ProxyConfiguration; +import software.amazon.awssdk.profiles.ProfileFileSystemSetting; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.sts.StsClient; import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider; @@ -75,6 +77,7 @@ public class AwsModule extends SimpleModule { private static final String ACCESS_KEY_ID = "accessKeyId"; private static final String SECRET_ACCESS_KEY = "secretAccessKey"; private static final String SESSION_TOKEN = "sessionToken"; + private static final String PROFILE_NAME = "profileName"; public AwsModule() { super("AwsModule"); @@ -160,7 +163,9 @@ public class AwsModule extends SimpleModule { } else if (hasName(SystemPropertyCredentialsProvider.class, typeName)) { return SystemPropertyCredentialsProvider.create(); } else if (hasName(ProfileCredentialsProvider.class, typeName)) { - return ProfileCredentialsProvider.create(); + return json.has(PROFILE_NAME) + ? ProfileCredentialsProvider.create(getNotNull(json, PROFILE_NAME, typeName)) + : ProfileCredentialsProvider.create(); } else if (hasName(ContainerCredentialsProvider.class, typeName)) { return ContainerCredentialsProvider.builder().build(); } else if (typeName.equals(StsAssumeRoleCredentialsProvider.class.getSimpleName())) { @@ -195,7 +200,6 @@ public class AwsModule extends SimpleModule { DefaultCredentialsProvider.class, EnvironmentVariableCredentialsProvider.class, SystemPropertyCredentialsProvider.class, - ProfileCredentialsProvider.class, ContainerCredentialsProvider.class); @Override @@ -228,6 +232,23 @@ public class AwsModule extends SimpleModule { jsonGenerator.writeStringField(ACCESS_KEY_ID, credentials.accessKeyId()); jsonGenerator.writeStringField(SECRET_ACCESS_KEY, credentials.secretAccessKey()); } + } else if (providerClass.equals(ProfileCredentialsProvider.class)) { + String profileName = (String) readField(credentialsProvider, PROFILE_NAME); + String envProfileName = ProfileFileSystemSetting.AWS_PROFILE.getStringValueOrThrow(); + if (profileName != null && !profileName.equals(envProfileName)) { + jsonGenerator.writeStringField(PROFILE_NAME, profileName); + } + try { + Exception exception = (Exception) readField(credentialsProvider, "loadException"); + if (exception != null) { + LoggerFactory.getLogger(AwsModule.class) + .warn("Serialized ProfileCredentialsProvider in faulty state.", exception); + } + } catch (RuntimeException e) { + LoggerFactory.getLogger(AwsModule.class) + .warn("Failed to check ProfileCredentialsProvider for loadException.", e); + } + } else if (providerClass.equals(StsAssumeRoleCredentialsProvider.class)) { Supplier<AssumeRoleRequest> reqSupplier = (Supplier<AssumeRoleRequest>) diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java index d2e02217d15..ae86c27b78e 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/options/AwsOptions.java @@ -78,21 +78,20 @@ public interface AwsOptions extends PipelineOptions { * <p>The class name of the provider must be set in the {@code @type} field. Note: Not all * available providers are supported and some configuration options might be ignored. * - * <p>Most providers rely on system's environment to follow AWS conventions, there's no further - * configuration: + * <p>Most providers must use the system environment following AWS conventions. Programmatic + * configuration for these providers is NOT supported: * <li>{@link DefaultCredentialsProvider} * <li>{@link EnvironmentVariableCredentialsProvider} * <li>{@link SystemPropertyCredentialsProvider} - * <li>{@link ProfileCredentialsProvider} * <li>{@link ContainerCredentialsProvider} * * <p>Example: * - * <pre>{@code --awsCredentialsProvider={"@type": "ProfileCredentialsProvider"}}</pre> + * <pre>{@code --awsCredentialsProvider={"@type": "EnvironmentVariableCredentialsProvider"}} + * </pre> * - * <p>Some other providers require additional configuration: + * <p>Some other providers support additional configuration: * <li>{@link StaticCredentialsProvider} - * <li>{@link StsAssumeRoleCredentialsProvider} * * <p>Examples: * @@ -107,9 +106,27 @@ public interface AwsOptions extends PipelineOptions { * "awsAccessKeyId": "key_id_value", * "awsSecretKey": "secret_value", * "sessionToken": "token_value" + * }}</pre> + * + * <li>{@link ProfileCredentialsProvider} + * + * <p>{@code profileName} is optional, if not set the environment default is used. Be careful + * if using this provider programmatically, it can behave unexpectedly. + * + * <p>Examples: + * + * <pre>{@code --awsCredentialsProvider={ + * "@type": "ProfileCredentialsProvider" * } * * --awsCredentialsProvider={ + * "@type": "ProfileCredentialsProvider", + * "profileName": "my_profile" + * }}</pre> + * + * <li>{@link StsAssumeRoleCredentialsProvider} + * + * <pre>{@code --awsCredentialsProvider={ * "@type": "StsAssumeRoleCredentialsProvider", * "roleArn": "role_arn_Value", * "roleSessionName": "session_name_value", diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java index e5962812e64..17e6f528f96 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/AwsModuleTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.aws2.options; import static org.apache.beam.repackaged.core.org.apache.commons.lang3.reflect.FieldUtils.readField; +import static org.apache.beam.sdk.io.aws2.options.SerializationTestUtil.serialize; import static org.assertj.core.api.Assertions.assertThat; import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; @@ -25,21 +26,32 @@ import static org.junit.Assert.assertEquals; import static software.amazon.awssdk.core.SdkSystemSetting.AWS_ACCESS_KEY_ID; import static software.amazon.awssdk.core.SdkSystemSetting.AWS_REGION; import static software.amazon.awssdk.core.SdkSystemSetting.AWS_SECRET_ACCESS_KEY; +import static software.amazon.awssdk.profiles.ProfileFileSystemSetting.AWS_CONFIG_FILE; +import static software.amazon.awssdk.profiles.ProfileFileSystemSetting.AWS_PROFILE; import com.amazonaws.regions.Regions; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; import java.net.URI; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.function.Supplier; +import org.apache.beam.sdk.testing.ExpectedLogs; import org.apache.beam.sdk.util.ThrowingSupplier; import org.apache.beam.sdk.util.common.ReflectHelpers; import org.hamcrest.MatcherAssert; +import org.junit.ClassRule; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExternalResource; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.ContainerCredentialsProvider; @@ -57,6 +69,24 @@ import software.amazon.awssdk.services.sts.model.AssumeRoleRequest; @RunWith(JUnit4.class) public class AwsModuleTest { + @ClassRule + public static final ProfileFile PROFILE = + new ProfileFile( + "[default]", + "aws_access_key_id=defaultkey", + "aws_secret_access_key=123", + "[profile other]", + "aws_access_key_id=otherkey", + "aws_secret_access_key=abc"); + + private static final AwsCredentials DEFAULT_CREDENTIALS = + AwsBasicCredentials.create("defaultkey", "123"); + + private static final AwsCredentials OTHER_CREDENTIALS = + AwsBasicCredentials.create("otherkey", "abc"); + + @Rule public final ExpectedLogs logs = ExpectedLogs.none(AwsModule.class); + @Test public void testObjectMapperIsAbleToFindModule() { List<Module> modules = ObjectMapper.findModules(ReflectHelpers.findClassLoader()); @@ -68,7 +98,7 @@ public class AwsModuleTest { } @Test - public void testStaticCredentialsProviderSerializationDeserialization() { + public void testStaticCredentialsProviderSerDe() { AwsCredentialsProvider provider = StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret")); @@ -84,7 +114,7 @@ public class AwsModuleTest { } @Test - public void testAwsCredentialsProviderSerializationDeserialization() { + public void testAwsCredentialsProviderSerDe() { AwsCredentialsProvider provider = DefaultCredentialsProvider.create(); AwsCredentialsProvider deserializedProvider = serializeAndDeserialize(provider); assertEquals(provider.getClass(), deserializedProvider.getClass()); @@ -97,17 +127,90 @@ public class AwsModuleTest { deserializedProvider = serializeAndDeserialize(provider); assertEquals(provider.getClass(), deserializedProvider.getClass()); - provider = ProfileCredentialsProvider.create(); - deserializedProvider = serializeAndDeserialize(provider); - assertEquals(provider.getClass(), deserializedProvider.getClass()); - provider = ContainerCredentialsProvider.builder().build(); deserializedProvider = serializeAndDeserialize(provider); assertEquals(provider.getClass(), deserializedProvider.getClass()); } @Test - public void testStsAssumeRoleCredentialsProviderSerializationDeserialization() throws Exception { + public void testProfileCredentialsProviderSerDeWithDefaultProfile() throws Exception { + withSystemProperties( + PROFILE.properties("default"), + () -> { + AwsCredentialsProvider provider = ProfileCredentialsProvider.create(); + String serializedProvider = serialize(provider); + + assertThat(serializedProvider).isEqualTo("{\"@type\":\"ProfileCredentialsProvider\"}"); + + AwsCredentialsProvider actual = deserialize(serializedProvider); + assertThat(actual.resolveCredentials()) + .isEqualToComparingFieldByField(DEFAULT_CREDENTIALS); + return assertThat(actual) + .isExactlyInstanceOf(ProfileCredentialsProvider.class) + .isEqualToComparingFieldByFieldRecursively(provider); + }); + } + + @Test + public void testProfileCredentialsProviderSerDeWithCustomProfile() throws Exception { + withSystemProperties( + PROFILE.properties("default"), + () -> { + AwsCredentialsProvider provider = ProfileCredentialsProvider.create("other"); + String serializedProvider = serialize(provider); + + assertThat(serializedProvider) + .isEqualTo("{\"@type\":\"ProfileCredentialsProvider\",\"profileName\":\"other\"}"); + + AwsCredentialsProvider actual = deserialize(serializedProvider); + assertThat(actual.resolveCredentials()).isEqualToComparingFieldByField(OTHER_CREDENTIALS); + return assertThat(actual) + .isExactlyInstanceOf(ProfileCredentialsProvider.class) + .isEqualToComparingFieldByFieldRecursively(provider); + }); + } + + @Test + public void testProfileCredentialsProviderSerDeWithCustomDefaultProfile() throws Exception { + withSystemProperties( + PROFILE.properties("other"), + () -> { + AwsCredentialsProvider provider = ProfileCredentialsProvider.create("other"); + String serializedProvider = serialize(provider); + + assertThat(serializedProvider).isEqualTo("{\"@type\":\"ProfileCredentialsProvider\"}"); + + AwsCredentialsProvider actual = deserialize(serializedProvider); + assertThat(actual.resolveCredentials()) + .isEqualToComparingFieldByFieldRecursively(OTHER_CREDENTIALS); + return assertThat(actual) + .isExactlyInstanceOf(ProfileCredentialsProvider.class) + .isEqualToComparingFieldByFieldRecursively(provider); + }); + } + + @Test + public void testProfileCredentialsProviderSerDeWithUnknownProfile() throws Exception { + withSystemProperties( + PROFILE.properties("default"), + () -> { + AwsCredentialsProvider provider = ProfileCredentialsProvider.create("unknown"); + String serializedProvider = serialize(provider); + + // ProfileCredentialsProvider SILENTLY drops unknown profiles + assertThat(serializedProvider).isEqualTo("{\"@type\":\"ProfileCredentialsProvider\"}"); + + AwsCredentialsProvider actual = deserialize(serializedProvider); + // NOTE: This documents the unexpected behavior in case a faulty provider is serialized + return assertThat(actual.resolveCredentials()) + .isEqualToComparingFieldByField(DEFAULT_CREDENTIALS); + }); + + logs.verifyWarn("Serialized ProfileCredentialsProvider in faulty state."); + } + + @Test + public void testStsAssumeRoleCredentialsProviderSerDe() throws Exception { AssumeRoleRequest req = AssumeRoleRequest.builder().roleArn("roleArn").policy("policy").build(); Supplier<AwsCredentialsProvider> provider = () -> @@ -123,7 +226,7 @@ public class AwsModuleTest { // Region and credentials for STS client are resolved using default providers AwsCredentialsProvider deserializedProvider = - withSystemPropertyOverrides(overrides, () -> serializeAndDeserialize(provider.get())); + withSystemProperties(overrides, () -> serializeAndDeserialize(provider.get())); Supplier<AssumeRoleRequest> requestSupplier = (Supplier<AssumeRoleRequest>) @@ -132,7 +235,7 @@ public class AwsModuleTest { } @Test - public void testProxyConfigurationSerializationDeserialization() { + public void testProxyConfigurationSerDe() { ProxyConfiguration proxyConfiguration = ProxyConfiguration.builder() .endpoint(URI.create("http://localhost:8080")) @@ -147,7 +250,7 @@ public class AwsModuleTest { assertEquals("password", deserializedProxyConfiguration.password()); } - private <T> T withSystemPropertyOverrides(Properties overrides, ThrowingSupplier<T> fun) + private <T> T withSystemProperties(Properties overrides, ThrowingSupplier<T> fun) throws Exception { Properties systemProps = System.getProperties(); @@ -164,4 +267,39 @@ public class AwsModuleTest { previousProps.forEach(systemProps::put); } } + + private static AwsCredentialsProvider deserialize(String provider) { + return SerializationTestUtil.deserialize(provider, AwsCredentialsProvider.class); + } + + static class ProfileFile extends ExternalResource { + private String[] lines; + private Path path; + + public ProfileFile(String... lines) { + this.lines = lines; + } + + public Properties properties(String defaultProfile) { + Properties props = new Properties(); + props.setProperty(AWS_CONFIG_FILE.property(), path.toString()); + props.setProperty(AWS_PROFILE.property(), defaultProfile); + return props; + } + + @Override + protected void before() throws Throwable { + path = Files.createTempFile("profile", ".conf"); + Files.write(path, Arrays.asList(lines)); + } + + @Override + protected void after() { + try { + Files.delete(path); + } catch (IOException e) { + // ignore + } + } + } } diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java index 0f5daf0bc92..6cf79c95809 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/options/SerializationTestUtil.java @@ -28,11 +28,22 @@ public class SerializationTestUtil { .registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader())); public static <T> T serializeDeserialize(Class<T> clazz, T obj) { + return deserialize(serialize(obj), clazz); + } + + public static <T> String serialize(T obj) { + try { + return MAPPER.writeValueAsString(obj); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize " + obj.getClass().getSimpleName(), e); + } + } + + public static <T> T deserialize(String jsonString, Class<T> clazz) { try { - String jsonString = MAPPER.writeValueAsString(obj); return MAPPER.readValue(jsonString, clazz); } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize/deserialize " + clazz.getSimpleName(), e); + throw new RuntimeException("Failed to deserialize " + clazz.getSimpleName(), e); } } }