This is an automated email from the ASF dual-hosted git repository.

chriss pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f07c372  NIFI-9353: Adding Config Verification to AWS Processors
f07c372 is described below

commit f07c37285ebfb6af33fa8a5906bd605890e66908
Author: Joe Gresock <jgres...@gmail.com>
AuthorDate: Sun Oct 31 07:05:08 2021 -0400

    NIFI-9353: Adding Config Verification to AWS Processors
    
    This closes #5504
---
 .../AbstractAWSCredentialsProviderProcessor.java   |  64 +++---
 .../nifi/processors/aws/AbstractAWSProcessor.java  |  54 ++++-
 .../aws/dynamodb/AbstractDynamoDBProcessor.java    |  65 +++---
 .../aws/wag/AbstractAWSGatewayApiProcessor.java    | 109 +++++------
 .../processors/aws/dynamodb/DeleteDynamoDB.java    |   4 +-
 .../nifi/processors/aws/dynamodb/GetDynamoDB.java  | 218 +++++++++++++++------
 .../nifi/processors/aws/dynamodb/PutDynamoDB.java  |   4 +-
 .../nifi/processors/aws/s3/FetchS3Object.java      | 163 ++++++++++-----
 .../org/apache/nifi/processors/aws/s3/ListS3.java  |  11 +-
 .../processors/aws/wag/InvokeAWSGatewayApi.java    | 156 ++++++++++-----
 .../processors/aws/dynamodb/GetDynamoDBTest.java   | 156 ++++++++-------
 .../dynamodb/ITPutGetDeleteGetDynamoDBTest.java    |   7 +
 .../nifi/processors/aws/s3/TestFetchS3Object.java  |  36 +++-
 .../apache/nifi/processors/aws/s3/TestListS3.java  |   4 +-
 .../aws/wag/TestInvokeAmazonGatewayApiMock.java    |  17 +-
 15 files changed, 706 insertions(+), 362 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
index 923878c..8b2c560 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java
@@ -15,16 +15,23 @@
  * limitations under the License.
  */
 package org.apache.nifi.processors.aws;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
+
+import com.amazonaws.AmazonWebServiceClient;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentialsProvider;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
 import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService;
 
-import com.amazonaws.AmazonWebServiceClient;
-import com.amazonaws.ClientConfiguration;
-import com.amazonaws.auth.AWSCredentialsProvider;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Base class for aws processors that uses AWSCredentialsProvider interface 
for creating aws clients.
@@ -34,7 +41,7 @@ import com.amazonaws.auth.AWSCredentialsProvider;
  * @see <a 
href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html";>AWSCredentialsProvider</a>
  */
 public abstract class AbstractAWSCredentialsProviderProcessor<ClientType 
extends AmazonWebServiceClient>
-    extends AbstractAWSProcessor<ClientType>  {
+    extends AbstractAWSProcessor<ClientType> implements VerifiableProcessor {
 
     /**
      * AWS credentials provider service
@@ -49,31 +56,21 @@ public abstract class 
AbstractAWSCredentialsProviderProcessor<ClientType extends
             .build();
 
     /**
-     * This method checks if {#link {@link #AWS_CREDENTIALS_PROVIDER_SERVICE} 
is available and if it
-     * is, uses the credentials provider, otherwise it invokes the {@link 
AbstractAWSProcessor#onScheduled(ProcessContext)}
-     * which uses static AWSCredentials for the aws processors
+     * Attempts to create the client using the controller service first before 
falling back to the standard configuration.
+     * @param context The process context
+     * @return The created client
      */
-    @OnScheduled
-    public void onScheduled(ProcessContext context) {
-        ControllerService service = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
+    protected ClientType createClient(final ProcessContext context) {
+        final ControllerService service = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
         if (service != null) {
             getLogger().debug("Using aws credentials provider service for 
creating client");
-            onScheduledUsingControllerService(context);
+            return createClient(context, getCredentialsProvider(context), 
createConfiguration(context));
         } else {
             getLogger().debug("Using aws credentials for creating client");
-            super.onScheduled(context);
+            return super.createClient(context);
         }
     }
 
-    /**
-     * Create aws client using credentials provider
-     * @param context the process context
-     */
-    protected void onScheduledUsingControllerService(ProcessContext context) {
-        this.client = createClient(context, getCredentialsProvider(context), 
createConfiguration(context));
-        super.initializeRegionAndEndpoint(context, this.client);
-     }
-
     @OnShutdown
     public void onShutDown() {
         if ( this.client != null ) {
@@ -81,6 +78,29 @@ public abstract class 
AbstractAWSCredentialsProviderProcessor<ClientType extends
         }
     }
 
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new ArrayList<>();
+
+        try {
+            getConfiguration(context);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Create Client and Configure Region")
+                    .explanation("Successfully created AWS Client and 
configured Region")
+                    .build());
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to create AWS Client", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Create Client and Configure Region")
+                    .explanation("Failed to create AWS Client or configure 
Region: " + e.getMessage())
+                    .build());
+        }
+
+        return results;
+    }
+
     /**
      * Get credentials provider using the {@link AWSCredentialsProviderService}
      * @param context the process context
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
index 6609beb..9a9e0f2 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java
@@ -276,8 +276,7 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        this.client = createClient(context, getCredentials(context), 
createConfiguration(context));
-        initializeRegionAndEndpoint(context, this.client);
+        setClientAndRegion(context);
     }
 
     /*
@@ -302,10 +301,6 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
      */
     public abstract void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException;
 
-    protected void initializeRegionAndEndpoint(final ProcessContext context, 
final AmazonWebServiceClient client) {
-        this.region = getRegionAndInitializeEndpoint(context, client);
-    }
-
     protected Region getRegionAndInitializeEndpoint(final ProcessContext 
context, final AmazonWebServiceClient client) {
         final Region region;
         // if the processor supports REGION, get the configured region.
@@ -336,7 +331,7 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
                     // falling back to the configured region if the parse fails
                     // e.g. in case of 
https://vpce-***-***.sqs.{region}.vpce.amazonaws.com
                     String regionValue = parseRegionForVPCE(urlstr, 
region.getName());
-                    client.setEndpoint(urlstr, this.client.getServiceName(), 
regionValue);
+                    client.setEndpoint(urlstr, client.getServiceName(), 
regionValue);
                 } else {
                     // handling non-vpce custom endpoints where the AWS 
library can parse the region out
                     // e.g. https://sqs.{region}.***.***.***.gov
@@ -416,4 +411,49 @@ public abstract class AbstractAWSProcessor<ClientType 
extends AmazonWebServiceCl
             getClient().shutdown();
         }
     }
+
+    protected void setClientAndRegion(final ProcessContext context) {
+        final AWSConfiguration awsConfiguration = getConfiguration(context);
+        this.client = awsConfiguration.getClient();
+        this.region = awsConfiguration.getRegion();
+    }
+
+    /**
+     * Creates an AWS service client from the context.
+     * @param context The process context
+     * @return The created client
+     */
+    protected ClientType createClient(final ProcessContext context) {
+        return createClient(context, getCredentials(context), 
createConfiguration(context));
+    }
+
+    /**
+     * Parses and configures the client and region from the context.
+     * @param context The process context
+     * @return The parsed configuration
+     */
+    protected AWSConfiguration getConfiguration(final ProcessContext context) {
+        final ClientType client = createClient(context);
+        final Region region = getRegionAndInitializeEndpoint(context, client);
+
+        return new AWSConfiguration(client, region);
+    }
+
+    public class AWSConfiguration {
+        final ClientType client;
+        final Region region;
+
+        public AWSConfiguration(final ClientType client, final Region region) {
+            this.client = client;
+            this.region = region;
+        }
+
+        public ClientType getClient() {
+            return client;
+        }
+
+        public Region getRegion() {
+            return region;
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
index 6c5d4f6..40388b0 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java
@@ -199,15 +199,15 @@ public abstract class AbstractDynamoDBProcessor extends 
AbstractAWSCredentialsPr
         return client;
     }
 
-    protected Object getValue(ProcessContext context, PropertyDescriptor type, 
PropertyDescriptor value, FlowFile flowFile) {
+    protected Object getValue(final ProcessContext context, final 
PropertyDescriptor type, final PropertyDescriptor value, final Map<String, 
String> attributes) {
         if ( 
context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) 
{
-            return 
context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue();
+            return 
context.getProperty(value).evaluateAttributeExpressions(attributes).getValue();
         } else {
-            return new 
BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue());
+            return new 
BigDecimal(context.getProperty(value).evaluateAttributeExpressions(attributes).getValue());
         }
     }
 
-    protected Object getAttributeValue(ProcessContext context, 
PropertyDescriptor propertyType, AttributeValue value) {
+    protected Object getAttributeValue(final ProcessContext context, final 
PropertyDescriptor propertyType, final AttributeValue value) {
         if ( 
context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue()))
 {
             if ( value == null ) return null;
             else return value.getS();
@@ -217,9 +217,14 @@ public abstract class AbstractDynamoDBProcessor extends 
AbstractAWSCredentialsPr
         }
     }
 
+    protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) {
+        return new DynamoDB(client);
+    }
+
     protected synchronized DynamoDB getDynamoDB() {
-        if ( dynamoDB == null )
-            dynamoDB = new DynamoDB(client);
+        if (dynamoDB == null) {
+            dynamoDB = getDynamoDB(client);
+        }
         return dynamoDB;
     }
 
@@ -297,28 +302,33 @@ public abstract class AbstractDynamoDBProcessor extends 
AbstractAWSCredentialsPr
         keysToFlowFileMap.remove(itemKeys);
     }
 
