NIFI-1325 migrating AWS processors to use AWSCredentialsProvider controller service and role based access and allow default credentials to assume role.
Signed-off-by: Aldrin Piri <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/704c333b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/704c333b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/704c333b Branch: refs/heads/NIFI-259 Commit: 704c333b0d9067d2ff3d2ba1e53e15ffb4b041e9 Parents: 9e9182a Author: mans2singh <[email protected]> Authored: Mon Jan 11 13:02:39 2016 -0800 Committer: Aldrin Piri <[email protected]> Committed: Tue Jan 19 10:17:25 2016 -0800 ---------------------------------------------------------------------- .../nifi-aws-bundle/nifi-aws-processors/pom.xml | 1 + ...AbstractAWSCredentialsProviderProcessor.java | 102 ++++++ .../processors/aws/AbstractAWSProcessor.java | 29 +- ...AWSCredentialsProviderControllerService.java | 193 ++++++++++++ .../service/AWSCredentialsProviderService.java | 44 +++ .../processors/aws/s3/AbstractS3Processor.java | 40 ++- .../nifi/processors/aws/s3/DeleteS3Object.java | 2 +- .../nifi/processors/aws/s3/FetchS3Object.java | 2 +- .../nifi/processors/aws/s3/PutS3Object.java | 2 +- .../aws/sns/AbstractSNSProcessor.java | 23 +- .../apache/nifi/processors/aws/sns/PutSNS.java | 5 +- .../aws/sqs/AbstractSQSProcessor.java | 22 +- .../nifi/processors/aws/sqs/DeleteSQS.java | 2 +- .../apache/nifi/processors/aws/sqs/GetSQS.java | 3 +- .../apache/nifi/processors/aws/sqs/PutSQS.java | 2 +- ...org.apache.nifi.controller.ControllerService | 15 + ...redentialsProviderControllerServiceTest.java | 312 +++++++++++++++++++ .../processors/aws/s3/ITDeleteS3Object.java | 58 ++++ .../nifi/processors/aws/s3/ITFetchS3Object.java | 53 ++++ .../nifi/processors/aws/s3/ITPutS3Object.java | 61 ++++ .../nifi/processors/aws/sns/ITPutSNS.java | 29 ++ .../nifi/processors/aws/sqs/TestGetSQS.java | 29 ++ .../nifi/processors/aws/sqs/TestPutSQS.java | 34 ++ .../resources/mock-aws-credentials.properties | 2 + 24 files changed, 1040 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml index 9a70b5e..e3e3c47 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml @@ -63,6 +63,7 @@ <configuration> <excludes combine.children="append"> <exclude>src/test/resources/hello.txt</exclude> + <exclude>src/test/resources/mock-aws-credentials.properties</exclude> </excludes> </configuration> </plugin> http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java new file mode 100644 index 0000000..f99349d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; + +/** + * Base class for aws processors that uses AWSCredentialsProvider interface for creating aws clients. + * + * @param <ClientType> client type + * + * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a> + */ +public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient> + extends AbstractAWSProcessor<ClientType> { + + /** + * AWS credentials provider service + * + * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a> + */ + public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = new PropertyDescriptor.Builder() + .name("AWS Credentials Provider service") + .description("The Controller Service that is used to obtain aws credentials provider") + .required(false) + .identifiesControllerService(AWSCredentialsProviderService.class) + .build(); + + /** + * This method checks if {#link {@link #AWS_CREDENTIALS_PROVIDER_SERVICE} is available and if it + * is, uses the credentials provider, otherwise it invokes the {@link AbstractAWSProcessor#onScheduled(ProcessContext)} + * which uses static AWSCredentials for the aws processors + */ + @OnScheduled + public void onScheduled(ProcessContext context) { + ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(); + if (service != null) { + getLogger().debug("Using aws credentials provider service for creating client"); + onScheduledUsingControllerService(context); + } else { + getLogger().debug("Using aws credentials for creating client"); + super.onScheduled(context); + } + } + + /** + * Create aws client using credentials provider + * @param context the process context + */ + protected void onScheduledUsingControllerService(ProcessContext context) { + final ClientType awsClient = createClient(context, getCredentialsProvider(context), createConfiguration(context)); + this.client = awsClient; + super.intializeRegionAndEndpoint(context); + + } + + /** + * Get credentials provider using the {@link AWSCredentialsProviderService} + * @param context the process context + * @return AWSCredentialsProvider the credential provider + * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a> + */ + protected AWSCredentialsProvider getCredentialsProvider(final ProcessContext context) { + + final AWSCredentialsProviderService awsCredentialsProviderService = + context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class); + + return awsCredentialsProviderService.getCredentialsProvider(); + + } + + /** + * Abstract method to create aws client using credetials provider. This is the preferred method + * for creating aws clients + * @param context process context + * @param credentialsProvider aws credentials provider + * @param config aws client configuraiton + * @return ClientType the client + */ + protected abstract ClientType createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 8d4ef21..46771e6 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -53,6 +53,14 @@ import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory; import com.amazonaws.regions.Region; import com.amazonaws.regions.Regions; +/** + * Abstract base class for aws processors. This class uses aws credentials for creating aws clients + * + * @deprecated use {@link AbstractAWSCredentialsProviderProcessor} instead which uses credentials providers or creating aws clients + * @see AbstractAWSCredentialsProviderProcessor + * + */ +@Deprecated public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractProcessor { public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") @@ -113,8 +121,8 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl .addValidator(StandardValidators.URL_VALIDATOR) .build(); - private volatile ClientType client; - private volatile Region region; + protected volatile ClientType client; + protected volatile Region region; // If protocol is changed to be a property, ensure other uses are also changed protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS; @@ -181,7 +189,10 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl public void onScheduled(final ProcessContext context) { final ClientType awsClient = createClient(context, getCredentials(context), createConfiguration(context)); this.client = awsClient; + intializeRegionAndEndpoint(context); + } + protected void intializeRegionAndEndpoint(ProcessContext context) { // if the processor supports REGION, get the configured region. if (getSupportedPropertyDescriptors().contains(REGION)) { final String region = context.getProperty(REGION).getValue(); @@ -199,8 +210,19 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl if (!urlstr.isEmpty()) { this.client.setEndpoint(urlstr); } + } + /** + * Create client from the arguments + * @param context process context + * @param credentials static aws credentials + * @param config aws client configuration + * @return ClientType aws client + * + * @deprecated use {@link AbstractAWSCredentialsProviderProcessor#createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} + */ + @Deprecated protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config); protected ClientType getClient() { @@ -230,10 +252,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl } return new AnonymousAWSCredentials(); - } - protected boolean isEmpty(final String value) { - return value == null || value.trim().equals(""); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java new file mode 100644 index 0000000..ce7e04b --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.credentials.provider.service; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; + +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.ACCESS_KEY; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.SECRET_KEY; +import static org.apache.nifi.processors.aws.AbstractAWSProcessor.CREDENTIALS_FILE; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.PropertiesFileCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.internal.StaticCredentialsProvider; + +/** + * Implementation of AWSCredentialsProviderService interface + * + * @see AWSCredentialsProviderService + */ +@CapabilityDescription("Defines credentials for Amazon Web Services processors.") +@Tags({ "aws", "credentials","provider" }) +public class AWSCredentialsProviderControllerService extends AbstractControllerService implements AWSCredentialsProviderService { + + /** + * AWS Role Arn used for cross account access + * + * @see <a href="http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns">AWS ARN</a> + */ + public static final PropertyDescriptor ASSUME_ROLE_ARN = new PropertyDescriptor.Builder().name("Assume Role ARN") + .expressionLanguageSupported(false).required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(false).description("The AWS Role ARN for cross account access. This is used in conjunction with role name and session timeout").build(); + + /** + * The role name while creating aws role + */ + public static final PropertyDescriptor ASSUME_ROLE_NAME = new PropertyDescriptor.Builder().name("Assume Role Session Name") + .expressionLanguageSupported(false).required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .sensitive(false).description("The aws role name for cross account access. This is used in conjunction with role arn and session time out").build(); + + /** + * Max session time for role based credentials. The range is between 900 and 3600 seconds. + */ + public static final PropertyDescriptor MAX_SESSION_TIME = new PropertyDescriptor.Builder() + .name("Session Time") + .description("Session time for role based session (between 900 and 3600 seconds). This is used in conjunction with role arn and name") + .defaultValue("3600") + .required(false) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .sensitive(false) + .build(); + + private static final List<PropertyDescriptor> properties; + + static { + final List<PropertyDescriptor> props = new ArrayList<>(); + props.add(ACCESS_KEY); + props.add(SECRET_KEY); + props.add(CREDENTIALS_FILE); + props.add(ASSUME_ROLE_ARN); + props.add(ASSUME_ROLE_NAME); + props.add(MAX_SESSION_TIME); + + properties = Collections.unmodifiableList(props); + } + + private volatile AWSCredentialsProvider credentialsProvider; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public AWSCredentialsProvider getCredentialsProvider() throws ProcessException { + return credentialsProvider; + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + + final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet(); + final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet(); + final boolean assumeRoleArnIsSet = validationContext.getProperty(ASSUME_ROLE_ARN).isSet(); + final boolean assumeRoleNameIsSet = validationContext.getProperty(ASSUME_ROLE_NAME).isSet(); + final Integer maxSessionTime = validationContext.getProperty(MAX_SESSION_TIME).asInteger(); + + final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet(); + + final Collection<ValidationResult> validationFailureResults = new ArrayList<>(); + + // both keys are required if present + if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) { + validationFailureResults.add(new ValidationResult.Builder().input("Access Key").valid(false) + .explanation("If setting Secret Key or Access Key, must set both").build()); + } + + // Either keys or creds file is valid + if ((secretKeySet || accessKeySet) && credentialsFileSet) { + validationFailureResults.add(new ValidationResult.Builder().input("Access Key").valid(false) + .explanation("Cannot set both Credentials File and Secret Key/Access Key").build()); + } + + // Both role and arn name are req if present + if (assumeRoleArnIsSet ^ assumeRoleNameIsSet ) { + validationFailureResults.add(new ValidationResult.Builder().input("Assume Role Arn and Name") + .valid(false).explanation("Assume role requires both arn and name to be set").build()); + } + + // Session time only b/w 900 to 3600 sec (see sts session class) + if ( maxSessionTime < 900 || maxSessionTime > 3600 ) + validationFailureResults.add(new ValidationResult.Builder().valid(false).input(maxSessionTime + "") + .subject(MAX_SESSION_TIME.getDisplayName() + + " can have value only between 900 and 3600 seconds").build()); + + return validationFailureResults; + } + + @OnEnabled + public void onConfigured(final ConfigurationContext context) throws InitializationException { + + final String accessKey = context.getProperty(ACCESS_KEY).getValue(); + final String secretKey = context.getProperty(SECRET_KEY).getValue(); + final String assumeRoleArn = context.getProperty(ASSUME_ROLE_ARN).getValue(); + final Integer maxSessionTime = context.getProperty(MAX_SESSION_TIME).asInteger(); + final String assumeRoleName = context.getProperty(ASSUME_ROLE_NAME).getValue(); + final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue(); + + // Create creds provider from file or keys + if (credentialsFile != null) { + try { + getLogger().debug("Creating properties file credentials provider"); + credentialsProvider = new PropertiesFileCredentialsProvider(credentialsFile); + } catch (final Exception ioe) { + throw new ProcessException("Could not read Credentials File", ioe); + } + } + + if (credentialsProvider == null && accessKey != null && secretKey != null) { + getLogger().debug("Creating static credentials provider"); + credentialsProvider = new StaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)); + } + + // If no credentials explicitly provided, then create default one + if (credentialsProvider == null) { + getLogger().debug("Creating default credentials provider"); + credentialsProvider = new DefaultAWSCredentialsProviderChain(); + } + + if (credentialsProvider != null && assumeRoleArn != null && assumeRoleName != null) { + getLogger().debug("Creating sts assume role session credentials provider"); + + credentialsProvider = new STSAssumeRoleSessionCredentialsProvider.Builder(assumeRoleArn, assumeRoleName) + .withLongLivedCredentialsProvider(credentialsProvider) + .withRoleSessionDurationSeconds(maxSessionTime).build(); + } + } + + @Override + public String toString() { + return "AWSCredentialsProviderService[id=" + getIdentifier() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java new file mode 100644 index 0000000..61b646d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.credentials.provider.service; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.processor.exception.ProcessException; + +import com.amazonaws.auth.AWSCredentialsProvider; + +/** + * AWSCredentialsProviderService interface to support getting AWSCredentialsProvider used for instantiating + * aws clients + * + * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a> + */ +@Tags({"aws", "security", "credentials", "provider", "session"}) +@CapabilityDescription("Provides AWSCredentialsProvider.") +public interface AWSCredentialsProviderService extends ControllerService { + + /** + * Get credentials provider + * @return credentials provider + * @throws ProcessException process exception in case there is problem in getting credentials provider + * + * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a> + */ + public AWSCredentialsProvider getCredentialsProvider() throws ProcessException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java index 39ad667..0257112 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java @@ -25,10 +25,11 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.regions.Region; import com.amazonaws.services.s3.AmazonS3Client; import com.amazonaws.services.s3.S3ClientOptions; @@ -39,7 +40,7 @@ import com.amazonaws.services.s3.model.Grantee; import com.amazonaws.services.s3.model.Owner; import com.amazonaws.services.s3.model.Permission; -public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3Client> { +public abstract class AbstractS3Processor extends AbstractAWSCredentialsProviderProcessor<AmazonS3Client> { public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new PropertyDescriptor.Builder() .name("FullControl User List") @@ -103,22 +104,47 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C .defaultValue("${filename}") .build(); + /** + * Create client using credentials provider. This is the preferred way for creating clients + */ @Override - protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { - final AmazonS3Client s3 = new AmazonS3Client(credentials, config); + protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client with credentials provider"); + + final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, config); + + initalizeEndpointOverride(context, s3); + + return s3; + } + private void initalizeEndpointOverride(final ProcessContext context, final AmazonS3Client s3) { // if ENDPOINT_OVERRIDE is set, use PathStyleAccess if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()).isEmpty() == false){ final S3ClientOptions s3Options = new S3ClientOptions(); s3Options.setPathStyleAccess(true); s3.setS3ClientOptions(s3Options); } + } + + /** + * Create client using AWSCredentials + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client with awd credentials"); + + final AmazonS3Client s3 = new AmazonS3Client(credentials, config); + + initalizeEndpointOverride(context, s3); return s3; } protected Grantee createGrantee(final String value) { - if (isEmpty(value)) { + if (StringUtils.isEmpty(value)) { return null; } @@ -130,7 +156,7 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C } protected final List<Grantee> createGrantees(final String value) { - if (isEmpty(value)) { + if (StringUtils.isEmpty(value)) { return Collections.emptyList(); } @@ -161,7 +187,7 @@ public abstract class AbstractS3Processor extends AbstractAWSProcessor<AmazonS3C final AccessControlList acl = new AccessControlList(); final String ownerId = context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue(); - if (!isEmpty(ownerId)) { + if (!StringUtils.isEmpty(ownerId)) { final Owner owner = new Owner(); owner.setId(ownerId); acl.setOwner(owner); http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java index ec6eea7..d0b38ba 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java @@ -56,7 +56,7 @@ public class DeleteS3Object extends AbstractS3Processor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID, + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index ae0af56..34e7910 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); + Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java index 6755c13..3c35658 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java @@ -90,7 +90,7 @@ public class PutS3Object extends AbstractS3Processor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, + Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID, FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE)); final static String S3_BUCKET_KEY = "s3.bucket"; http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java index 109c92f..cbdcf9c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java @@ -20,13 +20,14 @@ import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.sns.AmazonSNSClient; -public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSNSClient> { +public abstract class AbstractSNSProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonSNSClient> { protected static final AllowableValue ARN_TYPE_TOPIC = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the name of a topic"); @@ -50,9 +51,25 @@ public abstract class AbstractSNSProcessor extends AbstractAWSProcessor<AmazonSN .defaultValue(ARN_TYPE_TOPIC.getValue()) .build(); + /** + * Create client using aws credentials provider. This is the preferred way for creating clients + */ + @Override + protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider"); + + return new AmazonSNSClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentails + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ @Override protected AmazonSNSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials"); + return new AmazonSNSClient(credentials, config); } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java index f58babc..63967e8 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; @@ -74,7 +75,7 @@ public class PutSNS extends AbstractSNSProcessor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, TIMEOUT, + Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, USE_JSON_STRUCTURE, CHARACTER_ENCODING)); public static final int MAX_SIZE = 256 * 1024; @@ -136,7 +137,7 @@ public class PutSNS extends AbstractSNSProcessor { } for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { - if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) { + if (entry.getKey().isDynamic() && !StringUtils.isEmpty(entry.getValue())) { final MessageAttributeValue value = new MessageAttributeValue(); value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue()); value.setDataType("String"); http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java index bf8058f..f9bd5c4 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java @@ -19,13 +19,14 @@ package org.apache.nifi.processors.aws.sqs; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.services.sqs.AmazonSQSClient; -public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQSClient> { +public abstract class AbstractSQSProcessor extends AbstractAWSCredentialsProviderProcessor<AmazonSQSClient> { public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() .name("Batch Size") @@ -43,8 +44,25 @@ public abstract class AbstractSQSProcessor extends AbstractAWSProcessor<AmazonSQ .required(true) .build(); + /** + * Create client using credentials provider. This is the preferred way for creating clients + */ + @Override + protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider "); + + return new AmazonSQSClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentails + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ @Override protected AmazonSQSClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials "); + return new AmazonSQSClient(credentials, config); } http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java index 73e3715..8091b99 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java @@ -54,7 +54,7 @@ public class DeleteSQS extends AbstractSQSProcessor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT)); + Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java index 91166a2..2e11f45 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java @@ -114,7 +114,8 @@ public class GetSQS extends AbstractSQSProcessor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME)); + Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, + AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME)); @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java index 633af19..fd10b21 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java @@ -65,7 +65,7 @@ public class PutSQS extends AbstractSQSProcessor { .build(); public static final List<PropertyDescriptor> properties = Collections.unmodifiableList( - Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, REGION, DELAY, TIMEOUT)); + Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, DELAY, TIMEOUT)); private volatile List<PropertyDescriptor> userDefinedProperties = Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000..5e2dea4 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,15 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java new file mode 100644 index 0000000..63c3ce9 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.credentials.provider.service; + +import static org.junit.Assert.assertEquals; + +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.s3.FetchS3Object; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.PropertiesFileCredentialsProvider; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.internal.StaticCredentialsProvider; + +public class AWSCredentialsProviderControllerServiceTest { + + @Test + public void testDefaultAWSCredentialsProviderChain() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + final AWSCredentialsProviderService service = (AWSCredentialsProviderService) runner.getProcessContext() + .getControllerServiceLookup().getControllerService("awsCredentialsProvider"); + Assert.assertNotNull(service); + final AWSCredentialsProvider credentialsProvider = service.getCredentialsProvider(); + Assert.assertNotNull(credentialsProvider); + assertEquals("credentials provider should be equal", DefaultAWSCredentialsProviderChain.class, + credentialsProvider.getClass()); + } + + @Test + public void testKeysCredentialsProvider() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + final AWSCredentialsProviderService service = (AWSCredentialsProviderService) runner.getProcessContext() + .getControllerServiceLookup().getControllerService("awsCredentialsProvider"); + Assert.assertNotNull(service); + final AWSCredentialsProvider credentialsProvider = service.getCredentialsProvider(); + Assert.assertNotNull(credentialsProvider); + assertEquals("credentials provider should be equal", StaticCredentialsProvider.class, + credentialsProvider.getClass()); + } + + @Test + public void testKeysCredentialsProviderWithRoleAndName() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + final AWSCredentialsProviderService service = (AWSCredentialsProviderService) runner.getProcessContext() + .getControllerServiceLookup().getControllerService("awsCredentialsProvider"); + Assert.assertNotNull(service); + final AWSCredentialsProvider credentialsProvider = service.getCredentialsProvider(); + Assert.assertNotNull(credentialsProvider); + assertEquals("credentials provider should be equal", STSAssumeRoleSessionCredentialsProvider.class, + credentialsProvider.getClass()); + } + + @Test + public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutInRange() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "1000"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + final AWSCredentialsProviderService service = (AWSCredentialsProviderService) runner.getProcessContext() + .getControllerServiceLookup().getControllerService("awsCredentialsProvider"); + Assert.assertNotNull(service); + final AWSCredentialsProvider credentialsProvider = service.getCredentialsProvider(); + Assert.assertNotNull(credentialsProvider); + assertEquals("credentials provider should be equal", STSAssumeRoleSessionCredentialsProvider.class, + credentialsProvider.getClass()); + } + + @Test + public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeout900() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "900"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + } + + @Test + public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeout3600() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "900"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + } + + @Test(expected = AssertionError.class) + public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutLessThan900() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "899"); + runner.enableControllerService(serviceImpl); + runner.assertNotValid(serviceImpl); + } + + @Test(expected = AssertionError.class) + public void testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutGreaterThan3600() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "899"); + runner.enableControllerService(serviceImpl); + } + + @Test + public void testKeysCredentialsProviderWithRoleOnlyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testKeysCredentialsProviderWithRoleNameOnlyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testFileCredentialsProviderWithRole() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, + "src/test/resources/mock-aws-credentials.properties"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role"); + runner.setProperty(serviceImpl, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + final AWSCredentialsProviderService service = (AWSCredentialsProviderService) runner.getProcessContext() + .getControllerServiceLookup().getControllerService("awsCredentialsProvider"); + Assert.assertNotNull(service); + final AWSCredentialsProvider credentialsProvider = service.getCredentialsProvider(); + Assert.assertNotNull(credentialsProvider); + assertEquals("credentials provider should be equal", STSAssumeRoleSessionCredentialsProvider.class, + credentialsProvider.getClass()); + } + + @Test + public void testFileCredentialsProvider() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, + "src/test/resources/mock-aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + final AWSCredentialsProviderService service = (AWSCredentialsProviderService) runner.getProcessContext() + .getControllerServiceLookup().getControllerService("awsCredentialsProvider"); + Assert.assertNotNull(service); + final AWSCredentialsProvider credentialsProvider = service.getCredentialsProvider(); + Assert.assertNotNull(credentialsProvider); + assertEquals("credentials provider should be equal", PropertiesFileCredentialsProvider.class, + credentialsProvider.getClass()); + } + + @Test + public void testFileCredentialsProviderBadFile() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, + "src/test/resources/bad-mock-aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testFileAndAccessSecretKeyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, + "src/test/resources/mock-aws-credentials.properties"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testFileAndAccessKeyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, + "src/test/resources/mock-aws-credentials.properties"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testFileAndSecretKeyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, + "src/test/resources/mock-aws-credentials.properties"); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testAccessKeyOnlyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } + + @Test + public void testSecretKeyOnlyInvalid() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(FetchS3Object.class); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + runner.addControllerService("awsCredentialsProvider", serviceImpl); + runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey"); + runner.enableControllerService(serviceImpl); + + runner.assertNotValid(serviceImpl); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java index 77eb2a3..8ae9224 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java @@ -16,14 +16,21 @@ */ package org.apache.nifi.processors.aws.s3; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Test; import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Provides integration level testing with actual AWS S3 resources for {@link DeleteS3Object} and requires additional configuration and resources to work. */ @@ -70,6 +77,34 @@ public class ITDeleteS3Object extends AbstractS3IT { } @Test + public void testDeleteFolderUsingCredentialsProviderService() throws Throwable { + // Prepares for this test + putTestFile("folder/delete-me", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + + final TestRunner runner = TestRunners.newTestRunner(new DeleteS3Object()); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + runner.setProperty(DeleteS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.setProperty(DeleteS3Object.REGION, REGION); + runner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "folder/delete-me"); + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); + } + + @Test public void testDeleteFolderNoExpressionLanguage() throws IOException { // Prepares for this test putTestFile("folder/delete-me", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); @@ -107,4 +142,27 @@ public class ITDeleteS3Object extends AbstractS3IT { runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1); } + @Test + public void testGetPropertyDescriptors() throws Exception { + DeleteS3Object processor = new DeleteS3Object(); + List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 17, pd.size()); + assertTrue(pd.contains(processor.ACCESS_KEY)); + assertTrue(pd.contains(processor.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(processor.BUCKET)); + assertTrue(pd.contains(processor.CREDENTIALS_FILE)); + assertTrue(pd.contains(processor.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(processor.FULL_CONTROL_USER_LIST)); + assertTrue(pd.contains(processor.KEY)); + assertTrue(pd.contains(processor.OWNER)); + assertTrue(pd.contains(processor.READ_ACL_LIST)); + assertTrue(pd.contains(processor.READ_USER_LIST)); + assertTrue(pd.contains(processor.REGION)); + assertTrue(pd.contains(processor.SECRET_KEY)); + assertTrue(pd.contains(processor.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(processor.TIMEOUT)); + assertTrue(pd.contains(processor.VERSION_ID)); + assertTrue(pd.contains(processor.WRITE_ACL_LIST)); + assertTrue(pd.contains(processor.WRITE_USER_LIST)); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java index 3fc5f17..c9334f7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java @@ -16,6 +16,9 @@ */ package org.apache.nifi.processors.aws.s3; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -27,6 +30,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Provides integration level testing with actual AWS S3 resources for {@link FetchS3Object} and requires additional configuration and resources to work. */ @@ -51,6 +57,34 @@ public class ITFetchS3Object extends AbstractS3IT { } @Test + public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable { + putTestFile("test-file", getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME)); + + final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); + + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + runner.assertValid(serviceImpl); + + runner.setProperty(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.setProperty(FetchS3Object.REGION, REGION); + runner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "test-file"); + runner.enqueue(new byte[0], attrs); + + runner.run(1); + + runner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1); + + } + + @Test public void testTryToFetchNotExistingFile() throws IOException { final TestRunner runner = TestRunners.newTestRunner(new FetchS3Object()); @@ -96,4 +130,23 @@ public class ITFetchS3Object extends AbstractS3IT { System.out.println(entry.getKey() + " : " + entry.getValue()); } } + + + @Test + public void testGetPropertyDescriptors() throws Exception { + FetchS3Object processor = new FetchS3Object(); + List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 11, pd.size()); + assertTrue(pd.contains(FetchS3Object.ACCESS_KEY)); + assertTrue(pd.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(FetchS3Object.BUCKET)); + assertTrue(pd.contains(FetchS3Object.CREDENTIALS_FILE)); + assertTrue(pd.contains(FetchS3Object.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(FetchS3Object.KEY)); + assertTrue(pd.contains(FetchS3Object.REGION)); + assertTrue(pd.contains(FetchS3Object.SECRET_KEY)); + assertTrue(pd.contains(FetchS3Object.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(FetchS3Object.TIMEOUT)); + assertTrue(pd.contains(FetchS3Object.VERSION_ID)); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java index 7bc684d..f2e938c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java @@ -18,6 +18,8 @@ package org.apache.nifi.processors.aws.s3; import com.amazonaws.services.s3.model.StorageClass; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -29,6 +31,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Provides integration level testing with actual AWS S3 resources for {@link PutS3Object} and requires additional configuration and resources to work. */ @@ -55,6 +60,36 @@ public class ITPutS3Object extends AbstractS3IT { } @Test + public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(new PutS3Object()); + + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + runner.setProperty(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.setProperty(PutS3Object.REGION, REGION); + runner.setProperty(PutS3Object.BUCKET, BUCKET_NAME); + + Assert.assertTrue(runner.setProperty("x-custom-prop", "hello").isValid()); + + for (int i = 0; i < 3; i++) { + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", String.valueOf(i) + ".txt"); + runner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attrs); + } + runner.run(3); + + runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 3); + + } + + @Test public void testMetaData() throws IOException { PutS3Object processor = new PutS3Object(); final TestRunner runner = TestRunners.newTestRunner(processor); @@ -137,4 +172,30 @@ public class ITPutS3Object extends AbstractS3IT { runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1); } + + + @Test + public void testGetPropertyDescriptors() throws Exception { + PutS3Object processor = new PutS3Object(); + List<PropertyDescriptor> pd = processor.getSupportedPropertyDescriptors(); + assertEquals("size should be eq", 18, pd.size()); + assertTrue(pd.contains(PutS3Object.ACCESS_KEY)); + assertTrue(pd.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE)); + assertTrue(pd.contains(PutS3Object.BUCKET)); + assertTrue(pd.contains(PutS3Object.CREDENTIALS_FILE)); + assertTrue(pd.contains(PutS3Object.ENDPOINT_OVERRIDE)); + assertTrue(pd.contains(PutS3Object.FULL_CONTROL_USER_LIST)); + assertTrue(pd.contains(PutS3Object.KEY)); + assertTrue(pd.contains(PutS3Object.OWNER)); + assertTrue(pd.contains(PutS3Object.READ_ACL_LIST)); + assertTrue(pd.contains(PutS3Object.READ_USER_LIST)); + assertTrue(pd.contains(PutS3Object.REGION)); + assertTrue(pd.contains(PutS3Object.SECRET_KEY)); + assertTrue(pd.contains(PutS3Object.SSL_CONTEXT_SERVICE)); + assertTrue(pd.contains(PutS3Object.TIMEOUT)); + assertTrue(pd.contains(PutS3Object.EXPIRATION_RULE_ID)); + assertTrue(pd.contains(PutS3Object.STORAGE_CLASS)); + assertTrue(pd.contains(PutS3Object.WRITE_ACL_LIST)); + assertTrue(pd.contains(PutS3Object.WRITE_USER_LIST)); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java index be36ce0..f558446 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java @@ -26,6 +26,8 @@ import java.util.HashMap; import java.util.Map; import static org.junit.Assert.assertTrue; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; /** * Provides integration level testing with actual AWS S3 resources for {@link PutSNS} and requires additional configuration and resources to work. @@ -49,4 +51,31 @@ public class ITPutSNS { runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); } + @Test + public void testPublishWithCredentialsProviderService() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(new PutSNS()); + String snsArn = "Add Sns arn here"; + runner.setValidateExpressionUsage(false); + runner.setProperty(PutSNS.ARN, snsArn); + assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid()); + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + runner.setProperty(PutSNS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + + runner.run(1); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("filename", "1.txt"); + runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1); + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java index e11528e..6377b06 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java @@ -18,6 +18,8 @@ package org.apache.nifi.processors.aws.sqs; import java.util.List; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService; import org.apache.nifi.processors.aws.sns.PutSNS; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -46,4 +48,31 @@ public class TestGetSQS { } } + @Test + public void testSimpleGetUsingCredentailsProviderService() throws Throwable { + final TestRunner runner = TestRunners.newTestRunner(new GetSQS()); + + runner.setProperty(GetSQS.TIMEOUT, "30 secs"); + String queueUrl = "Add queue url here"; + runner.setProperty(GetSQS.QUEUE_URL, queueUrl); + + final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService(); + + runner.addControllerService("awsCredentialsProvider", serviceImpl); + + runner.setProperty(serviceImpl, AbstractAWSProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties"); + runner.enableControllerService(serviceImpl); + + runner.assertValid(serviceImpl); + + runner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider"); + runner.run(1); + + final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS); + for (final MockFlowFile mff : flowFiles) { + System.out.println(mff.getAttributes()); + System.out.println(new String(mff.toByteArray())); + } + } + }
