NIFI-1325 migrating AWS processors to use AWSCredentialsProvider controller 
service and role based access and allow default credentials to assume role.

Signed-off-by: Aldrin Piri <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/704c333b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/704c333b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/704c333b

Branch: refs/heads/NIFI-259
Commit: 704c333b0d9067d2ff3d2ba1e53e15ffb4b041e9
Parents: 9e9182a
Author: mans2singh <[email protected]>
Authored: Mon Jan 11 13:02:39 2016 -0800
Committer: Aldrin Piri <[email protected]>
Committed: Tue Jan 19 10:17:25 2016 -0800

----------------------------------------------------------------------
 .../nifi-aws-bundle/nifi-aws-processors/pom.xml |   1 +
 ...AbstractAWSCredentialsProviderProcessor.java | 102 ++++++
 .../processors/aws/AbstractAWSProcessor.java    |  29 +-
 ...AWSCredentialsProviderControllerService.java | 193 ++++++++++++
 .../service/AWSCredentialsProviderService.java  |  44 +++
 .../processors/aws/s3/AbstractS3Processor.java  |  40 ++-
 .../nifi/processors/aws/s3/DeleteS3Object.java  |   2 +-
 .../nifi/processors/aws/s3/FetchS3Object.java   |   2 +-
 .../nifi/processors/aws/s3/PutS3Object.java     |   2 +-
 .../aws/sns/AbstractSNSProcessor.java           |  23 +-
 .../apache/nifi/processors/aws/sns/PutSNS.java  |   5 +-
 .../aws/sqs/AbstractSQSProcessor.java           |  22 +-
 .../nifi/processors/aws/sqs/DeleteSQS.java      |   2 +-
 .../apache/nifi/processors/aws/sqs/GetSQS.java  |   3 +-
 .../apache/nifi/processors/aws/sqs/PutSQS.java  |   2 +-
 ...org.apache.nifi.controller.ControllerService |  15 +
 ...redentialsProviderControllerServiceTest.java | 312 +++++++++++++++++++
 .../processors/aws/s3/ITDeleteS3Object.java     |  58 ++++
 .../nifi/processors/aws/s3/ITFetchS3Object.java |  53 ++++
 .../nifi/processors/aws/s3/ITPutS3Object.java   |  61 ++++
 .../nifi/processors/aws/sns/ITPutSNS.java       |  29 ++
 .../nifi/processors/aws/sqs/TestGetSQS.java     |  29 ++
 .../nifi/processors/aws/sqs/TestPutSQS.java     |  34 ++
 .../resources/mock-aws-credentials.properties   |   2 +
 24 files changed, 1040 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
index 9a70b5e..e3e3c47 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/pom.xml
@@ -63,6 +63,7 @@
                 <configuration>
                     <excludes combine.children="append">
                         <exclude>src/test/resources/hello.txt</exclude>