-    protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object 
rangeKeyValue, ProcessSession session,
-            FlowFile flowFile) {
+    protected boolean isRangeKeyValueConsistent(final String rangeKeyName, 
final Object rangeKeyValue, final ProcessSession session, FlowFile flowFile) {
+        try {
+            validateRangeKeyValue(rangeKeyName, rangeKeyValue);
+        } catch (final IllegalArgumentException e) {
+            getLogger().error(e.getMessage() + ": " + flowFile, e);
+            flowFile = session.putAttribute(flowFile, 
DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
+                 + "'/value '" + rangeKeyValue + "' inconsistency error");
+            session.transfer(flowFile, REL_FAILURE);
+            return false;
+        }
+
+        return true;
+    }
+
+    protected void validateRangeKeyValue(final String rangeKeyName, final 
Object rangeKeyValue) {
         boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName);
         boolean isRangeValueNull = rangeKeyValue == null;
         boolean isConsistent = true;
-        if ( ! isRangeNameBlank && (isRangeValueNull || 
StringUtils.isBlank(rangeKeyValue.toString()))) {
+        if (!isRangeNameBlank && (isRangeValueNull || 
StringUtils.isBlank(rangeKeyValue.toString()))) {
             isConsistent = false;
         }
-        if ( isRangeNameBlank &&  ( ! isRangeValueNull && ! 
StringUtils.isBlank(rangeKeyValue.toString()))) {
+        if (isRangeNameBlank &&  (!isRangeValueNull && 
!StringUtils.isBlank(rangeKeyValue.toString()))) {
             isConsistent = false;
         }
-
-        if ( ! isConsistent ) {
-            getLogger().error("Range key name '" + rangeKeyName + "' was not 
consistent with range value "
-                + rangeKeyValue + "'" + flowFile);
-            flowFile = session.putAttribute(flowFile, 
DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName
-                 + "'/value '" + rangeKeyValue + "' inconsistency error");
-            session.transfer(flowFile, REL_FAILURE);
+        if (!isConsistent) {
+            throw new IllegalArgumentException(String.format("Range key name 
'%s' was not consistent with range value '%s'", rangeKeyName, rangeKeyValue));
         }
-
-        return isConsistent;
-
     }
 
     protected boolean isHashKeyValueConsistent(String hashKeyName, Object 
hashKeyValue, ProcessSession session,
@@ -326,10 +336,11 @@ public abstract class AbstractDynamoDBProcessor extends 
AbstractAWSCredentialsPr
 
         boolean isConsistent = true;
 
-        if ( hashKeyValue == null || 
StringUtils.isBlank(hashKeyValue.toString())) {
-            getLogger().error("Hash key value '" + hashKeyValue + "' is 
required for flow file " + flowFile);
-                 flowFile = session.putAttribute(flowFile, 
DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName
-                     + "/value '" + hashKeyValue + "' inconsistency error");
+        try {
+            validateHashKeyValue(hashKeyValue);
+        } catch (final IllegalArgumentException e) {
+            getLogger().error(e.getMessage() + ": " + flowFile, e);
+            flowFile = session.putAttribute(flowFile, 
DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName + "/value '" + 
hashKeyValue + "' inconsistency error");
             session.transfer(flowFile, REL_FAILURE);
             isConsistent = false;
         }
@@ -338,6 +349,12 @@ public abstract class AbstractDynamoDBProcessor extends 
AbstractAWSCredentialsPr
 
     }
 
+    protected void validateHashKeyValue(final Object hashKeyValue) {
+        if (hashKeyValue == null || 
StringUtils.isBlank(hashKeyValue.toString())) {
+            throw new IllegalArgumentException(String.format("Hash key value 
is required.  Provided value was '%s'", hashKeyValue));
+        }
+    }
+
     @OnStopped
     public void onStopped() {
         this.dynamoDB = null;
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
index 0c310db..7cde727 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java
@@ -97,7 +97,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
     public AbstractAWSGatewayApiProcessor() {
     }
 
-    public AbstractAWSGatewayApiProcessor(AmazonHttpClient client) {
+    public AbstractAWSGatewayApiProcessor(final AmazonHttpClient client) {
         providedClient = client;
     }
 
@@ -280,8 +280,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
             .build();
 
     @Override
-    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
-        String propertyDescriptorName) {
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final 
String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
                 .required(false)
                 .name(propertyDescriptorName)
@@ -318,14 +317,13 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
     }
 
     @Override
-    protected Collection<ValidationResult> customValidate(
-        final ValidationContext validationContext) {
+    protected Collection<ValidationResult> customValidate(final 
ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>(3);
         results.addAll(super.customValidate(validationContext));
         final boolean querySet = 
validationContext.getProperty(PROP_QUERY_PARAMS).isSet();
 
         if (querySet) {
-            String input = 
validationContext.getProperty(PROP_QUERY_PARAMS).getValue();
+            final String input = 
validationContext.getProperty(PROP_QUERY_PARAMS).getValue();
             // if we have expressions, we don't do further validation
             if 
(!(validationContext.isExpressionLanguageSupported(PROP_QUERY_PARAMS.getName())
                 && validationContext.isExpressionLanguagePresent(input))) {
@@ -350,15 +348,15 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
                 }
             }
         }
-        String method = 
trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue())
+        final String method = 
trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue())
             .toUpperCase();
 
         // if there are expressions do not validate
         if 
(!(validationContext.isExpressionLanguageSupported(PROP_METHOD.getName())
                 && validationContext.isExpressionLanguagePresent(method))) {
             try {
-                HttpMethodName methodName = HttpMethodName.fromValue(method);
-            } catch (IllegalArgumentException e) {
+                HttpMethodName.fromValue(method);
+            } catch (final IllegalArgumentException e) {
                 results.add(new 
ValidationResult.Builder().subject(PROP_METHOD.getName()).input(method)
                         .explanation("Unsupported METHOD")
                         .valid(false).build());
@@ -369,9 +367,9 @@ public abstract class AbstractAWSGatewayApiProcessor extends
     }
 
     @Override
-    protected GenericApiGatewayClient createClient(ProcessContext context,
-                                                   AWSCredentialsProvider 
awsCredentialsProvider,
-                                                   ClientConfiguration 
clientConfiguration) {
+    protected GenericApiGatewayClient createClient(final ProcessContext 
context,
+                                                   final 
AWSCredentialsProvider awsCredentialsProvider,
+                                                   final ClientConfiguration 
clientConfiguration) {
 
         GenericApiGatewayClientBuilder builder = new 
GenericApiGatewayClientBuilder()
             
.withCredentials(awsCredentialsProvider).withClientConfiguration(clientConfiguration)
@@ -392,33 +390,34 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
     protected GenericApiGatewayClient createClient(final ProcessContext 
context,
                                                    final AWSCredentials 
credentials,
                                                    final ClientConfiguration 
clientConfiguration) {
-        return createClient(context, new 
AWSStaticCredentialsProvider(credentials),
-                            clientConfiguration);
+        return createClient(context, new 
AWSStaticCredentialsProvider(credentials), clientConfiguration);
     }
 
     protected GenericApiGatewayRequest configureRequest(final ProcessContext 
context,
                                                         final ProcessSession 
session,
                                                         final String 
resourcePath,
-                                                        final FlowFile 
requestFlowFile) {
-        String method = trimToEmpty(
+                                                        final FlowFile 
requestFlowFile,
+                                                        final Map<String, 
String> attributes) {
+        final String method = trimToEmpty(
                 
context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile)
                         .getValue()).toUpperCase();
-        HttpMethodName methodName = HttpMethodName.fromValue(method);
-        return configureRequest(context, session, 
resourcePath,requestFlowFile, methodName);
+        final HttpMethodName methodName = HttpMethodName.fromValue(method);
+        return configureRequest(context, session, 
resourcePath,requestFlowFile, methodName, attributes);
     }
 
     protected GenericApiGatewayRequest configureRequest(final ProcessContext 
context,
                                                         final ProcessSession 
session,
                                                         final String 
resourcePath,
                                                         final FlowFile 
requestFlowFile,
-                                                        final HttpMethodName 
methodName) {
+                                                        final HttpMethodName 
methodName,
+                                                        final Map<String, 
String> attributes) {
 
         GenericApiGatewayRequestBuilder builder = new 
GenericApiGatewayRequestBuilder()
             .withResourcePath(resourcePath);
         final Map<String, List<String>> parameters = getParameters(context);
         builder = builder.withParameters(parameters);
 
-        InputStream requestBody = null;
+        InputStream requestBody;
         switch (methodName) {
             case GET:
                 builder = builder.withHttpMethod(HttpMethodName.GET);
@@ -447,7 +446,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
                 break;
         }
 
-        builder = setHeaderProperties(context, builder, methodName, 
requestFlowFile);
+        builder = setHeaderProperties(context, builder, methodName, 
attributes);
         return builder.build();
     }
 
@@ -456,7 +455,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends
                                                final FlowFile requestFlowFile) 
{
 
         if (context.getProperty(PROP_SEND_BODY).asBoolean() && requestFlowFile 
!= null) {
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+            final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream();
             session.exportTo(requestFlowFile, outputStream);
             return new ByteArrayInputStream(outputStream.toByteArray());
 
@@ -467,24 +466,22 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
 
     protected GenericApiGatewayRequestBuilder setHeaderProperties(final 
ProcessContext context,
                                                                   
GenericApiGatewayRequestBuilder requestBuilder,
-                                                                  
HttpMethodName methodName,
-                                                                  final 
FlowFile requestFlowFile) {
+                                                                  final 
HttpMethodName methodName,
+                                                                  final 
Map<String, String> requestAttributes) {
 
-        Map<String, String> headers = new HashMap<>();
-        for (String headerKey : dynamicPropertyNames) {
-            String headerValue = context.getProperty(headerKey)
-                                        
.evaluateAttributeExpressions(requestFlowFile).getValue();
+        final Map<String, String> headers = new HashMap<>();
+        for (final String headerKey : dynamicPropertyNames) {
+            final String headerValue = 
context.getProperty(headerKey).evaluateAttributeExpressions(requestAttributes).getValue();
             headers.put(headerKey, headerValue);
         }
 
         // iterate through the flowfile attributes, adding any attribute that
         // matches the attributes-to-send pattern. if the pattern is not set
         // (it's an optional property), ignore that attribute entirely
-        if (regexAttributesToSend != null && requestFlowFile != null) {
-            Map<String, String> attributes = requestFlowFile.getAttributes();
-            Matcher m = regexAttributesToSend.matcher("");
-            for (Map.Entry<String, String> entry : attributes.entrySet()) {
-                String headerKey = trimToEmpty(entry.getKey());
+        if (regexAttributesToSend != null) {
+            final Matcher m = regexAttributesToSend.matcher("");
+            for (final Map.Entry<String, String> entry : 
requestAttributes.entrySet()) {
+                final String headerKey = trimToEmpty(entry.getKey());
 
                 // don't include any of the ignored attributes
                 if (IGNORED_ATTRIBUTES.contains(headerKey)) {
@@ -502,8 +499,8 @@ public abstract class AbstractAWSGatewayApiProcessor extends
         }
 
         String contentType = context.getProperty(PROP_CONTENT_TYPE)
-                                    
.evaluateAttributeExpressions(requestFlowFile).getValue();
-        boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean();
+                                    
.evaluateAttributeExpressions(requestAttributes).getValue();
+        final boolean sendBody = 
context.getProperty(PROP_SEND_BODY).asBoolean();
         contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE 
: contentType;
         if (methodName == HttpMethodName.PUT || methodName == 
HttpMethodName.POST
             || methodName == HttpMethodName.PATCH) {
@@ -527,23 +524,23 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
      * @param context ProcessContext
      * @return Map of names and values
      */
-    protected Map<String, List<String>> getParameters(ProcessContext context) {
+    protected Map<String, List<String>> getParameters(final ProcessContext 
context) {
 
         if (!context.getProperty(PROP_QUERY_PARAMS).isSet()) {
             return new HashMap<>();
         }
         final String queryString = context.getProperty(PROP_QUERY_PARAMS)
                                           
.evaluateAttributeExpressions().getValue();
-        List<NameValuePair> params = URLEncodedUtils
+        final List<NameValuePair> params = URLEncodedUtils
             .parse(queryString, Charsets.toCharset("UTF-8"));
 
         if (params.isEmpty()) {
             return new HashMap<>();
         }
 
-        Map<String, List<String>> map = new HashMap<>();
+        final Map<String, List<String>> map = new HashMap<>();
 
-        for (NameValuePair nvp : params) {
+        for (final NameValuePair nvp : params) {
             if (!map.containsKey(nvp.getName())) {
                 map.put(nvp.getName(), new ArrayList<>());
             }
@@ -555,19 +552,17 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
     /**
      * Returns a Map of flowfile attributes from the response http headers. 
Multivalue headers are naively converted to comma separated strings.
      */
-    protected Map<String, String> convertAttributesFromHeaders(
-        GenericApiGatewayResponse responseHttp) {
+    protected Map<String, String> convertAttributesFromHeaders(final 
GenericApiGatewayResponse responseHttp) {
         // create a new hashmap to store the values from the connection
-        Map<String, String> map = new HashMap<>();
+        final  Map<String, String> map = new HashMap<>();
         responseHttp.getHttpResponse().getHeaders().entrySet().forEach((entry) 
-> {
 
-            String key = entry.getKey();
-            String value = entry.getValue();
+            final String key = entry.getKey();
+            final String value = entry.getValue();
 
             if (key == null) {
                 return;
             }
-
             // we ignore any headers with no actual values (rare)
             if (StringUtils.isBlank(value)) {
                 return;
@@ -590,8 +585,8 @@ public abstract class AbstractAWSGatewayApiProcessor extends
         throw new IllegalStateException("Unknown relationship " + name);
     }
 
-    protected void route(FlowFile request, FlowFile response, ProcessSession 
session,
-                         ProcessContext context, int statusCode, 
Set<Relationship> relationships) {
+    protected void route(FlowFile request, final FlowFile response, final 
ProcessSession session,
+                         final ProcessContext context, final int statusCode, 
final Set<Relationship> relationships) {
         // check if we should yield the processor
         if (!isSuccess(statusCode) && request == null) {
             context.yield();
@@ -609,12 +604,10 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
         if (isSuccess(statusCode)) {
             // we have two flowfiles to transfer
             if (request != null) {
-                session
-                    .transfer(request, 
getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships));
+                session.transfer(request, 
getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships));
             }
             if (response != null && !responseSent) {
-                session
-                    .transfer(response, 
getRelationshipForName(REL_RESPONSE_NAME, relationships));
+                session.transfer(response, 
getRelationshipForName(REL_RESPONSE_NAME, relationships));
             }
 
             // 5xx -> RETRY
@@ -636,20 +629,20 @@ public abstract class AbstractAWSGatewayApiProcessor 
extends
 
     }
 
-    protected boolean isSuccess(int statusCode) {
+    protected boolean isSuccess(final int statusCode) {
         return statusCode / 100 == 2;
     }
 
-    protected void logRequest(ComponentLog logger, URI endpoint, 
GenericApiGatewayRequest request) {
+    protected void logRequest(final ComponentLog logger, final URI endpoint, 
final GenericApiGatewayRequest request) {
         try {
             logger.debug("\nRequest to remote service:\n\t{}\t{}\t\n{}",
                 new Object[]{endpoint.toURL().toExternalForm(), 
request.getHttpMethod(), getLogString(request.getHeaders())});
-        } catch (MalformedURLException e) {
+        } catch (final MalformedURLException e) {
             logger.debug(e.getMessage());
         }
     }
 
-    protected void logResponse(ComponentLog logger, GenericApiGatewayResponse 
response) {
+    protected void logResponse(final ComponentLog logger, final 
GenericApiGatewayResponse response) {
         try {
             logger.debug("\nResponse from remote service:\n\t{}\n{}",
                     new 
Object[]{response.getHttpResponse().getHttpRequest().getURI().toURL().toExternalForm(),
 getLogString(response.getHttpResponse().getHeaders())});
@@ -658,9 +651,9 @@ public abstract class AbstractAWSGatewayApiProcessor extends
         }
     }
 
-    protected String getLogString(Map<String, String> map) {
-        StringBuilder sb = new StringBuilder();
-        if(map != null && map.size() > 0) {
+    protected String getLogString(final Map<String, String> map) {
+        final StringBuilder sb = new StringBuilder();
+        if (map != null && map.size() > 0) {
             for (Map.Entry<String, String> entry : map.entrySet()) {
                 String value = entry.getValue();
                 sb.append("\t");
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
index 3c9f73b..34f712d 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java
@@ -101,8 +101,8 @@ public class DeleteDynamoDB extends 
AbstractWriteDynamoDBProcessor {
         TableWriteItems tableWriteItems = new TableWriteItems(table);
 
         for (FlowFile flowFile : flowFiles) {
-            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile);
-            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile.getAttributes());
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
 
             if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, 
session, flowFile)) {
                 continue;
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
index 328dae6..14626ca 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java
@@ -16,40 +16,44 @@
  */
 package org.apache.nifi.processors.aws.dynamodb;
 
-import java.io.ByteArrayInputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
+import com.amazonaws.services.dynamodbv2.document.DynamoDB;
+import com.amazonaws.services.dynamodbv2.document.Item;
+import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
 import org.apache.nifi.annotation.behavior.ReadsAttributes;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
-import com.amazonaws.services.dynamodbv2.document.DynamoDB;
-import com.amazonaws.services.dynamodbv2.document.Item;
-import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
-import com.amazonaws.services.dynamodbv2.model.AttributeValue;
-import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 @SupportsBatching
 @SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class})
@@ -100,42 +104,97 @@ public class GetDynamoDB extends 
AbstractDynamoDBProcessor {
     }
 
     @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
-        List<FlowFile> flowFiles = 
session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
-        if (flowFiles == null || flowFiles.size() == 0) {
-            return;
+    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new 
ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+        final String table = 
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+        final String jsonDocument = 
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
+
+        TableKeysAndAttributes tableKeysAndAttributes;
+
+        try {
+            tableKeysAndAttributes = getTableKeysAndAttributes(context, 
attributes);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Configure DynamoDB BatchGetItems 
Request")
+                    .explanation(String.format("Successfully configured 
BatchGetItems Request"))
+                    .build());
+        } catch (final IllegalArgumentException e) {
+            verificationLogger.error("Failed to configure BatchGetItems Request", e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Configure DynamoDB BatchGetItems 
Request")
+                    .explanation(String.format("Failed to configure BatchGetItems Request: " + e.getMessage()))
+                    .build());
+            return results;
         }
 
-        Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+        if (tableKeysAndAttributes.getPrimaryKeys() == null || 
tableKeysAndAttributes.getPrimaryKeys().isEmpty()) {
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SKIPPED)
+                    .verificationStepName("Get DynamoDB Items")
+                    .explanation(String.format("Skipped getting DynamoDB items 
because no primary keys would be included in retrieval"))
+                    .build());
+        } else {
+            try {
+                final DynamoDB dynamoDB = 
getDynamoDB(getConfiguration(context).getClient());
+                int totalCount = 0;
+                int jsonDocumentCount = 0;
 
-        final String table = 
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
-        TableKeysAndAttributes tableKeysAndAttributes = new 
TableKeysAndAttributes(table);
+                BatchGetItemOutcome result = 
dynamoDB.batchGetItem(tableKeysAndAttributes);
 
-        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
-        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
-        final String jsonDocument = 
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
+                // Handle processed items and get the json document
+                final List<Item> items = result.getTableItems().get(table);
+                for (final Item item : items) {
+                    totalCount++;
+                    if (item.get(jsonDocument) != null) {
+                        jsonDocumentCount++;
+                    }
+                }
 
-        for (FlowFile flowFile : flowFiles) {
-            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile);
-            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+                results.add(new ConfigVerificationResult.Builder()
+                        .outcome(Outcome.SUCCESSFUL)
+                        .verificationStepName("Get DynamoDB Items")
+                        .explanation(String.format("Successfully retrieved %s 
items, including %s JSON documents, from DynamoDB", totalCount, 
jsonDocumentCount))
+                        .build());
 
-            if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, 
session, flowFile)) {
-                continue;
-            }
+            } catch (final Exception e) {
+                verificationLogger.error("Failed to retrieve items from 
DynamoDB", e);
 
-            if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, 
session, flowFile) ) {
-                continue;
+                results.add(new ConfigVerificationResult.Builder()
+                        .outcome(Outcome.FAILED)
+                        .verificationStepName("Get DynamoDB Items")
+                        .explanation(String.format("Failed to retrieve items 
from DynamoDB: %s", e.getMessage()))
+                        .build());
             }
+        }
 
-            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), 
flowFile);
+        return results;
+    }
 
-            if ( rangeKeyValue == null || 
StringUtils.isBlank(rangeKeyValue.toString()) ) {
-                tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, 
hashKeyValue);
-            } else {
-                tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, 
hashKeyValue, rangeKeyName, rangeKeyValue);
-            }
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+        final List<FlowFile> flowFiles = 
session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger());
+        if (flowFiles == null || flowFiles.size() == 0) {
+            return;
+        }
+
+        final Map<ItemKeys,FlowFile> keysToFlowFileMap = 
getKeysToFlowFileMap(context, session, flowFiles);
+
+        final TableKeysAndAttributes tableKeysAndAttributes;
+        try {
+            tableKeysAndAttributes = getTableKeysAndAttributes(context, 
flowFiles.stream()
+                    
.map(FlowFile::getAttributes).collect(Collectors.toList()).toArray(new Map[0]));
+        } catch (final IllegalArgumentException e) {
+            getLogger().error(e.getMessage(), e);
+            return;
         }
 
+        final String table = 
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
+        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
+        final String jsonDocument = 
context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue();
+
         if (keysToFlowFileMap.isEmpty()) {
             return;
         }
@@ -146,12 +205,12 @@ public class GetDynamoDB extends 
AbstractDynamoDBProcessor {
             BatchGetItemOutcome result = 
dynamoDB.batchGetItem(tableKeysAndAttributes);
 
             // Handle processed items and get the json document
-            List<Item> items = result.getTableItems().get(table);
-            for (Item item : items) {
-                ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), 
item.get(rangeKeyName));
+            final List<Item> items = result.getTableItems().get(table);
+            for (final Item item : items) {
+                final ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), 
item.get(rangeKeyName));
                 FlowFile flowFile = keysToFlowFileMap.get(itemKeys);
 
-                if ( item.get(jsonDocument) != null ) {
+                if (item.get(jsonDocument) != null) {
                     ByteArrayInputStream bais = new 
ByteArrayInputStream(item.getJSON(jsonDocument).getBytes());
                     flowFile = session.importFrom(bais, flowFile);
                 }
@@ -161,38 +220,85 @@ public class GetDynamoDB extends 
AbstractDynamoDBProcessor {
             }
 
             // Handle unprocessed keys
-            Map<String, KeysAndAttributes> unprocessedKeys = 
result.getUnprocessedKeys();
+            final Map<String, KeysAndAttributes> unprocessedKeys = 
result.getUnprocessedKeys();
             if ( unprocessedKeys != null && unprocessedKeys.size() > 0) {
-                KeysAndAttributes keysAndAttributes = 
unprocessedKeys.get(table);
-                List<Map<String, AttributeValue>> keys = 
keysAndAttributes.getKeys();
+                final KeysAndAttributes keysAndAttributes = 
unprocessedKeys.get(table);
+                final List<Map<String, AttributeValue>> keys = 
keysAndAttributes.getKeys();
 
-                for (Map<String,AttributeValue> unprocessedKey : keys) {
-                    Object hashKeyValue = getAttributeValue(context, 
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
-                    Object rangeKeyValue = getAttributeValue(context, 
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
+                for (final Map<String,AttributeValue> unprocessedKey : keys) {
+                    final Object hashKeyValue = getAttributeValue(context, 
HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName));
+                    final Object rangeKeyValue = getAttributeValue(context, 
RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName));
                     sendUnprocessedToUnprocessedRelationship(session, 
keysToFlowFileMap, hashKeyValue, rangeKeyValue);
                 }
             }
 
             // Handle any remaining items
-            for (ItemKeys key : keysToFlowFileMap.keySet()) {
+            for (final ItemKeys key : keysToFlowFileMap.keySet()) {
                 FlowFile flowFile = keysToFlowFileMap.get(key);
                 flowFile = session.putAttribute(flowFile, 
DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + 
key.toString() );
                 session.transfer(flowFile,REL_NOT_FOUND);
                 keysToFlowFileMap.remove(key);
             }
 
-        } catch(AmazonServiceException exception) {
+        } catch(final AmazonServiceException exception) {
             getLogger().error("Could not process flowFiles due to service 
exception : " + exception.getMessage());
             List<FlowFile> failedFlowFiles = processServiceException(session, 
flowFiles, exception);
             session.transfer(failedFlowFiles, REL_FAILURE);
-        } catch(AmazonClientException exception) {
+        } catch(final AmazonClientException exception) {
             getLogger().error("Could not process flowFiles due to client 
exception : " + exception.getMessage());
             List<FlowFile> failedFlowFiles = processClientException(session, 
flowFiles, exception);
             session.transfer(failedFlowFiles, REL_FAILURE);
-        } catch(Exception exception) {
+        } catch(final Exception exception) {
             getLogger().error("Could not process flowFiles due to exception : 
" + exception.getMessage());
             List<FlowFile> failedFlowFiles = processException(session, 
flowFiles, exception);
             session.transfer(failedFlowFiles, REL_FAILURE);
         }
     }
+
+    private Map<ItemKeys, FlowFile> getKeysToFlowFileMap(final ProcessContext 
context, final ProcessSession session, final List<FlowFile> flowFiles) {
+        final Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>();
+
+        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
+        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
+
+        for (final FlowFile flowFile : flowFiles) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile.getAttributes());
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
+
+            if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, 
flowFile)) {
+                continue;
+            }
+
+            if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, 
session, flowFile)) {
+                continue;
+            }
+
+            keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), 
flowFile);
+        }
+        return keysToFlowFileMap;
+    }
+
+    private TableKeysAndAttributes getTableKeysAndAttributes(final 
ProcessContext context, final Map<String, String>... attributes) {
+        final String table = 
context.getProperty(TABLE).evaluateAttributeExpressions().getValue();
+        final TableKeysAndAttributes tableKeysAndAttributes = new 
TableKeysAndAttributes(table);
+
+        final String hashKeyName = 
context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue();
+        final String rangeKeyName = 
context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue();
+
+        for (final Map<String, String> attributeMap : attributes) {
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, attributeMap);
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, attributeMap);
+
+            validateHashKeyValue(hashKeyValue);
+            validateRangeKeyValue(rangeKeyName, rangeKeyValue);
+
+            if (rangeKeyValue == null || 
StringUtils.isBlank(rangeKeyValue.toString())) {
+                tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, 
hashKeyValue);
+            } else {
+                tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, 
hashKeyValue, rangeKeyName, rangeKeyValue);
+            }
+        }
+        return tableKeysAndAttributes;
+    }
+
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
index 92e552a..23b8bab 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java
@@ -118,8 +118,8 @@ public class PutDynamoDB extends 
AbstractWriteDynamoDBProcessor {
         TableWriteItems tableWriteItems = new TableWriteItems(table);
 
         for (FlowFile flowFile : flowFiles) {
-            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile);
-            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile);
+            final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, 
HASH_KEY_VALUE, flowFile.getAttributes());
+            final Object rangeKeyValue = getValue(context, 
RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes());
 
             if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, 
flowFile)) {
                 continue;
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index 719f1cf..b9de5fe 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -16,16 +16,12 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.SSEAlgorithm;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -37,23 +33,30 @@ import 
org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.S3Object;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 @SupportsBatching
 @SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class})
@@ -149,6 +152,36 @@ public class FetchS3Object extends AbstractS3Processor {
     }
 
     @Override
+    public List<ConfigVerificationResult> verify(ProcessContext context, 
ComponentLog verificationLogger, Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new 
ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+
+        final AmazonS3 client = getConfiguration(context).getClient();
+        final GetObjectMetadataRequest request = 
createGetObjectMetadataRequest(context, attributes);
+
+        try {
+            final ObjectMetadata objectMetadata = 
client.getObjectMetadata(request);
+            final long byteCount = objectMetadata.getContentLength();
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("HEAD S3 Object")
+                    .outcome(Outcome.SUCCESSFUL)
+                    .explanation(String.format("Successfully performed HEAD on 
[%s] (%s bytes) from Bucket [%s]", key, byteCount, bucket))
+                    .build());
+        } catch (final Exception e) {
+            getLogger().error(String.format("Failed to fetch [%s] from Bucket 
[%s]", key, bucket), e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .verificationStepName("HEAD S3 Object")
+                    .outcome(Outcome.FAILED)
+                    .explanation(String.format("Failed to perform HEAD on [%s] 
from Bucket [%s]: %s", key, bucket, e.getMessage()))
+                    .build());
+        }
+
+        return results;
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         FlowFile flowFile = session.get();
         if (flowFile == null) {
@@ -156,48 +189,18 @@ public class FetchS3Object extends AbstractS3Processor {
         }
 
         final long startNanos = System.nanoTime();
-        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
-        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
-        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue();
-        final boolean requesterPays = 
context.getProperty(REQUESTER_PAYS).asBoolean();
-        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? 
context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
 : 0L);
-        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? 
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue()
 : null);
-
-        final AmazonS3 client = getClient();
-        final GetObjectRequest request;
-        if (versionId == null) {
-            request = new GetObjectRequest(bucket, key);
-        } else {
-            request = new GetObjectRequest(bucket, key, versionId);
-        }
-        request.setRequesterPays(requesterPays);
-
-        // tl;dr don't setRange(0) on GetObjectRequest because it results in
-        // InvalidRange errors on zero byte objects.
-        //
-        // Amazon S3 sets byte ranges using HTTP Range headers as described in
-        // https://datatracker.ietf.org/doc/html/rfc2616#section-14.35 and
-        // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1. There
-        // isn't a satisfiable byte range specification for zero length objects
-        // so 416 (Request range not satisfiable) is returned.
-        //
-        // Since the effect of the byte range 0- is equivalent to not sending a
-        // byte range and works for both zero and non-zero length objects,
-        // the single argument setRange() only needs to be called when the
-        // first byte position is greater than zero.
-        if (rangeLength != null) {
-            request.setRange(rangeStart, rangeStart + rangeLength - 1);
-        } else if (rangeStart > 0) {
-            request.setRange(rangeStart);
-        }
 
         final Map<String, String> attributes = new HashMap<>();
 
-        AmazonS3EncryptionService encryptionService = 
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+        final AmazonS3EncryptionService encryptionService = 
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
         if (encryptionService != null) {
-            encryptionService.configureGetObjectRequest(request, new 
ObjectMetadata());
             attributes.put("s3.encryptionStrategy", 
encryptionService.getStrategyName());
         }
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        final AmazonS3 client = getClient();
+        final GetObjectRequest request = createGetObjectRequest(context, 
flowFile.getAttributes());
 
         try (final S3Object s3Object = client.getObject(request)) {
             if (s3Object == null) {
@@ -272,6 +275,64 @@ public class FetchS3Object extends AbstractS3Processor {
         session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + 
".amazonaws.com/" + key, transferMillis);
     }
 
+    private GetObjectMetadataRequest createGetObjectMetadataRequest(final 
ProcessContext context, final Map<String, String> attributes) {
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(attributes).getValue();
+        final boolean requesterPays = 
context.getProperty(REQUESTER_PAYS).asBoolean();
+
+        final GetObjectMetadataRequest request;
+        if (versionId == null) {
+            request = new GetObjectMetadataRequest(bucket, key);
+        } else {
+            request = new GetObjectMetadataRequest(bucket, key, versionId);
+        }
+        request.setRequesterPays(requesterPays);
+        return request;
+    }
+
+    private GetObjectRequest createGetObjectRequest(final ProcessContext 
context, final Map<String, String> attributes) {
+        final String bucket = 
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
+        final String key = 
context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue();
+        final String versionId = 
context.getProperty(VERSION_ID).evaluateAttributeExpressions(attributes).getValue();
+        final boolean requesterPays = 
context.getProperty(REQUESTER_PAYS).asBoolean();
+        final long rangeStart = (context.getProperty(RANGE_START).isSet() ? 
context.getProperty(RANGE_START).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue()
 : 0L);
+        final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? 
context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue()
 : null);
+
+        final GetObjectRequest request;
+        if (versionId == null) {
+            request = new GetObjectRequest(bucket, key);
+        } else {
+            request = new GetObjectRequest(bucket, key, versionId);
+        }
+        request.setRequesterPays(requesterPays);
+
+        // tl;dr don't setRange(0) on GetObjectRequest because it results in
+        // InvalidRange errors on zero byte objects.
+        //
+        // Amazon S3 sets byte ranges using HTTP Range headers as described in
+        // https://datatracker.ietf.org/doc/html/rfc2616#section-14.35 and
+        // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1. There
+        // isn't a satisfiable byte range specification for zero length objects
+        // so 416 (Request range not satisfiable) is returned.
+        //
+        // Since the effect of the byte range 0- is equivalent to not sending a
+        // byte range and works for both zero and non-zero length objects,
+        // the single argument setRange() only needs to be called when the
+        // first byte position is greater than zero.
+        if (rangeLength != null) {
+            request.setRange(rangeStart, rangeStart + rangeLength - 1);
+        } else if (rangeStart > 0) {
+            request.setRange(rangeStart);
+        }
+
+        final AmazonS3EncryptionService encryptionService = 
context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class);
+        if (encryptionService != null) {
+            encryptionService.configureGetObjectRequest(request, new 
ObjectMetadata());
+        }
+        return request;
+    }
+
     protected void setFilePathAttributes(Map<String, String> attributes, 
String filePathName) {
         final int lastSlash = filePathName.lastIndexOf("/");
         if (lastSlash > -1 && lastSlash < filePathName.length() - 1) {
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
index 5446650..5ee1a83 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java
@@ -53,7 +53,6 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
-import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -901,14 +900,10 @@ public class ListS3 extends AbstractS3Processor 
implements VerifiableProcessor {
 
     @Override
     public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog logger, final Map<String, String> attributes) {
-        final ControllerService service = 
context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService();
-        final AmazonS3Client client = service != null ? createClient(context, 
getCredentialsProvider(context), createConfiguration(context))
-            : createClient(context, getCredentials(context), 
createConfiguration(context));
+        final AmazonS3Client client = getConfiguration(context).getClient();
 
-        getRegionAndInitializeEndpoint(context, client);
-
-        final List<ConfigVerificationResult> results = new ArrayList<>();
-        final String bucketName = 
context.getProperty(BUCKET).evaluateAttributeExpressions().getValue();
+        final List<ConfigVerificationResult> results = new 
ArrayList<>(super.verify(context, logger, attributes));
+        final String bucketName = 
context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue();
         final long minAgeMilliseconds = 
context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
 
         if (bucketName == null || bucketName.trim().isEmpty()) {
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
index d138dcc..8c6fc77 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java
@@ -26,6 +26,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -41,6 +43,7 @@ import 
org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -70,6 +73,8 @@ import java.util.concurrent.TimeUnit;
         + "of the Dynamic Property.")
 public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor {
 
+    private static final Set<String> IDEMPOTENT_METHODS = new 
HashSet<>(Arrays.asList("GET", "HEAD", "OPTIONS"));
+
     public static final List<PropertyDescriptor> properties = 
Collections.unmodifiableList(Arrays
             .asList(
                     PROP_METHOD,
@@ -151,8 +156,8 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
     }
 
     @Override
-    public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
-        ComponentLog logger = getLogger();
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final ComponentLog logger = getLogger();
         FlowFile requestFlowFile = session.get();
 
         // Checking to see if the property to put the body of the response in 
an attribute was set
@@ -179,41 +184,20 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
 
             final GenericApiGatewayClient client = getClient();
 
-            final GenericApiGatewayRequest request = configureRequest(context, 
session,
-                                                                      
resourceName,
-                                                                      
requestFlowFile);
-
-            logRequest(logger, client.getEndpoint(), request);
             final long startNanos = System.nanoTime();
-            GenericApiGatewayResponse response = null;
-            GenericApiGatewayException exception = null;
-            try {
-                response = client.execute(request);
-                logResponse(logger, response);
-            } catch (GenericApiGatewayException gag) {
-                // ERROR response codes may come back as exceptions, 404 for 
example
-                exception = gag;
-            }
+            final Map<String, String> attributes = requestFlowFile == null ? 
Collections.emptyMap() : requestFlowFile.getAttributes();
+            final GatewayResponse gatewayResponse = invokeGateway(client, 
context, session, requestFlowFile, attributes, logger);
 
-            final int statusCode;
-            if (exception != null) {
-                statusCode = exception.getStatusCode();
-            } else {
-                statusCode = response.getHttpResponse().getStatusCode();
-            }
+            final GenericApiGatewayResponse response = 
gatewayResponse.response;
+            final GenericApiGatewayException exception = 
gatewayResponse.exception;
+            final int statusCode = gatewayResponse.statusCode;
 
-            if (statusCode == 0) {
-                throw new IllegalStateException(
-                    "Status code unknown, connection hasn't been attempted.");
-            }
             final String endpoint = 
context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
-            boolean outputRegardless = 
context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS)
+            final boolean outputRegardless = 
context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS)
                                               .asBoolean();
 
-            boolean outputBodyToResponseContent = (isSuccess(statusCode) && 
!putToAttribute
-                || outputRegardless);
-            boolean outputBodyToRequestAttribute =
-                (!isSuccess(statusCode) || putToAttribute) && requestFlowFile 
!= null;
+            boolean outputBodyToResponseContent = (isSuccess(statusCode) && 
!putToAttribute || outputRegardless);
+            boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || 
putToAttribute) && requestFlowFile != null;
             boolean bodyExists = response != null && response.getBody() != 
null;
 
             final String statusExplanation;
@@ -246,9 +230,7 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
                     if 
(context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean()) {
                         // write the response headers as attributes
                         // this will overwrite any existing flowfile attributes
-                        requestFlowFile = 
session.putAllAttributes(requestFlowFile,
-                                                                   
convertAttributesFromHeaders(
-                                                                       
response));
+                        requestFlowFile = 
session.putAllAttributes(requestFlowFile, 
convertAttributesFromHeaders(response));
                     }
                 } else {
                     responseFlowFile = session.create();
@@ -282,8 +264,7 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
                                     responseFlowFile);
 
                     // emit provenance event
-                    final long millis = TimeUnit.NANOSECONDS
-                        .toMillis(System.nanoTime() - startNanos);
+                    final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                     if (requestFlowFile != null) {
                         
session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis);
                     } else {
@@ -317,7 +298,7 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
                 if (attributeKey == null) {
                     attributeKey = RESPONSE_BODY;
                 }
-                byte[] outputBuffer;
+                final byte[] outputBuffer;
                 int size = 0;
                 outputBuffer = new byte[maxAttributeSize];
                 if (bodyExists) {
@@ -340,17 +321,13 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
                 requestFlowFile = session.putAllAttributes(requestFlowFile, 
statusAttributes);
 
                 final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                
session.getProvenanceReporter().modifyAttributes(requestFlowFile,
-                                                                 "The " + 
attributeKey
-                                                                     + " has 
been added. The value of which is the body of a http call to "
-                                                                     + 
endpoint + resourceName
-                                                                     + ". It 
took " + millis
-                                                                     + 
"millis,");
+                
session.getProvenanceReporter().modifyAttributes(requestFlowFile, String
+                        .format("The %s has been added. The value of which is 
the body of a http call to %s%s. It took %s millis,", attributeKey, endpoint, 
resourceName, millis));
             }
 
             route(requestFlowFile, responseFlowFile, session, context, 
statusCode,
                   getRelationships());
-        } catch (Exception e) {
+        } catch (final Exception e) {
             // penalize or yield
             if (requestFlowFile != null) {
                 logger.error("Routing to {} due to exception: {}",
@@ -364,8 +341,7 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
                 session.transfer(requestFlowFile,
                                  getRelationshipForName(REL_FAILURE_NAME, 
getRelationships()));
             } else {
-                logger.error(
-                    "Yielding processor due to exception encountered as a 
source processor: {}", e);
+                logger.error("Yielding processor due to exception encountered 
as a source processor: {}", e);
                 context.yield();
             }
 
@@ -380,4 +356,92 @@ public class InvokeAWSGatewayApi extends 
AbstractAWSGatewayApiProcessor {
             }
         }
     }
+
+    @Override
+    public List<ConfigVerificationResult> verify(final ProcessContext context, 
final ComponentLog verificationLogger, final Map<String, String> attributes) {
+        final List<ConfigVerificationResult> results = new 
ArrayList<>(super.verify(context, verificationLogger, attributes));
+
+        final String method = context.getProperty(PROP_METHOD).getValue();
+
+        if (!IDEMPOTENT_METHODS.contains(method)) {
+            return results;
+        }
+
+        final String endpoint = 
context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue();
+        final String resource = 
context.getProperty(PROP_RESOURCE_NAME).getValue();
+        try {
+            final GenericApiGatewayClient client = 
getConfiguration(context).getClient();
+
+            final GatewayResponse gatewayResponse = invokeGateway(client, 
context, null, null, attributes, verificationLogger);
+
+            final String explanation;
+            if (gatewayResponse.exception != null) {
+                final String statusExplanation = 
EnglishReasonPhraseCatalog.INSTANCE.getReason(gatewayResponse.statusCode, null);
+                explanation = String.format("Successfully invoked AWS Gateway 
API [%s %s/%s] with blank request body, receiving error response [%s] with 
status code [%s]",
+                        method, endpoint, resource, statusExplanation, 
gatewayResponse.statusCode);
+            } else {
+                final String statusExplanation = 
gatewayResponse.response.getHttpResponse().getStatusText();
+                explanation = String.format("Successfully invoked AWS Gateway 
API [%s %s/%s] with blank request body, receiving success response [%s] with 
status code [%s]",
+                        method, endpoint, resource, statusExplanation, 
gatewayResponse.statusCode);
+            }
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.SUCCESSFUL)
+                    .verificationStepName("Invoke AWS Gateway API")
+                    .explanation(explanation)
+                    .build());
+
+        } catch (final Exception e) {
+            verificationLogger.error("Failed to invoke AWS Gateway API " + 
endpoint, e);
+            results.add(new ConfigVerificationResult.Builder()
+                    .outcome(Outcome.FAILED)
+                    .verificationStepName("Invoke AWS Gateway API")
+                    .explanation(String.format("Failed to invoke AWS Gateway 
API [%s %s/%s]: %s", method, endpoint, resource, e.getMessage()))
+                    .build());
+        }
+
+        return results;
+    }
+
+    private GatewayResponse invokeGateway(final GenericApiGatewayClient 
client, final ProcessContext context, final ProcessSession session,
+                                          final FlowFile requestFlowFile, 
final Map<String, String> attributes, final ComponentLog logger) {
+        final String resourceName = 
context.getProperty(PROP_RESOURCE_NAME).getValue();
+
+        final GenericApiGatewayRequest request = configureRequest(context, 
session, resourceName, requestFlowFile, attributes);
+
+        logRequest(logger, client.getEndpoint(), request);
+        GenericApiGatewayResponse response = null;
+        GenericApiGatewayException exception = null;
+        try {
+            response = client.execute(request);
+            logResponse(logger, response);
+        } catch (final GenericApiGatewayException gag) {
+            // ERROR response codes may come back as exceptions, 404 for 
example
+            exception = gag;
+        }
+
+        final int statusCode;
+        if (exception != null) {
+            statusCode = exception.getStatusCode();
+        } else {
+            statusCode = response.getHttpResponse().getStatusCode();
+        }
+
+        if (statusCode == 0) {
+            throw new IllegalStateException(
+                    "Status code unknown, connection hasn't been attempted.");
+        }
+        return new GatewayResponse(response, exception, statusCode);
+    }
+
+    private class GatewayResponse {
+        private final GenericApiGatewayResponse response;
+        private final GenericApiGatewayException exception;
+        private final int statusCode;
+
+        private GatewayResponse(final GenericApiGatewayResponse response, 
final GenericApiGatewayException exception, final int statusCode) {
+            this.response = response;
+            this.exception = exception;
+            this.statusCode = statusCode;
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
index 42d56a8..c380248 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java
@@ -16,35 +16,42 @@
  */
 package org.apache.nifi.processors.aws.dynamodb;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
-import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.util.MockFlowFile;
-import org.apache.nifi.util.TestRunner;
-import org.apache.nifi.util.TestRunners;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.ArgumentMatchers;
-import org.mockito.Mockito;
-
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.regions.Regions;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
 import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome;
 import com.amazonaws.services.dynamodbv2.document.DynamoDB;
 import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes;
 import com.amazonaws.services.dynamodbv2.model.AttributeValue;
 import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult;
 import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION;
+import static 
org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class GetDynamoDBTest extends AbstractDynamoDBTest {
     protected GetDynamoDB getDynamoDB;
@@ -79,13 +86,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         };
 
-        getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDB;
-            }
-        };
-
+        getDynamoDB = mockDynamoDB(mockDynamoDB);
     }
 
     @Test
@@ -105,6 +106,9 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        // No actual items returned
+        assertVerificationResults(getRunner, 0, 0);
+
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED,
 1);
 
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
@@ -144,12 +148,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         };
 
-        getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDB;
-            }
-        };
+        getDynamoDB = mockDynamoDB(mockDynamoDB);
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
         getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
@@ -165,6 +164,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        assertVerificationResults(getRunner, 1, 0);
+
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
 
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS);
@@ -205,12 +206,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         };
 
-        getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDB;
-            }
-        };
+        getDynamoDB = mockDynamoDB(mockDynamoDB);
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
         getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
@@ -225,6 +221,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
         getRunner.enqueue(new byte[] {});
 
         getRunner.run(1);
+        assertVerificationResults(getRunner, 1, 1);
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 
1);
     }
 
@@ -239,12 +236,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         };
 
-        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDB;
-            }
-        };
+        final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
 
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
@@ -261,6 +253,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        assertVerificationFailure(getRunner);
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
         for (MockFlowFile flowFile : flowFiles) {
@@ -281,12 +274,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         };
 
-        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDB;
-            }
-        };
+        final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
 
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
@@ -303,6 +291,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        assertVerificationFailure(getRunner);
+
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
         for (MockFlowFile flowFile : flowFiles) {
@@ -322,12 +312,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         };
 
-        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDB;
-            }
-        };
+        final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB);
 
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
@@ -344,6 +329,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        assertVerificationFailure(getRunner);
+
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
         for (MockFlowFile flowFile : flowFiles) {
@@ -370,12 +357,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
             }
         };
 
-        final GetDynamoDB getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return notFoundMockDynamoDB;
-            }
-        };
+        final GetDynamoDB getDynamoDB = mockDynamoDB(notFoundMockDynamoDB);
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
         getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd");
@@ -391,6 +373,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        assertVerificationResults(getRunner, 0, 0);
+
         getRunner.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1);
 
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED);
@@ -407,12 +391,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
         // When writing, mock thrown service exception from AWS
         
Mockito.when(mockDynamoDb.batchGetItem(ArgumentMatchers.<TableKeysAndAttributes>any())).thenThrow(getSampleAwsServiceException());
 
-        getDynamoDB = new GetDynamoDB() {
-            @Override
-            protected DynamoDB getDynamoDB() {
-                return mockDynamoDb;
-            }
-        };
+        getDynamoDB = mockDynamoDB(mockDynamoDb);
 
         final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB);
 
@@ -427,6 +406,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
 
         getRunner.run(1);
 
+        assertVerificationFailure(getRunner);
+
         
getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 
1);
 
         List<MockFlowFile> flowFiles = 
getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE);
@@ -540,4 +521,41 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest {
             
assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED));
         }
     }
+
+    private GetDynamoDB mockDynamoDB(final DynamoDB mockDynamoDB) {
+        return new GetDynamoDB() {
+            @Override
+            protected DynamoDB getDynamoDB() {
+                return mockDynamoDB;
+            }
+            @Override
+            protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) {
+                return mockDynamoDB;
+            }
+
+            @Override
+            protected 
AbstractAWSProcessor<AmazonDynamoDBClient>.AWSConfiguration 
getConfiguration(final ProcessContext context) {
+                final AmazonDynamoDBClient client = 
Mockito.mock(AmazonDynamoDBClient.class);
+                return new AWSConfiguration(client, region);
+            }
+        };
+    }
+
+    private void assertVerificationFailure(final TestRunner runner) {
+        final List<ConfigVerificationResult> results = ((VerifiableProcessor) 
runner.getProcessor())
+                .verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
+        assertEquals(3, results.size());
+        assertEquals(Outcome.SUCCESSFUL, results.get(0).getOutcome());
+        assertEquals(Outcome.SUCCESSFUL, results.get(1).getOutcome());
+        assertEquals(Outcome.FAILED, results.get(2).getOutcome());
+    }
+
+    private void assertVerificationResults(final TestRunner runner, final int 
expectedTotalCount, final int expectedJsonDocumentCount) {
+        final List<ConfigVerificationResult> results = ((VerifiableProcessor) 
runner.getProcessor())
+                .verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
+        assertEquals(3, results.size());
+        results.forEach(result -> assertEquals(Outcome.SUCCESSFUL, 
result.getOutcome()));
+        assertTrue(results.get(2).getExplanation().contains("retrieved " + 
expectedTotalCount + " items"));
+        
assertTrue(results.get(2).getExplanation().contains(expectedJsonDocumentCount + 
" JSON"));
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
index bbd341c..e98fc55 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
 
 import static org.junit.Assert.assertTrue;
 
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -79,6 +81,11 @@ public class ITPutGetDeleteGetDynamoDBTest extends 
ITAbstractDynamoDBTest {
             assertEquals(document, new String(flowFile.toByteArray()));
         }
 
+        final GetDynamoDB getDynamoDB = (GetDynamoDB) getRunner.getProcessor();
+        final List<ConfigVerificationResult> results = 
getDynamoDB.verify(getRunner.getProcessContext(), getRunner.getLogger(), 
Collections.emptyMap());
+        assertEquals(2, results.size());
+        assertEquals("Successfully retrieved 1 items, including 1 JSON 
documents, from DynamoDB", results.get(1).getExplanation());
+
         final TestRunner deleteRunner = 
TestRunners.newTestRunner(DeleteDynamoDB.class);
 
         deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, 
CREDENTIALS_FILE);
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 25a1e4d..71915a1 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -22,9 +22,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processors.aws.AbstractAWSProcessor;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -45,6 +48,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 
 public class TestFetchS3Object {
@@ -56,12 +62,17 @@ public class TestFetchS3Object {
 
     @Before
     public void setUp() {
-        mockS3Client = Mockito.mock(AmazonS3Client.class);
+        mockS3Client = mock(AmazonS3Client.class);
         mockFetchS3Object = new FetchS3Object() {
             protected AmazonS3Client getClient() {
                 actualS3Client = client;
                 return mockS3Client;
             }
+
+            @Override
+            protected AbstractAWSProcessor<AmazonS3Client>.AWSConfiguration 
getConfiguration(ProcessContext context) {
+                return new AWSConfiguration(mockS3Client, null);
+            }
         };
         runner = TestRunners.newTestRunner(mockFetchS3Object);
     }
@@ -90,12 +101,21 @@ public class TestFetchS3Object {
         userMetadata.put("userKey2", "userValue2");
         metadata.setUserMetadata(userMetadata);
         metadata.setSSEAlgorithm("testAlgorithm");
-        Mockito.when(metadata.getETag()).thenReturn("test-etag");
+        when(metadata.getETag()).thenReturn("test-etag");
         s3ObjectResponse.setObjectMetadata(metadata);
-        
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+        
when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+
+        final long mockSize = 20L;
+        final ObjectMetadata objectMetadata = mock(ObjectMetadata.class);
+        when(objectMetadata.getContentLength()).thenReturn(mockSize);
+        when(mockS3Client.getObjectMetadata(any())).thenReturn(objectMetadata);
 
         runner.run(1);
 
+        final List<ConfigVerificationResult> results = 
mockFetchS3Object.verify(runner.getProcessContext(), runner.getLogger(), attrs);
+        assertEquals(2, results.size());
+        results.forEach(result -> 
assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, result.getOutcome()));
+
         ArgumentCaptor<GetObjectRequest> captureRequest = 
ArgumentCaptor.forClass(GetObjectRequest.class);
         Mockito.verify(mockS3Client, 
Mockito.times(1)).getObject(captureRequest.capture());
         GetObjectRequest request = captureRequest.getValue();
@@ -148,9 +168,9 @@ public class TestFetchS3Object {
         userMetadata.put("userKey2", "userValue2");
         metadata.setUserMetadata(userMetadata);
         metadata.setSSEAlgorithm("testAlgorithm");
-        Mockito.when(metadata.getETag()).thenReturn("test-etag");
+        when(metadata.getETag()).thenReturn("test-etag");
         s3ObjectResponse.setObjectMetadata(metadata);
-        
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+        
when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
 
         runner.run(1);
 
@@ -196,9 +216,9 @@ public class TestFetchS3Object {
         s3ObjectResponse.setObjectContent(new StringInputStream("Some 
Content"));
         ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class);
         metadata.setContentDisposition("key/path/to/file.txt");
-        Mockito.when(metadata.getVersionId()).thenReturn("response-version");
+        when(metadata.getVersionId()).thenReturn("response-version");
         s3ObjectResponse.setObjectMetadata(metadata);
-        
Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
+        
when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse);
 
         runner.run(1);
 
@@ -242,7 +262,7 @@ public class TestFetchS3Object {
         final Map<String, String> attrs = new HashMap<>();
         attrs.put("filename", "request-key");
         runner.enqueue(new byte[0], attrs);
-        Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
+        when(mockS3Client.getObject(Mockito.any())).thenReturn(null);
 
         runner.run(1);
 
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
index 2942f23..275f0cc 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java
@@ -128,7 +128,9 @@ public class TestListS3 {
 
         final List<ConfigVerificationResult> results = ((VerifiableProcessor) 
runner.getProcessor())
                 .verify(runner.getProcessContext(), runner.getLogger(), 
Collections.emptyMap());
-        assertTrue(results.get(0).getExplanation().contains("finding 3 
objects"));
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
results.get(0).getOutcome());
+        assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, 
results.get(1).getOutcome());
+        assertTrue(results.get(1).getExplanation().contains("finding 3 
objects"));
     }
 
     @Test
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
index 1179671..76a5bf9 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java
@@ -16,17 +16,9 @@
  */
 package org.apache.nifi.processors.aws.wag;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
-import static org.mockito.Mockito.times;
-
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.http.AmazonHttpClient;
 import com.amazonaws.http.apache.client.impl.SdkHttpClient;
-import java.io.ByteArrayInputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpVersion;
 import org.apache.http.client.methods.HttpUriRequest;
@@ -43,6 +35,15 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.ByteArrayInputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.Mockito.times;
+
 public class TestInvokeAmazonGatewayApiMock {
 
     private TestRunner runner = null;

Reply via email to