+                        
<exclude>src/test/resources/mock-aws-credentials.properties</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
new file mode 100644
index 0000000..f99349d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.ProcessContext;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+/**
+ * Base class for aws processors that use the AWSCredentialsProvider interface 
for creating aws clients.
+ *
+ * @param <ClientType> client type
+ *
+ * @see <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+ */
+public abstract class AbstractAWSCredentialsProviderProcessor<ClientType 
extends AmazonWebServiceClient>
+    extends AbstractAWSProcessor<ClientType>  {
+
+    /**
+     * AWS credentials provider service
+     *
+     * @see  <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     */
+    public static final PropertyDescriptor AWS_CREDENTIALS_PROVIDER_SERVICE = 
new PropertyDescriptor.Builder()
+            .name("AWS Credentials Provider service")
+            .description("The Controller Service that is used to obtain aws 
credentials provider")
+            .required(false)
+            .identifiesControllerService(AWSCredentialsProviderService.class)
+            .build();
+
+    /**
+     * This method checks if {@link #AWS_CREDENTIALS_PROVIDER_SERVICE} 
is available and if it
+     * is, uses the credentials provider, otherwise it invokes the {@link 
AbstractAWSProcessor#onScheduled(ProcessContext)}
+     * which uses static AWSCredentials for the aws processors
+     */
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        ControllerService service = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+        if (service != null) {
+            getLogger().debug("Using aws credentials provider service for 
creating client");
+            onScheduledUsingControllerService(context);
+        } else {
+            getLogger().debug("Using aws credentials for creating client");
+            super.onScheduled(context);
+        }
+    }
+
+    /**
+     * Create aws client using credentials provider
+     * @param context the process context
+     */
+    protected void onScheduledUsingControllerService(ProcessContext context) {
+        final ClientType awsClient = createClient(context, 
getCredentialsProvider(context), createConfiguration(context));
+        this.client = awsClient;
+        super.intializeRegionAndEndpoint(context);
+
+     }
+
+    /**
+     * Get credentials provider using the {@link AWSCredentialsProviderService}
+     * @param context the process context
+     * @return AWSCredentialsProvider the credential provider
+     * @see  <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     */
+    protected AWSCredentialsProvider getCredentialsProvider(final 
ProcessContext context) {
+
+        final AWSCredentialsProviderService awsCredentialsProviderService =
+              
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(AWSCredentialsProviderService.class);
+
+        return awsCredentialsProviderService.getCredentialsProvider();
+
+    }
+
+    /**
+     * Abstract method to create aws client using credentials provider.  This 
is the preferred method
+     * for creating aws clients
+     * @param context process context
+     * @param credentialsProvider aws credentials provider
+     * @param config aws client configuration
+     * @return ClientType the client
+     */
+    protected abstract ClientType createClient(final ProcessContext context, 
final AWSCredentialsProvider credentialsProvider, final ClientConfiguration 
config);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 8d4ef21..46771e6 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -53,6 +53,14 @@ import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
 import com.amazonaws.regions.Region;
 import com.amazonaws.regions.Regions;
 
+/**
+ * Abstract base class for aws processors.  This class uses aws credentials 
for creating aws clients
+ *
+ * @deprecated use {@link AbstractAWSCredentialsProviderProcessor} instead 
which uses credentials providers for creating aws clients
+ * @see AbstractAWSCredentialsProviderProcessor
+ *
+ */
+@Deprecated
 public abstract class AbstractAWSProcessor<ClientType extends 
AmazonWebServiceClient> extends AbstractProcessor {
 
     public static final Relationship REL_SUCCESS = new 
Relationship.Builder().name("success")
@@ -113,8 +121,8 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
             .addValidator(StandardValidators.URL_VALIDATOR)
             .build();
 
-    private volatile ClientType client;
-    private volatile Region region;
+    protected volatile ClientType client;
+    protected volatile Region region;
 
     // If protocol is changed to be a property, ensure other uses are also 
changed
     protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
@@ -181,7 +189,10 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
     public void onScheduled(final ProcessContext context) {
         final ClientType awsClient = createClient(context, 
getCredentials(context), createConfiguration(context));
         this.client = awsClient;
+        intializeRegionAndEndpoint(context);
+    }
 
+    protected void intializeRegionAndEndpoint(ProcessContext context) {
         // if the processor supports REGION, get the configured region.
         if (getSupportedPropertyDescriptors().contains(REGION)) {
             final String region = context.getProperty(REGION).getValue();
@@ -199,8 +210,19 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
         if (!urlstr.isEmpty()) {
             this.client.setEndpoint(urlstr);
         }
+
     }
 
+    /**
+     * Create client from the arguments
+     * @param context process context
+     * @param credentials static aws credentials
+     * @param config aws client configuration
+     * @return ClientType aws client
+     *
+     * @deprecated use {@link 
AbstractAWSCredentialsProviderProcessor#createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)}
+     */
+    @Deprecated
     protected abstract ClientType createClient(final ProcessContext context, 
final AWSCredentials credentials, final ClientConfiguration config);
 
     protected ClientType getClient() {
@@ -230,10 +252,7 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
         }
 
         return new AnonymousAWSCredentials();
-    }
 
-    protected boolean isEmpty(final String value) {
-        return value == null || value.trim().equals("");
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java
new file mode 100644
index 0000000..ce7e04b
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerService.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.credentials.provider.service;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.ACCESS_KEY;
+import static org.apache.nifi.processors.aws.AbstractAWSProcessor.SECRET_KEY;
+import static 
org.apache.nifi.processors.aws.AbstractAWSProcessor.CREDENTIALS_FILE;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.PropertiesFileCredentialsProvider;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.internal.StaticCredentialsProvider;
+
+/**
+ * Implementation of AWSCredentialsProviderService interface
+ *
+ * @see AWSCredentialsProviderService
+ */
+@CapabilityDescription("Defines credentials for Amazon Web Services 
processors.")
+@Tags({ "aws", "credentials","provider" })
+public class AWSCredentialsProviderControllerService extends 
AbstractControllerService implements AWSCredentialsProviderService {
+
+    /**
+     * AWS Role Arn used for cross account access
+     *
+     * @see <a 
href="http://docs.aws.amazon.com/general/latest/gr/aws-arns-and-namespaces.html#genref-arns">AWS
 ARN</a>
+     */
+    public static final PropertyDescriptor ASSUME_ROLE_ARN = new 
PropertyDescriptor.Builder().name("Assume Role ARN")
+            
.expressionLanguageSupported(false).required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false).description("The AWS Role ARN for cross account 
access. This is used in conjunction with role name and session 
timeout").build();
+
+    /**
+     * The role session name used when assuming the aws role
+     */
+    public static final PropertyDescriptor ASSUME_ROLE_NAME = new 
PropertyDescriptor.Builder().name("Assume Role Session Name")
+            
.expressionLanguageSupported(false).required(false).addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .sensitive(false).description("The aws role name for cross account 
access. This is used in conjunction with role arn and session time 
out").build();
+
+    /**
+     * Max session time for role based credentials. The range is between 900 
and 3600 seconds.
+     */
+    public static final PropertyDescriptor MAX_SESSION_TIME = new 
PropertyDescriptor.Builder()
+            .name("Session Time")
+            .description("Session time for role based session (between 900 and 
3600 seconds). This is used in conjunction with role arn and name")
+            .defaultValue("3600")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .sensitive(false)
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+
+    static {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ACCESS_KEY);
+        props.add(SECRET_KEY);
+        props.add(CREDENTIALS_FILE);
+        props.add(ASSUME_ROLE_ARN);
+        props.add(ASSUME_ROLE_NAME);
+        props.add(MAX_SESSION_TIME);
+
+        properties = Collections.unmodifiableList(props);
+    }
+
+    private volatile AWSCredentialsProvider credentialsProvider;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public AWSCredentialsProvider getCredentialsProvider() throws 
ProcessException {
+        return credentialsProvider;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
+
+        final boolean accessKeySet = 
validationContext.getProperty(ACCESS_KEY).isSet();
+        final boolean secretKeySet = 
validationContext.getProperty(SECRET_KEY).isSet();
+        final boolean assumeRoleArnIsSet = 
validationContext.getProperty(ASSUME_ROLE_ARN).isSet();
+        final boolean assumeRoleNameIsSet = 
validationContext.getProperty(ASSUME_ROLE_NAME).isSet();
+        final Integer maxSessionTime = 
validationContext.getProperty(MAX_SESSION_TIME).asInteger();
+
+        final boolean credentialsFileSet = 
validationContext.getProperty(CREDENTIALS_FILE).isSet();
+
+        final Collection<ValidationResult> validationFailureResults = new 
ArrayList<>();
+
+        // both keys are required if present
+        if ((accessKeySet && !secretKeySet) || (secretKeySet && 
!accessKeySet)) {
+            validationFailureResults.add(new 
ValidationResult.Builder().input("Access Key").valid(false)
+                    .explanation("If setting Secret Key or Access Key, must 
set both").build());
+        }
+
+        // Either keys or creds file is valid
+        if ((secretKeySet || accessKeySet) && credentialsFileSet) {
+            validationFailureResults.add(new 
ValidationResult.Builder().input("Access Key").valid(false)
+                    .explanation("Cannot set both Credentials File and Secret 
Key/Access Key").build());
+        }
+
+        // Both role arn and role name are required if either is present
+        if (assumeRoleArnIsSet ^ assumeRoleNameIsSet ) {
+            validationFailureResults.add(new 
ValidationResult.Builder().input("Assume Role Arn and Name")
+                    .valid(false).explanation("Assume role requires both arn 
and name to be set").build());
+        }
+
+        // Session time only b/w 900 to 3600 sec (see sts session class)
+        if ( maxSessionTime < 900 || maxSessionTime > 3600 )
+            validationFailureResults.add(new 
ValidationResult.Builder().valid(false).input(maxSessionTime + "")
+                    .subject(MAX_SESSION_TIME.getDisplayName() +
+                            " can have value only between 900 and 3600 
seconds").build());
+
+        return validationFailureResults;
+    }
+
+    @OnEnabled
+    public void onConfigured(final ConfigurationContext context) throws 
InitializationException {
+
+        final String accessKey = context.getProperty(ACCESS_KEY).getValue();
+        final String secretKey = context.getProperty(SECRET_KEY).getValue();
+        final String assumeRoleArn = 
context.getProperty(ASSUME_ROLE_ARN).getValue();
+        final Integer maxSessionTime = 
context.getProperty(MAX_SESSION_TIME).asInteger();
+        final String assumeRoleName = 
context.getProperty(ASSUME_ROLE_NAME).getValue();
+        final String credentialsFile = 
context.getProperty(CREDENTIALS_FILE).getValue();
+
+        // Create creds provider from file or keys
+        if (credentialsFile != null) {
+            try {
+                getLogger().debug("Creating properties file credentials 
provider");
+                credentialsProvider = new 
PropertiesFileCredentialsProvider(credentialsFile);
+            } catch (final Exception ioe) {
+                throw new ProcessException("Could not read Credentials File", 
ioe);
+            }
+        }
+
+        if (credentialsProvider == null && accessKey != null && secretKey != 
null) {
+            getLogger().debug("Creating static credentials provider");
+            credentialsProvider = new StaticCredentialsProvider(new 
BasicAWSCredentials(accessKey, secretKey));
+        }
+
+        // If no credentials explicitly provided, then create default one
+        if (credentialsProvider == null) {
+            getLogger().debug("Creating default credentials provider");
+            credentialsProvider = new DefaultAWSCredentialsProviderChain();
+        }
+
+        if (credentialsProvider != null && assumeRoleArn != null && 
assumeRoleName != null) {
+            getLogger().debug("Creating sts assume role session credentials 
provider");
+
+            credentialsProvider = new 
STSAssumeRoleSessionCredentialsProvider.Builder(assumeRoleArn, assumeRoleName)
+                    .withLongLivedCredentialsProvider(credentialsProvider)
+                    .withRoleSessionDurationSeconds(maxSessionTime).build();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AWSCredentialsProviderService[id=" + getIdentifier() + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java
new file mode 100644
index 0000000..61b646d
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderService.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.credentials.provider.service;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.processor.exception.ProcessException;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+
+/**
+ * AWSCredentialsProviderService interface to support getting 
AWSCredentialsProvider used for instantiating
+ * aws clients
+ *
+ * @see <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+ */
+@Tags({"aws", "security", "credentials", "provider", "session"})
+@CapabilityDescription("Provides AWSCredentialsProvider.")
+public interface AWSCredentialsProviderService extends ControllerService {
+
+    /**
+     * Get credentials provider
+     * @return credentials provider
+     * @throws ProcessException process exception in case there is a problem 
getting the credentials provider
+     *
+     * @see  <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a>
+     */
+    public AWSCredentialsProvider getCredentialsProvider() throws 
ProcessException;
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 39ad667..0257112 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -25,10 +25,11 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.regions.Region;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
@@ -39,7 +40,7 @@ import com.amazonaws.services.s3.model.Grantee;
 import com.amazonaws.services.s3.model.Owner;
 import com.amazonaws.services.s3.model.Permission;
 
-public abstract class AbstractS3Processor extends 
AbstractAWSProcessor<AmazonS3Client> {
+public abstract class AbstractS3Processor extends 
AbstractAWSCredentialsProviderProcessor<AmazonS3Client> {
 
     public static final PropertyDescriptor FULL_CONTROL_USER_LIST = new 
PropertyDescriptor.Builder()
             .name("FullControl User List")
@@ -103,22 +104,47 @@ public abstract class AbstractS3Processor extends 
AbstractAWSProcessor<AmazonS3C
             .defaultValue("${filename}")
             .build();
 
+    /**
+     * Create client using credentials provider. This is the preferred way for 
creating clients
+     */
     @Override
-    protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
-        final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
+    protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client with credentials provider");
+
+        final AmazonS3Client s3 = new AmazonS3Client(credentialsProvider, 
config);
+
+        initalizeEndpointOverride(context, s3);
+
+        return s3;
+    }
 
+    private void initalizeEndpointOverride(final ProcessContext context, final 
AmazonS3Client s3) {
         // if ENDPOINT_OVERRIDE is set, use PathStyleAccess
         
if(StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).getValue()).isEmpty()
 == false){
             final S3ClientOptions s3Options = new S3ClientOptions();
             s3Options.setPathStyleAccess(true);
             s3.setS3ClientOptions(s3Options);
         }
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonS3Client createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client with awd credentials");
+
+        final AmazonS3Client s3 = new AmazonS3Client(credentials, config);
+
+        initalizeEndpointOverride(context, s3);
 
         return s3;
     }
 
     protected Grantee createGrantee(final String value) {
-        if (isEmpty(value)) {
+        if (StringUtils.isEmpty(value)) {
             return null;
         }
 
@@ -130,7 +156,7 @@ public abstract class AbstractS3Processor extends 
AbstractAWSProcessor<AmazonS3C
     }
 
     protected final List<Grantee> createGrantees(final String value) {
-        if (isEmpty(value)) {
+        if (StringUtils.isEmpty(value)) {
             return Collections.emptyList();
         }
 
@@ -161,7 +187,7 @@ public abstract class AbstractS3Processor extends 
AbstractAWSProcessor<AmazonS3C
         final AccessControlList acl = new AccessControlList();
 
         final String ownerId = 
context.getProperty(OWNER).evaluateAttributeExpressions(flowFile).getValue();
-        if (!isEmpty(ownerId)) {
+        if (!StringUtils.isEmpty(ownerId)) {
             final Owner owner = new Owner();
             owner.setId(ownerId);
             acl.setOwner(owner);

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index ec6eea7..d0b38ba 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -56,7 +56,7 @@ public class DeleteS3Object extends AbstractS3Processor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, REGION, TIMEOUT, VERSION_ID,
+            Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, TIMEOUT, VERSION_ID,
                     FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER,
                     SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index ae0af56..34e7910 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -73,7 +73,7 @@ public class FetchS3Object extends AbstractS3Processor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, TIMEOUT, VERSION_ID, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
+            Arrays.asList(BUCKET, KEY, REGION, ACCESS_KEY, SECRET_KEY, 
CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, VERSION_ID, 
SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 6755c13..3c35658 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -90,7 +90,7 @@ public class PutS3Object extends AbstractS3Processor {
         .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
STORAGE_CLASS, REGION, TIMEOUT, EXPIRATION_RULE_ID,
+        Arrays.asList(KEY, BUCKET, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
AWS_CREDENTIALS_PROVIDER_SERVICE, STORAGE_CLASS, REGION, TIMEOUT, 
EXPIRATION_RULE_ID,
             FULL_CONTROL_USER_LIST, READ_USER_LIST, WRITE_USER_LIST, 
READ_ACL_LIST, WRITE_ACL_LIST, OWNER, SSL_CONTEXT_SERVICE, ENDPOINT_OVERRIDE));
 
     final static String S3_BUCKET_KEY = "s3.bucket";

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
index 109c92f..cbdcf9c 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/AbstractSNSProcessor.java
@@ -20,13 +20,14 @@ import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.sns.AmazonSNSClient;
 
-public abstract class AbstractSNSProcessor extends 
AbstractAWSProcessor<AmazonSNSClient> {
+public abstract class AbstractSNSProcessor extends 
AbstractAWSCredentialsProviderProcessor<AmazonSNSClient> {
 
     protected static final AllowableValue ARN_TYPE_TOPIC
             = new AllowableValue("Topic ARN", "Topic ARN", "The ARN is the 
name of a topic");
@@ -50,9 +51,25 @@ public abstract class AbstractSNSProcessor extends 
AbstractAWSProcessor<AmazonSN
             .defaultValue(ARN_TYPE_TOPIC.getValue())
             .build();
 
+    /**
+     * Create client using aws credentials provider. This is the preferred way 
for creating clients
+     */
+    @Override
+    protected AmazonSNSClient createClient(final ProcessContext context, final 
AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider");
+
+        return new AmazonSNSClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
+     */
     @Override
     protected AmazonSNSClient createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials");
+
         return new AmazonSNSClient(credentials, config);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
index f58babc..63967e8 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sns/PutSNS.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -74,7 +75,7 @@ public class PutSNS extends AbstractSNSProcessor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, TIMEOUT,
+            Arrays.asList(ARN, ARN_TYPE, SUBJECT, REGION, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT,
                     USE_JSON_STRUCTURE, CHARACTER_ENCODING));
 
     public static final int MAX_SIZE = 256 * 1024;
@@ -136,7 +137,7 @@ public class PutSNS extends AbstractSNSProcessor {
         }
 
         for (final Map.Entry<PropertyDescriptor, String> entry : 
context.getProperties().entrySet()) {
-            if (entry.getKey().isDynamic() && !isEmpty(entry.getValue())) {
+            if (entry.getKey().isDynamic() && 
!StringUtils.isEmpty(entry.getValue())) {
                 final MessageAttributeValue value = new 
MessageAttributeValue();
                 
value.setStringValue(context.getProperty(entry.getKey()).evaluateAttributeExpressions(flowFile).getValue());
                 value.setDataType("String");

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
index bf8058f..f9bd5c4 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/AbstractSQSProcessor.java
@@ -19,13 +19,14 @@ package org.apache.nifi.processors.aws.sqs;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
 
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.sqs.AmazonSQSClient;
 
-public abstract class AbstractSQSProcessor extends 
AbstractAWSProcessor<AmazonSQSClient> {
+public abstract class AbstractSQSProcessor extends 
AbstractAWSCredentialsProviderProcessor<AmazonSQSClient> {
 
     public static final PropertyDescriptor BATCH_SIZE = new 
PropertyDescriptor.Builder()
             .name("Batch Size")
@@ -43,8 +44,25 @@ public abstract class AbstractSQSProcessor extends 
AbstractAWSProcessor<AmazonSQ
             .required(true)
             .build();
 
+    /**
+     * Create client using credentials provider. This is the preferred way for 
creating clients
+     */
+    @Override
+    protected AmazonSQSClient createClient(final ProcessContext context, final 
AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials provider ");
+
+        return new AmazonSQSClient(credentialsProvider, config);
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
+     */
     @Override
     protected AmazonSQSClient createClient(final ProcessContext context, final 
AWSCredentials credentials, final ClientConfiguration config) {
+        getLogger().info("Creating client using aws credentials ");
+
         return new AmazonSQSClient(credentials, config);
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
index 73e3715..8091b99 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/DeleteSQS.java
@@ -54,7 +54,7 @@ public class DeleteSQS extends AbstractSQSProcessor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(ACCESS_KEY, SECRET_KEY, REGION, QUEUE_URL, TIMEOUT));
+            Arrays.asList(ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, QUEUE_URL, TIMEOUT));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
index 91166a2..2e11f45 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/GetSQS.java
@@ -114,7 +114,8 @@ public class GetSQS extends AbstractSQSProcessor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE, REGION, BATCH_SIZE, TIMEOUT, CHARSET, 
VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
+            Arrays.asList(STATIC_QUEUE_URL, AUTO_DELETE, ACCESS_KEY, 
SECRET_KEY, CREDENTIALS_FILE,
+                AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, BATCH_SIZE, TIMEOUT, 
CHARSET, VISIBILITY_TIMEOUT, RECEIVE_MSG_WAIT_TIME));
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
index 633af19..fd10b21 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/sqs/PutSQS.java
@@ -65,7 +65,7 @@ public class PutSQS extends AbstractSQSProcessor {
             .build();
 
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(
-            Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
REGION, DELAY, TIMEOUT));
+            Arrays.asList(QUEUE_URL, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, 
AWS_CREDENTIALS_PROVIDER_SERVICE, REGION, DELAY, TIMEOUT));
 
     private volatile List<PropertyDescriptor> userDefinedProperties = 
Collections.emptyList();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
new file mode 100644
index 0000000..5e2dea4
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
new file mode 100644
index 0000000..63c3ce9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/credentials/provider/service/AWSCredentialsProviderControllerServiceTest.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.credentials.provider.service;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.processors.aws.s3.FetchS3Object;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
+import com.amazonaws.auth.PropertiesFileCredentialsProvider;
+import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.internal.StaticCredentialsProvider;
+
+public class AWSCredentialsProviderControllerServiceTest {
+
+    @Test
+    public void testDefaultAWSCredentialsProviderChain() throws Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+        final AWSCredentialsProviderService service = 
(AWSCredentialsProviderService) runner.getProcessContext()
+                
.getControllerServiceLookup().getControllerService("awsCredentialsProvider");
+        Assert.assertNotNull(service);
+        final AWSCredentialsProvider credentialsProvider = 
service.getCredentialsProvider();
+        Assert.assertNotNull(credentialsProvider);
+        assertEquals("credentials provider should be equal", 
DefaultAWSCredentialsProviderChain.class,
+                credentialsProvider.getClass());
+    }
+
+    @Test
+    public void testKeysCredentialsProvider() throws Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+        final AWSCredentialsProviderService service = 
(AWSCredentialsProviderService) runner.getProcessContext()
+                
.getControllerServiceLookup().getControllerService("awsCredentialsProvider");
+        Assert.assertNotNull(service);
+        final AWSCredentialsProvider credentialsProvider = 
service.getCredentialsProvider();
+        Assert.assertNotNull(credentialsProvider);
+        assertEquals("credentials provider should be equal", 
StaticCredentialsProvider.class,
+                credentialsProvider.getClass());
+    }
+
+    @Test
+    public void testKeysCredentialsProviderWithRoleAndName() throws Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+        final AWSCredentialsProviderService service = 
(AWSCredentialsProviderService) runner.getProcessContext()
+                
.getControllerServiceLookup().getControllerService("awsCredentialsProvider");
+        Assert.assertNotNull(service);
+        final AWSCredentialsProvider credentialsProvider = 
service.getCredentialsProvider();
+        Assert.assertNotNull(credentialsProvider);
+        assertEquals("credentials provider should be equal", 
STSAssumeRoleSessionCredentialsProvider.class,
+                credentialsProvider.getClass());
+    }
+
+    @Test
+    public void 
testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutInRange() throws 
Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "1000");
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+        final AWSCredentialsProviderService service = 
(AWSCredentialsProviderService) runner.getProcessContext()
+                
.getControllerServiceLookup().getControllerService("awsCredentialsProvider");
+        Assert.assertNotNull(service);
+        final AWSCredentialsProvider credentialsProvider = 
service.getCredentialsProvider();
+        Assert.assertNotNull(credentialsProvider);
+        assertEquals("credentials provider should be equal", 
STSAssumeRoleSessionCredentialsProvider.class,
+                credentialsProvider.getClass());
+    }
+
+    @Test
+    public void 
testKeysCredentialsProviderWithRoleAndNameAndSessionTimeout900() throws 
Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "900");
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+    }
+
+    @Test
+    public void 
testKeysCredentialsProviderWithRoleAndNameAndSessionTimeout3600() throws 
Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "3600"); // 3600 is the boundary value this test is named for
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+    }
+
+    @Test(expected = AssertionError.class)
+    public void 
testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutLessThan900() throws 
Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "899");
+        runner.enableControllerService(serviceImpl);
+        runner.assertNotValid(serviceImpl); // NOTE(review): with expected = AssertionError, a failure of this assertion also makes the test pass; presumably enableControllerService is what throws here - confirm
+    }
+
+    @Test(expected = AssertionError.class)
+    public void 
testKeysCredentialsProviderWithRoleAndNameAndSessionTimeoutGreaterThan3600() 
throws Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.MAX_SESSION_TIME, "3601"); // one past 3600, the limit this test is named for
+        runner.enableControllerService(serviceImpl);
+    }
+
+    @Test
+    public void testKeysCredentialsProviderWithRoleOnlyInvalid() throws 
Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertNotValid(serviceImpl);
+    }
+
+    @Test
+    public void testKeysCredentialsProviderWithRoleNameOnlyInvalid() throws 
Throwable {
+        final TestRunner runner = 
TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService serviceImpl = new 
AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.ACCESS_KEY, 
"awsAccessKey");
+        runner.setProperty(serviceImpl, AbstractAWSProcessor.SECRET_KEY, 
"awsSecretKey");
+        runner.setProperty(serviceImpl, 
AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertNotValid(serviceImpl);
+    }
+
+    /** A credentials file plus an assume-role ARN and role name must validate and yield an STS assume-role provider. */
+    @Test
+    public void testFileCredentialsProviderWithRole() throws Throwable {
+        final String serviceId = "awsCredentialsProvider";
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService(serviceId, credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+        testRunner.setProperty(credentialsService, AWSCredentialsProviderControllerService.ASSUME_ROLE_ARN, "Role");
+        testRunner.setProperty(credentialsService, AWSCredentialsProviderControllerService.ASSUME_ROLE_NAME, "RoleName");
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertValid(credentialsService);
+        // Resolve the service the same way a processor would: through the process context's lookup.
+        final AWSCredentialsProviderService lookedUp = (AWSCredentialsProviderService) testRunner.getProcessContext()
+                .getControllerServiceLookup().getControllerService(serviceId);
+        Assert.assertNotNull(lookedUp);
+        final AWSCredentialsProvider provider = lookedUp.getCredentialsProvider();
+        Assert.assertNotNull(provider);
+        assertEquals("credentials provider should be equal", STSAssumeRoleSessionCredentialsProvider.class, provider.getClass());
+    }
+
+    /** A credentials file on its own must validate and yield a {@link PropertiesFileCredentialsProvider}. */
+    @Test
+    public void testFileCredentialsProvider() throws Throwable {
+        final String serviceId = "awsCredentialsProvider";
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService(serviceId, credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertValid(credentialsService);
+
+        final AWSCredentialsProviderService lookedUp = (AWSCredentialsProviderService) testRunner.getProcessContext()
+                .getControllerServiceLookup().getControllerService(serviceId);
+        Assert.assertNotNull(lookedUp);
+        final AWSCredentialsProvider provider = lookedUp.getCredentialsProvider();
+        Assert.assertNotNull(provider);
+        assertEquals("credentials provider should be equal", PropertiesFileCredentialsProvider.class, provider.getClass());
+    }
+
+    /** Pointing the credentials file at {@code bad-mock-aws-credentials.properties} must leave the service invalid. */
+    @Test
+    public void testFileCredentialsProviderBadFile() throws Throwable {
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService("awsCredentialsProvider", credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/bad-mock-aws-credentials.properties");
+        testRunner.enableControllerService(credentialsService);
+
+        testRunner.assertNotValid(credentialsService);
+    }
+
+    /** A credentials file combined with both an access key and a secret key must leave the service invalid. */
+    @Test
+    public void testFileAndAccessSecretKeyInvalid() throws Throwable {
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService("awsCredentialsProvider", credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
+        testRunner.enableControllerService(credentialsService);
+
+        testRunner.assertNotValid(credentialsService);
+    }
+
+    /** A credentials file combined with only an access key must leave the service invalid. */
+    @Test
+    public void testFileAndAccessKeyInvalid() throws Throwable {
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService("awsCredentialsProvider", credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
+        testRunner.enableControllerService(credentialsService);
+
+        testRunner.assertNotValid(credentialsService);
+    }
+
+    /** A credentials file combined with only a secret key must leave the service invalid. */
+    @Test
+    public void testFileAndSecretKeyInvalid() throws Throwable {
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService("awsCredentialsProvider", credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, "src/test/resources/mock-aws-credentials.properties");
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
+        testRunner.enableControllerService(credentialsService);
+
+        testRunner.assertNotValid(credentialsService);
+    }
+
+    /** An access key without a matching secret key must leave the service invalid. */
+    @Test
+    public void testAccessKeyOnlyInvalid() throws Throwable {
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService("awsCredentialsProvider", credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.ACCESS_KEY, "awsAccessKey");
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertNotValid(credentialsService);
+    }
+
+    /** A secret key without a matching access key must leave the service invalid. */
+    @Test
+    public void testSecretKeyOnlyInvalid() throws Throwable {
+        final TestRunner testRunner = TestRunners.newTestRunner(FetchS3Object.class);
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService("awsCredentialsProvider", credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.SECRET_KEY, "awsSecretKey");
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertNotValid(credentialsService);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java
index 77eb2a3..8ae9224 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITDeleteS3Object.java
@@ -16,14 +16,21 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Provides integration level testing with actual AWS S3 resources for {@link 
DeleteS3Object} and requires additional configuration and resources to work.
  */
@@ -70,6 +77,34 @@ public class ITDeleteS3Object extends AbstractS3IT {
     }
 
     @Test
+    public void testDeleteFolderUsingCredentialsProviderService() throws Throwable {
+        // Upload the object that the processor is expected to delete.
+        final String objectKey = "folder/delete-me";
+        putTestFile(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        // Register a credentials controller service backed by the properties file in the user's home directory.
+        final String serviceId = "awsCredentialsProvider";
+        final String credentialsPath = System.getProperty("user.home") + "/aws-credentials.properties";
+        final TestRunner testRunner = TestRunners.newTestRunner(new DeleteS3Object());
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService(serviceId, credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, credentialsPath);
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertValid(credentialsService);
+
+        // The processor gets its credentials from the service rather than from its own key/file properties.
+        testRunner.setProperty(DeleteS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, serviceId);
+        testRunner.setProperty(DeleteS3Object.REGION, REGION);
+        testRunner.setProperty(DeleteS3Object.BUCKET, BUCKET_NAME);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", objectKey);
+        testRunner.enqueue(new byte[0], attributes);
+        testRunner.run(1);
+        testRunner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testDeleteFolderNoExpressionLanguage() throws IOException {
         // Prepares for this test
         putTestFile("folder/delete-me", 
getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
@@ -107,4 +142,27 @@ public class ITDeleteS3Object extends AbstractS3IT {
         runner.assertAllFlowFilesTransferred(DeleteS3Object.REL_SUCCESS, 1);
     }
 
+    /** Verifies that DeleteS3Object exposes exactly the 17 expected property descriptors. */
+    @Test
+    public void testGetPropertyDescriptors() throws Exception {
+        final List<PropertyDescriptor> pd = new DeleteS3Object().getSupportedPropertyDescriptors();
+        assertEquals("size should be eq", 17, pd.size());
+        assertTrue(pd.contains(DeleteS3Object.ACCESS_KEY));
+        assertTrue(pd.contains(DeleteS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
+        assertTrue(pd.contains(DeleteS3Object.BUCKET));
+        assertTrue(pd.contains(DeleteS3Object.CREDENTIALS_FILE));
+        assertTrue(pd.contains(DeleteS3Object.ENDPOINT_OVERRIDE));
+        assertTrue(pd.contains(DeleteS3Object.FULL_CONTROL_USER_LIST));
+        assertTrue(pd.contains(DeleteS3Object.KEY));
+        assertTrue(pd.contains(DeleteS3Object.OWNER));
+        assertTrue(pd.contains(DeleteS3Object.READ_ACL_LIST));
+        assertTrue(pd.contains(DeleteS3Object.READ_USER_LIST));
+        assertTrue(pd.contains(DeleteS3Object.REGION));
+        assertTrue(pd.contains(DeleteS3Object.SECRET_KEY));
+        assertTrue(pd.contains(DeleteS3Object.SSL_CONTEXT_SERVICE));
+        assertTrue(pd.contains(DeleteS3Object.TIMEOUT));
+        assertTrue(pd.contains(DeleteS3Object.VERSION_ID));
+        assertTrue(pd.contains(DeleteS3Object.WRITE_ACL_LIST));
+        assertTrue(pd.contains(DeleteS3Object.WRITE_USER_LIST));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
index 3fc5f17..c9334f7 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITFetchS3Object.java
@@ -16,6 +16,9 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -27,6 +30,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Provides integration level testing with actual AWS S3 resources for {@link 
FetchS3Object} and requires additional configuration and resources to work.
  */
@@ -51,6 +57,34 @@ public class ITFetchS3Object extends AbstractS3IT {
     }
 
     @Test
+    public void testFetchS3ObjectUsingCredentialsProviderService() throws Throwable {
+        // Upload the object that the processor is expected to fetch.
+        final String objectKey = "test-file";
+        putTestFile(objectKey, getFileFromResourceName(SAMPLE_FILE_RESOURCE_NAME));
+
+        // Register a credentials controller service backed by the properties file in the user's home directory.
+        final String serviceId = "awsCredentialsProvider";
+        final String credentialsPath = System.getProperty("user.home") + "/aws-credentials.properties";
+        final TestRunner testRunner = TestRunners.newTestRunner(new FetchS3Object());
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService(serviceId, credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, credentialsPath);
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertValid(credentialsService);
+
+        // The processor gets its credentials from the service rather than from its own key/file properties.
+        testRunner.setProperty(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, serviceId);
+        testRunner.setProperty(FetchS3Object.REGION, REGION);
+        testRunner.setProperty(FetchS3Object.BUCKET, BUCKET_NAME);
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("filename", objectKey);
+        testRunner.enqueue(new byte[0], attributes);
+        testRunner.run(1);
+        testRunner.assertAllFlowFilesTransferred(FetchS3Object.REL_SUCCESS, 1);
+    }
+
+    @Test
     public void testTryToFetchNotExistingFile() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new 
FetchS3Object());
 
@@ -96,4 +130,23 @@ public class ITFetchS3Object extends AbstractS3IT {
             System.out.println(entry.getKey() + " : " + entry.getValue());
         }
     }
+
+
+    /** Verifies that FetchS3Object exposes exactly the 11 expected property descriptors. */
+    @Test
+    public void testGetPropertyDescriptors() throws Exception {
+        final List<PropertyDescriptor> descriptors = new FetchS3Object().getSupportedPropertyDescriptors();
+        assertEquals("size should be eq", 11, descriptors.size());
+        assertTrue(descriptors.contains(FetchS3Object.ACCESS_KEY));
+        assertTrue(descriptors.contains(FetchS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
+        assertTrue(descriptors.contains(FetchS3Object.BUCKET));
+        assertTrue(descriptors.contains(FetchS3Object.CREDENTIALS_FILE));
+        assertTrue(descriptors.contains(FetchS3Object.ENDPOINT_OVERRIDE));
+        assertTrue(descriptors.contains(FetchS3Object.KEY));
+        assertTrue(descriptors.contains(FetchS3Object.REGION));
+        assertTrue(descriptors.contains(FetchS3Object.SECRET_KEY));
+        assertTrue(descriptors.contains(FetchS3Object.SSL_CONTEXT_SERVICE));
+        assertTrue(descriptors.contains(FetchS3Object.TIMEOUT));
+        assertTrue(descriptors.contains(FetchS3Object.VERSION_ID));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
index 7bc684d..f2e938c 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/ITPutS3Object.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.aws.s3;
 
 import com.amazonaws.services.s3.model.StorageClass;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -29,6 +31,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Provides integration level testing with actual AWS S3 resources for {@link 
PutS3Object} and requires additional configuration and resources to work.
  */
@@ -55,6 +60,36 @@ public class ITPutS3Object extends AbstractS3IT {
     }
 
     @Test
+    public void testPutS3ObjectUsingCredentialsProviderService() throws Throwable {
+        // Register a credentials controller service backed by the properties file in the user's home directory.
+        final String serviceId = "awsCredentialsProvider";
+        final String credentialsPath = System.getProperty("user.home") + "/aws-credentials.properties";
+        final TestRunner testRunner = TestRunners.newTestRunner(new PutS3Object());
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService(serviceId, credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, credentialsPath);
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertValid(credentialsService);
+
+        // The processor gets its credentials from the service rather than from its own key/file properties.
+        testRunner.setProperty(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE, serviceId);
+        testRunner.setProperty(PutS3Object.REGION, REGION);
+        testRunner.setProperty(PutS3Object.BUCKET, BUCKET_NAME);
+        Assert.assertTrue(testRunner.setProperty("x-custom-prop", "hello").isValid());
+
+        // Queue three uploads named 0.txt, 1.txt and 2.txt, each carrying the sample resource as content.
+        final int fileCount = 3;
+        for (int index = 0; index < fileCount; index++) {
+            final Map<String, String> attributes = new HashMap<>();
+            attributes.put("filename", index + ".txt");
+            testRunner.enqueue(getResourcePath(SAMPLE_FILE_RESOURCE_NAME), attributes);
+        }
+
+        testRunner.run(fileCount);
+        testRunner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, fileCount);
+    }
+
+    @Test
     public void testMetaData() throws IOException {
         PutS3Object processor = new PutS3Object();
         final TestRunner runner = TestRunners.newTestRunner(processor);
@@ -137,4 +172,30 @@ public class ITPutS3Object extends AbstractS3IT {
 
         runner.assertAllFlowFilesTransferred(PutS3Object.REL_SUCCESS, 1);
     }
+
+
+    /** Verifies that PutS3Object exposes exactly the 18 expected property descriptors. */
+    @Test
+    public void testGetPropertyDescriptors() throws Exception {
+        final List<PropertyDescriptor> descriptors = new PutS3Object().getSupportedPropertyDescriptors();
+        assertEquals("size should be eq", 18, descriptors.size());
+        assertTrue(descriptors.contains(PutS3Object.ACCESS_KEY));
+        assertTrue(descriptors.contains(PutS3Object.AWS_CREDENTIALS_PROVIDER_SERVICE));
+        assertTrue(descriptors.contains(PutS3Object.BUCKET));
+        assertTrue(descriptors.contains(PutS3Object.CREDENTIALS_FILE));
+        assertTrue(descriptors.contains(PutS3Object.ENDPOINT_OVERRIDE));
+        assertTrue(descriptors.contains(PutS3Object.FULL_CONTROL_USER_LIST));
+        assertTrue(descriptors.contains(PutS3Object.KEY));
+        assertTrue(descriptors.contains(PutS3Object.OWNER));
+        assertTrue(descriptors.contains(PutS3Object.READ_ACL_LIST));
+        assertTrue(descriptors.contains(PutS3Object.READ_USER_LIST));
+        assertTrue(descriptors.contains(PutS3Object.REGION));
+        assertTrue(descriptors.contains(PutS3Object.SECRET_KEY));
+        assertTrue(descriptors.contains(PutS3Object.SSL_CONTEXT_SERVICE));
+        assertTrue(descriptors.contains(PutS3Object.TIMEOUT));
+        assertTrue(descriptors.contains(PutS3Object.EXPIRATION_RULE_ID));
+        assertTrue(descriptors.contains(PutS3Object.STORAGE_CLASS));
+        assertTrue(descriptors.contains(PutS3Object.WRITE_ACL_LIST));
+        assertTrue(descriptors.contains(PutS3Object.WRITE_USER_LIST));
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
index be36ce0..f558446 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sns/ITPutSNS.java
@@ -26,6 +26,8 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertTrue;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 
 /**
  * Provides integration level testing with actual AWS S3 resources for {@link 
PutSNS} and requires additional configuration and resources to work.
@@ -49,4 +51,31 @@ public class ITPutSNS {
         runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
     }
 
+    /** Publishes one FlowFile through PutSNS using credentials supplied by a controller service and expects it on success. */
+    @Test
+    public void testPublishWithCredentialsProviderService() throws Throwable {
+        final TestRunner runner = TestRunners.newTestRunner(new PutSNS());
+        // Placeholder value: replace with the ARN of a real SNS topic before running this integration test.
+        String snsArn = "Add Sns arn here";
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(PutSNS.ARN, snsArn);
+        assertTrue(runner.setProperty("DynamicProperty", "hello!").isValid());
+
+        // Credentials come from a controller service backed by the properties file in the user's home directory.
+        final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+        runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, System.getProperty("user.home") + "/aws-credentials.properties");
+        runner.enableControllerService(serviceImpl);
+        runner.assertValid(serviceImpl);
+
+        runner.setProperty(PutSNS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
+
+        // Queue the message first, then trigger the processor exactly once (no trigger against an empty queue).
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "1.txt");
+        runner.enqueue(Paths.get("src/test/resources/hello.txt"), attrs);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutSNS.REL_SUCCESS, 1);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/704c333b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
index e11528e..6377b06 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/sqs/TestGetSQS.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.aws.sqs;
 
 import java.util.List;
 
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
 import org.apache.nifi.processors.aws.sns.PutSNS;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -46,4 +48,31 @@ public class TestGetSQS {
         }
     }
 
+    /** Triggers GetSQS once with credentials from a controller service and prints what arrives; nothing is asserted about the messages. */
+    @Test
+    public void testSimpleGetUsingCredentailsProviderService() throws Throwable {
+        final String serviceId = "awsCredentialsProvider";
+        final String credentialsPath = System.getProperty("user.home") + "/aws-credentials.properties";
+        final TestRunner testRunner = TestRunners.newTestRunner(new GetSQS());
+
+        testRunner.setProperty(GetSQS.TIMEOUT, "30 secs");
+        // Placeholder value: replace with the URL of a real SQS queue before running this test.
+        final String queueUrl = "Add queue url here";
+        testRunner.setProperty(GetSQS.QUEUE_URL, queueUrl);
+
+        final AWSCredentialsProviderControllerService credentialsService = new AWSCredentialsProviderControllerService();
+        testRunner.addControllerService(serviceId, credentialsService);
+        testRunner.setProperty(credentialsService, AbstractAWSProcessor.CREDENTIALS_FILE, credentialsPath);
+        testRunner.enableControllerService(credentialsService);
+        testRunner.assertValid(credentialsService);
+
+        testRunner.setProperty(GetSQS.AWS_CREDENTIALS_PROVIDER_SERVICE, serviceId);
+        testRunner.run(1);
+
+        for (final MockFlowFile received : testRunner.getFlowFilesForRelationship(GetSQS.REL_SUCCESS)) {
+            System.out.println(received.getAttributes());
+            System.out.println(new String(received.toByteArray()));
+        }
+    }
+
 }

Reply via email to