This is an automated email from the ASF dual-hosted git repository. chriss pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new f07c372 NIFI-9353: Adding Config Verification to AWS Processors f07c372 is described below commit f07c37285ebfb6af33fa8a5906bd605890e66908 Author: Joe Gresock <jgres...@gmail.com> AuthorDate: Sun Oct 31 07:05:08 2021 -0400 NIFI-9353: Adding Config Verification to AWS Processors This closes #5504 --- .../AbstractAWSCredentialsProviderProcessor.java | 64 +++--- .../nifi/processors/aws/AbstractAWSProcessor.java | 54 ++++- .../aws/dynamodb/AbstractDynamoDBProcessor.java | 65 +++--- .../aws/wag/AbstractAWSGatewayApiProcessor.java | 109 +++++------ .../processors/aws/dynamodb/DeleteDynamoDB.java | 4 +- .../nifi/processors/aws/dynamodb/GetDynamoDB.java | 218 +++++++++++++++------ .../nifi/processors/aws/dynamodb/PutDynamoDB.java | 4 +- .../nifi/processors/aws/s3/FetchS3Object.java | 163 ++++++++++----- .../org/apache/nifi/processors/aws/s3/ListS3.java | 11 +- .../processors/aws/wag/InvokeAWSGatewayApi.java | 156 ++++++++++----- .../processors/aws/dynamodb/GetDynamoDBTest.java | 156 ++++++++------- .../dynamodb/ITPutGetDeleteGetDynamoDBTest.java | 7 + .../nifi/processors/aws/s3/TestFetchS3Object.java | 36 +++- .../apache/nifi/processors/aws/s3/TestListS3.java | 4 +- .../aws/wag/TestInvokeAmazonGatewayApiMock.java | 17 +- 15 files changed, 706 insertions(+), 362 deletions(-) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java index 923878c..8b2c560 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSCredentialsProviderProcessor.java @@ -15,16 +15,23 @@ * limitations under the License. */ package org.apache.nifi.processors.aws; -import org.apache.nifi.annotation.lifecycle.OnScheduled; + +import com.amazonaws.AmazonWebServiceClient; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentialsProvider; import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.VerifiableProcessor; import org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService; -import com.amazonaws.AmazonWebServiceClient; -import com.amazonaws.ClientConfiguration; -import com.amazonaws.auth.AWSCredentialsProvider; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; /** * Base class for aws processors that uses AWSCredentialsProvider interface for creating aws clients. @@ -34,7 +41,7 @@ import com.amazonaws.auth.AWSCredentialsProvider; * @see <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html">AWSCredentialsProvider</a> */ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends AmazonWebServiceClient> - extends AbstractAWSProcessor<ClientType> { + extends AbstractAWSProcessor<ClientType> implements VerifiableProcessor { /** * AWS credentials provider service @@ -49,31 +56,21 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends .build(); /** - * This method checks if {#link {@link #AWS_CREDENTIALS_PROVIDER_SERVICE} is available and if it - * is, uses the credentials provider, otherwise it invokes the {@link AbstractAWSProcessor#onScheduled(ProcessContext)} - * which uses static AWSCredentials for the aws processors + * Attempts to create the client using the controller service first before falling back to the standard configuration. + * @param context The process context + * @return The created client */ - @OnScheduled - public void onScheduled(ProcessContext context) { - ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(); + protected ClientType createClient(final ProcessContext context) { + final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(); if (service != null) { getLogger().debug("Using aws credentials provider service for creating client"); - onScheduledUsingControllerService(context); + return createClient(context, getCredentialsProvider(context), createConfiguration(context)); } else { getLogger().debug("Using aws credentials for creating client"); - super.onScheduled(context); + return super.createClient(context); } } - /** - * Create aws client using credentials provider - * @param context the process context - */ - protected void onScheduledUsingControllerService(ProcessContext context) { - this.client = createClient(context, getCredentialsProvider(context), createConfiguration(context)); - super.initializeRegionAndEndpoint(context, this.client); - } - @OnShutdown public void onShutDown() { if ( this.client != null ) { @@ -81,6 +78,29 @@ public abstract class AbstractAWSCredentialsProviderProcessor<ClientType extends } } + @Override + public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) { + final List<ConfigVerificationResult> results = new ArrayList<>(); + + try { + getConfiguration(context); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.SUCCESSFUL) + .verificationStepName("Create Client and Configure Region") + .explanation("Successfully created AWS Client and configured Region") + .build()); + } catch (final Exception e) { + verificationLogger.error("Failed to create AWS Client", e); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.FAILED) + .verificationStepName("Create Client and Configure Region") + .explanation("Failed to crete AWS Client or configure Region: " + e.getMessage()) + .build()); + } + + return results; + } + /** * Get credentials provider using the {@link AWSCredentialsProviderService} * @param context the process context diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java index 6609beb..9a9e0f2 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/AbstractAWSProcessor.java @@ -276,8 +276,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl @OnScheduled public void onScheduled(final ProcessContext context) { - this.client = createClient(context, getCredentials(context), createConfiguration(context)); - initializeRegionAndEndpoint(context, this.client); + setClientAndRegion(context); } /* @@ -302,10 +301,6 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl */ public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException; - protected void initializeRegionAndEndpoint(final ProcessContext context, final AmazonWebServiceClient client) { - this.region = getRegionAndInitializeEndpoint(context, client); - } - protected Region getRegionAndInitializeEndpoint(final ProcessContext context, final AmazonWebServiceClient client) { final Region region; // if the processor supports REGION, get the configured region. @@ -336,7 +331,7 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl // falling back to the configured region if the parse fails // e.g. in case of https://vpce-***-***.sqs.{region}.vpce.amazonaws.com String regionValue = parseRegionForVPCE(urlstr, region.getName()); - client.setEndpoint(urlstr, this.client.getServiceName(), regionValue); + client.setEndpoint(urlstr, client.getServiceName(), regionValue); } else { // handling non-vpce custom endpoints where the AWS library can parse the region out // e.g. https://sqs.{region}.***.***.***.gov @@ -416,4 +411,49 @@ public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceCl getClient().shutdown(); } } + + protected void setClientAndRegion(final ProcessContext context) { + final AWSConfiguration awsConfiguration = getConfiguration(context); + this.client = awsConfiguration.getClient(); + this.region = awsConfiguration.getRegion(); + } + + /** + * Creates an AWS service client from the context. + * @param context The process context + * @return The created client + */ + protected ClientType createClient(final ProcessContext context) { + return createClient(context, getCredentials(context), createConfiguration(context)); + } + + /** + * Parses and configures the client and region from the context. + * @param context The process context + * @return The parsed configuration + */ + protected AWSConfiguration getConfiguration(final ProcessContext context) { + final ClientType client = createClient(context); + final Region region = getRegionAndInitializeEndpoint(context, client); + + return new AWSConfiguration(client, region); + } + + public class AWSConfiguration { + final ClientType client; + final Region region; + + public AWSConfiguration(final ClientType client, final Region region) { + this.client = client; + this.region = region; + } + + public ClientType getClient() { + return client; + } + + public Region getRegion() { + return region; + } + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java index 6c5d4f6..40388b0 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/AbstractDynamoDBProcessor.java @@ -199,15 +199,15 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr return client; } - protected Object getValue(ProcessContext context, PropertyDescriptor type, PropertyDescriptor value, FlowFile flowFile) { + protected Object getValue(final ProcessContext context, final PropertyDescriptor type, final PropertyDescriptor value, final Map<String, String> attributes) { if ( context.getProperty(type).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) { - return context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue(); + return context.getProperty(value).evaluateAttributeExpressions(attributes).getValue(); } else { - return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(flowFile).getValue()); + return new BigDecimal(context.getProperty(value).evaluateAttributeExpressions(attributes).getValue()); } } - protected Object getAttributeValue(ProcessContext context, PropertyDescriptor propertyType, AttributeValue value) { + protected Object getAttributeValue(final ProcessContext context, final PropertyDescriptor propertyType, final AttributeValue value) { if ( context.getProperty(propertyType).getValue().equals(ALLOWABLE_VALUE_STRING.getValue())) { if ( value == null ) return null; else return value.getS(); @@ -217,9 +217,14 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr } } + protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) { + return new DynamoDB(client); + } + protected synchronized DynamoDB getDynamoDB() { - if ( dynamoDB == null ) - dynamoDB = new DynamoDB(client); + if (dynamoDB == null) { + dynamoDB = getDynamoDB(client); + } return dynamoDB; } @@ -297,28 +302,33 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr keysToFlowFileMap.remove(itemKeys); } - protected boolean isRangeKeyValueConsistent(String rangeKeyName, Object rangeKeyValue, ProcessSession session, - FlowFile flowFile) { + protected boolean isRangeKeyValueConsistent(final String rangeKeyName, final Object rangeKeyValue, final ProcessSession session, FlowFile flowFile) { + try { + validateRangeKeyValue(rangeKeyName, rangeKeyValue); + } catch (final IllegalArgumentException e) { + getLogger().error(e.getMessage() + ": " + flowFile, e); + flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName + + "'/value '" + rangeKeyValue + "' inconsistency error"); + session.transfer(flowFile, REL_FAILURE); + return false; + } + + return true; + } + + protected void validateRangeKeyValue(final String rangeKeyName, final Object rangeKeyValue) { boolean isRangeNameBlank = StringUtils.isBlank(rangeKeyName); boolean isRangeValueNull = rangeKeyValue == null; boolean isConsistent = true; - if ( ! isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) { + if (!isRangeNameBlank && (isRangeValueNull || StringUtils.isBlank(rangeKeyValue.toString()))) { isConsistent = false; } - if ( isRangeNameBlank && ( ! isRangeValueNull && ! StringUtils.isBlank(rangeKeyValue.toString()))) { + if (isRangeNameBlank && (!isRangeValueNull && !StringUtils.isBlank(rangeKeyValue.toString()))) { isConsistent = false; } - - if ( ! isConsistent ) { - getLogger().error("Range key name '" + rangeKeyName + "' was not consistent with range value " - + rangeKeyValue + "'" + flowFile); - flowFile = session.putAttribute(flowFile, DYNAMODB_RANGE_KEY_VALUE_ERROR, "range key '" + rangeKeyName - + "'/value '" + rangeKeyValue + "' inconsistency error"); - session.transfer(flowFile, REL_FAILURE); + if (!isConsistent) { + throw new IllegalArgumentException(String.format("Range key name '%s' was not consistent with range value '%s'", rangeKeyName, rangeKeyValue)); } - - return isConsistent; - } protected boolean isHashKeyValueConsistent(String hashKeyName, Object hashKeyValue, ProcessSession session, @@ -326,10 +336,11 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr boolean isConsistent = true; - if ( hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) { - getLogger().error("Hash key value '" + hashKeyValue + "' is required for flow file " + flowFile); - flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName - + "/value '" + hashKeyValue + "' inconsistency error"); + try { + validateHashKeyValue(hashKeyValue); + } catch (final IllegalArgumentException e) { + getLogger().error(e.getMessage() + ": " + flowFile, e); + flowFile = session.putAttribute(flowFile, DYNAMODB_HASH_KEY_VALUE_ERROR, "hash key " + hashKeyName + "/value '" + hashKeyValue + "' inconsistency error"); session.transfer(flowFile, REL_FAILURE); isConsistent = false; } @@ -338,6 +349,12 @@ public abstract class AbstractDynamoDBProcessor extends AbstractAWSCredentialsPr } + protected void validateHashKeyValue(final Object hashKeyValue) { + if (hashKeyValue == null || StringUtils.isBlank(hashKeyValue.toString())) { + throw new IllegalArgumentException(String.format("Hash key value is required. Provided value was '%s'", hashKeyValue)); + } + } + @OnStopped public void onStopped() { this.dynamoDB = null; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java index 0c310db..7cde727 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/wag/AbstractAWSGatewayApiProcessor.java @@ -97,7 +97,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends public AbstractAWSGatewayApiProcessor() { } - public AbstractAWSGatewayApiProcessor(AmazonHttpClient client) { + public AbstractAWSGatewayApiProcessor(final AmazonHttpClient client) { providedClient = client; } @@ -280,8 +280,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends .build(); @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor( - String propertyDescriptorName) { + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() .required(false) .name(propertyDescriptorName) @@ -318,14 +317,13 @@ public abstract class AbstractAWSGatewayApiProcessor extends } @Override - protected Collection<ValidationResult> customValidate( - final ValidationContext validationContext) { + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { final List<ValidationResult> results = new ArrayList<>(3); results.addAll(super.customValidate(validationContext)); final boolean querySet = validationContext.getProperty(PROP_QUERY_PARAMS).isSet(); if (querySet) { - String input = validationContext.getProperty(PROP_QUERY_PARAMS).getValue(); + final String input = validationContext.getProperty(PROP_QUERY_PARAMS).getValue(); // if we have expressions, we don't do further validation if (!(validationContext.isExpressionLanguageSupported(PROP_QUERY_PARAMS.getName()) && validationContext.isExpressionLanguagePresent(input))) { @@ -350,15 +348,15 @@ public abstract class AbstractAWSGatewayApiProcessor extends } } } - String method = trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue()) + final String method = trimToEmpty(validationContext.getProperty(PROP_METHOD).getValue()) .toUpperCase(); // if there are expressions do not validate if (!(validationContext.isExpressionLanguageSupported(PROP_METHOD.getName()) && validationContext.isExpressionLanguagePresent(method))) { try { - HttpMethodName methodName = HttpMethodName.fromValue(method); - } catch (IllegalArgumentException e) { + HttpMethodName.fromValue(method); + } catch (final IllegalArgumentException e) { results.add(new ValidationResult.Builder().subject(PROP_METHOD.getName()).input(method) .explanation("Unsupported METHOD") .valid(false).build()); @@ -369,9 +367,9 @@ public abstract class AbstractAWSGatewayApiProcessor extends } @Override - protected GenericApiGatewayClient createClient(ProcessContext context, - AWSCredentialsProvider awsCredentialsProvider, - ClientConfiguration clientConfiguration) { + protected GenericApiGatewayClient createClient(final ProcessContext context, + final AWSCredentialsProvider awsCredentialsProvider, + final ClientConfiguration clientConfiguration) { GenericApiGatewayClientBuilder builder = new GenericApiGatewayClientBuilder() .withCredentials(awsCredentialsProvider).withClientConfiguration(clientConfiguration) @@ -392,33 +390,34 @@ public abstract class AbstractAWSGatewayApiProcessor extends protected GenericApiGatewayClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration clientConfiguration) { - return createClient(context, new AWSStaticCredentialsProvider(credentials), - clientConfiguration); + return createClient(context, new AWSStaticCredentialsProvider(credentials), clientConfiguration); } protected GenericApiGatewayRequest configureRequest(final ProcessContext context, final ProcessSession session, final String resourcePath, - final FlowFile requestFlowFile) { - String method = trimToEmpty( + final FlowFile requestFlowFile, + final Map<String, String> attributes) { + final String method = trimToEmpty( context.getProperty(PROP_METHOD).evaluateAttributeExpressions(requestFlowFile) .getValue()).toUpperCase(); - HttpMethodName methodName = HttpMethodName.fromValue(method); - return configureRequest(context, session, resourcePath,requestFlowFile, methodName); + final HttpMethodName methodName = HttpMethodName.fromValue(method); + return configureRequest(context, session, resourcePath,requestFlowFile, methodName, attributes); } protected GenericApiGatewayRequest configureRequest(final ProcessContext context, final ProcessSession session, final String resourcePath, final FlowFile requestFlowFile, - final HttpMethodName methodName) { + final HttpMethodName methodName, + final Map<String, String> attributes) { GenericApiGatewayRequestBuilder builder = new GenericApiGatewayRequestBuilder() .withResourcePath(resourcePath); final Map<String, List<String>> parameters = getParameters(context); builder = builder.withParameters(parameters); - InputStream requestBody = null; + InputStream requestBody; switch (methodName) { case GET: builder = builder.withHttpMethod(HttpMethodName.GET); @@ -447,7 +446,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends break; } - builder = setHeaderProperties(context, builder, methodName, requestFlowFile); + builder = setHeaderProperties(context, builder, methodName, attributes); return builder.build(); } @@ -456,7 +455,7 @@ public abstract class AbstractAWSGatewayApiProcessor extends final FlowFile requestFlowFile) { if (context.getProperty(PROP_SEND_BODY).asBoolean() && requestFlowFile != null) { - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); session.exportTo(requestFlowFile, outputStream); return new ByteArrayInputStream(outputStream.toByteArray()); @@ -467,24 +466,22 @@ public abstract class AbstractAWSGatewayApiProcessor extends protected GenericApiGatewayRequestBuilder setHeaderProperties(final ProcessContext context, GenericApiGatewayRequestBuilder requestBuilder, - HttpMethodName methodName, - final FlowFile requestFlowFile) { + final HttpMethodName methodName, + final Map<String, String> requestAttributes) { - Map<String, String> headers = new HashMap<>(); - for (String headerKey : dynamicPropertyNames) { - String headerValue = context.getProperty(headerKey) - .evaluateAttributeExpressions(requestFlowFile).getValue(); + final Map<String, String> headers = new HashMap<>(); + for (final String headerKey : dynamicPropertyNames) { + final String headerValue = context.getProperty(headerKey).evaluateAttributeExpressions(requestAttributes).getValue(); headers.put(headerKey, headerValue); } // iterate through the flowfile attributes, adding any attribute that // matches the attributes-to-send pattern. if the pattern is not set // (it's an optional property), ignore that attribute entirely - if (regexAttributesToSend != null && requestFlowFile != null) { - Map<String, String> attributes = requestFlowFile.getAttributes(); - Matcher m = regexAttributesToSend.matcher(""); - for (Map.Entry<String, String> entry : attributes.entrySet()) { - String headerKey = trimToEmpty(entry.getKey()); + if (regexAttributesToSend != null) { + final Matcher m = regexAttributesToSend.matcher(""); + for (final Map.Entry<String, String> entry : requestAttributes.entrySet()) { + final String headerKey = trimToEmpty(entry.getKey()); // don't include any of the ignored attributes if (IGNORED_ATTRIBUTES.contains(headerKey)) { @@ -502,8 +499,8 @@ public abstract class AbstractAWSGatewayApiProcessor extends } String contentType = context.getProperty(PROP_CONTENT_TYPE) - .evaluateAttributeExpressions(requestFlowFile).getValue(); - boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean(); + .evaluateAttributeExpressions(requestAttributes).getValue(); + final boolean sendBody = context.getProperty(PROP_SEND_BODY).asBoolean(); contentType = StringUtils.isBlank(contentType) ? DEFAULT_CONTENT_TYPE : contentType; if (methodName == HttpMethodName.PUT || methodName == HttpMethodName.POST || methodName == HttpMethodName.PATCH) { @@ -527,23 +524,23 @@ public abstract class AbstractAWSGatewayApiProcessor extends * @param context ProcessContext * @return Map of names and values */ - protected Map<String, List<String>> getParameters(ProcessContext context) { + protected Map<String, List<String>> getParameters(final ProcessContext context) { if (!context.getProperty(PROP_QUERY_PARAMS).isSet()) { return new HashMap<>(); } final String queryString = context.getProperty(PROP_QUERY_PARAMS) .evaluateAttributeExpressions().getValue(); - List<NameValuePair> params = URLEncodedUtils + final List<NameValuePair> params = URLEncodedUtils .parse(queryString, Charsets.toCharset("UTF-8")); if (params.isEmpty()) { return new HashMap<>(); } - Map<String, List<String>> map = new HashMap<>(); + final Map<String, List<String>> map = new HashMap<>(); - for (NameValuePair nvp : params) { + for (final NameValuePair nvp : params) { if (!map.containsKey(nvp.getName())) { map.put(nvp.getName(), new ArrayList<>()); } @@ -555,19 +552,17 @@ public abstract class AbstractAWSGatewayApiProcessor extends /** * Returns a Map of flowfile attributes from the response http headers. Multivalue headers are naively converted to comma separated strings. */ - protected Map<String, String> convertAttributesFromHeaders( - GenericApiGatewayResponse responseHttp) { + protected Map<String, String> convertAttributesFromHeaders(final GenericApiGatewayResponse responseHttp) { // create a new hashmap to store the values from the connection - Map<String, String> map = new HashMap<>(); + final Map<String, String> map = new HashMap<>(); responseHttp.getHttpResponse().getHeaders().entrySet().forEach((entry) -> { - String key = entry.getKey(); - String value = entry.getValue(); + final String key = entry.getKey(); + final String value = entry.getValue(); if (key == null) { return; } - // we ignore any headers with no actual values (rare) if (StringUtils.isBlank(value)) { return; @@ -590,8 +585,8 @@ public abstract class AbstractAWSGatewayApiProcessor extends throw new IllegalStateException("Unknown relationship " + name); } - protected void route(FlowFile request, FlowFile response, ProcessSession session, - ProcessContext context, int statusCode, Set<Relationship> relationships) { + protected void route(FlowFile request, final FlowFile response, final ProcessSession session, + final ProcessContext context, final int statusCode, final Set<Relationship> relationships) { // check if we should yield the processor if (!isSuccess(statusCode) && request == null) { context.yield(); @@ -609,12 +604,10 @@ public abstract class AbstractAWSGatewayApiProcessor extends if (isSuccess(statusCode)) { // we have two flowfiles to transfer if (request != null) { - session - .transfer(request, getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships)); + session.transfer(request, getRelationshipForName(REL_SUCCESS_REQ_NAME, relationships)); } if (response != null && !responseSent) { - session - .transfer(response, getRelationshipForName(REL_RESPONSE_NAME, relationships)); + session.transfer(response, getRelationshipForName(REL_RESPONSE_NAME, relationships)); } // 5xx -> RETRY @@ -636,20 +629,20 @@ public abstract class AbstractAWSGatewayApiProcessor extends } - protected boolean isSuccess(int statusCode) { + protected boolean isSuccess(final int statusCode) { return statusCode / 100 == 2; } - protected void logRequest(ComponentLog logger, URI endpoint, GenericApiGatewayRequest request) { + protected void logRequest(final ComponentLog logger, final URI endpoint, final GenericApiGatewayRequest request) { try { logger.debug("\nRequest to remote service:\n\t{}\t{}\t\n{}", new Object[]{endpoint.toURL().toExternalForm(), request.getHttpMethod(), getLogString(request.getHeaders())}); - } catch (MalformedURLException e) { + } catch (final MalformedURLException e) { logger.debug(e.getMessage()); } } - protected void logResponse(ComponentLog logger, GenericApiGatewayResponse response) { + protected void logResponse(final ComponentLog logger, final GenericApiGatewayResponse response) { try { logger.debug("\nResponse from remote service:\n\t{}\n{}", new Object[]{response.getHttpResponse().getHttpRequest().getURI().toURL().toExternalForm(), getLogString(response.getHttpResponse().getHeaders())}); @@ -658,9 +651,9 @@ public abstract class AbstractAWSGatewayApiProcessor extends } } - protected String getLogString(Map<String, String> map) { - StringBuilder sb = new StringBuilder(); - if(map != null && map.size() > 0) { + protected String getLogString(final Map<String, String> map) { + final StringBuilder sb = new StringBuilder(); + if (map != null && map.size() > 0) { for (Map.Entry<String, String> entry : map.entrySet()) { String value = entry.getValue(); sb.append("\t"); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java index 3c9f73b..34f712d 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/DeleteDynamoDB.java @@ -101,8 +101,8 @@ public class DeleteDynamoDB extends AbstractWriteDynamoDBProcessor { TableWriteItems tableWriteItems = new TableWriteItems(table); for (FlowFile flowFile : flowFiles) { - final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile); - final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile); + final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes()); + final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes()); if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) { continue; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java index 328dae6..14626ca 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDB.java @@ -16,40 +16,44 @@ */ package org.apache.nifi.processors.aws.dynamodb; -import java.io.ByteArrayInputStream; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes; +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.ReadsAttribute; import org.apache.nifi.annotation.behavior.ReadsAttributes; -import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import com.amazonaws.AmazonClientException; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome; -import com.amazonaws.services.dynamodbv2.document.DynamoDB; -import com.amazonaws.services.dynamodbv2.document.Item; -import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes; +import java.io.ByteArrayInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; @SupportsBatching @SeeAlso({DeleteDynamoDB.class, PutDynamoDB.class}) @@ -100,42 +104,97 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor { } @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) { - List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()); - if (flowFiles == null || flowFiles.size() == 0) { - return; + public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) { + final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes)); + + final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue(); + final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue(); + + TableKeysAndAttributes tableKeysAndAttributes; + + try { + tableKeysAndAttributes = getTableKeysAndAttributes(context, attributes); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.SUCCESSFUL) + .verificationStepName("Configure DynamoDB BatchGetItems Request") + .explanation(String.format("Successfully configured BatchGetItems Request")) + .build()); + } catch (final IllegalArgumentException e) { + verificationLogger.error("Failed to configured BatchGetItems Request", e); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.FAILED) + .verificationStepName("Configure DynamoDB BatchGetItems Request") + .explanation(String.format("Failed to configured BatchGetItems Request: " + e.getMessage())) + .build()); + return results; } - Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>(); + if (tableKeysAndAttributes.getPrimaryKeys() == null || tableKeysAndAttributes.getPrimaryKeys().isEmpty()) { + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.SKIPPED) + .verificationStepName("Get DynamoDB Items") + .explanation(String.format("Skipped getting DynamoDB items because no primary keys would be included in retrieval")) + .build()); + } else { + try { + final DynamoDB dynamoDB = getDynamoDB(getConfiguration(context).getClient()); + int totalCount = 0; + int jsonDocumentCount = 0; - final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue(); - TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table); + BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes); - final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue(); - final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue(); - final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue(); + // Handle processed items and get the json document + final List<Item> items = result.getTableItems().get(table); + for (final Item item : items) { + totalCount++; + if (item.get(jsonDocument) != null) { + jsonDocumentCount++; + } + } - for (FlowFile flowFile : flowFiles) { - final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile); - final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.SUCCESSFUL) + .verificationStepName("Get DynamoDB Items") + .explanation(String.format("Successfully retrieved %s items, including %s JSON documents, from DynamoDB", totalCount, jsonDocumentCount)) + .build()); - if ( ! isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) { - continue; - } + } catch (final Exception e) { + verificationLogger.error("Failed to retrieve items from DynamoDB", e); - if ( ! isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile) ) { - continue; + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.FAILED) + .verificationStepName("Get DynamoDB Items") + .explanation(String.format("Failed to retrieve items from DynamoDB: %s", e.getMessage())) + .build()); } + } - keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile); + return results; + } - if ( rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString()) ) { - tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, hashKeyValue); - } else { - tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, hashKeyValue, rangeKeyName, rangeKeyValue); - } + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + final List<FlowFile> flowFiles = session.get(context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger()); + if (flowFiles == null || flowFiles.size() == 0) { + return; + } + + final Map<ItemKeys,FlowFile> keysToFlowFileMap = getKeysToFlowFileMap(context, session, flowFiles); + + final TableKeysAndAttributes tableKeysAndAttributes; + try { + tableKeysAndAttributes = getTableKeysAndAttributes(context, flowFiles.stream() + .map(FlowFile::getAttributes).collect(Collectors.toList()).toArray(new Map[0])); + } catch (final IllegalArgumentException e) { + getLogger().error(e.getMessage(), e); + return; } + final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue(); + final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue(); + final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue(); + final String jsonDocument = context.getProperty(JSON_DOCUMENT).evaluateAttributeExpressions().getValue(); + if (keysToFlowFileMap.isEmpty()) { return; } @@ -146,12 +205,12 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor { BatchGetItemOutcome result = dynamoDB.batchGetItem(tableKeysAndAttributes); // Handle processed items and get the json document - List<Item> items = result.getTableItems().get(table); - for (Item item : items) { - ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName)); + final List<Item> items = result.getTableItems().get(table); + for (final Item item : items) { + final ItemKeys itemKeys = new ItemKeys(item.get(hashKeyName), item.get(rangeKeyName)); FlowFile flowFile = keysToFlowFileMap.get(itemKeys); - if ( item.get(jsonDocument) != null ) { + if (item.get(jsonDocument) != null) { ByteArrayInputStream bais = new ByteArrayInputStream(item.getJSON(jsonDocument).getBytes()); flowFile = session.importFrom(bais, flowFile); } @@ -161,38 +220,85 @@ public class GetDynamoDB extends AbstractDynamoDBProcessor { } // Handle unprocessed keys - Map<String, KeysAndAttributes> unprocessedKeys = result.getUnprocessedKeys(); + final Map<String, KeysAndAttributes> unprocessedKeys = result.getUnprocessedKeys(); if ( unprocessedKeys != null && unprocessedKeys.size() > 0) { - KeysAndAttributes keysAndAttributes = unprocessedKeys.get(table); - List<Map<String, AttributeValue>> keys = keysAndAttributes.getKeys(); + final KeysAndAttributes keysAndAttributes = unprocessedKeys.get(table); + final List<Map<String, AttributeValue>> keys = keysAndAttributes.getKeys(); - for (Map<String,AttributeValue> unprocessedKey : keys) { - Object hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName)); - Object rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName)); + for (final Map<String,AttributeValue> unprocessedKey : keys) { + final Object hashKeyValue = getAttributeValue(context, HASH_KEY_VALUE_TYPE, unprocessedKey.get(hashKeyName)); + final Object rangeKeyValue = getAttributeValue(context, RANGE_KEY_VALUE_TYPE, unprocessedKey.get(rangeKeyName)); sendUnprocessedToUnprocessedRelationship(session, keysToFlowFileMap, hashKeyValue, rangeKeyValue); } } // Handle any remaining items - for (ItemKeys key : keysToFlowFileMap.keySet()) { + for (final ItemKeys key : keysToFlowFileMap.keySet()) { FlowFile flowFile = keysToFlowFileMap.get(key); flowFile = session.putAttribute(flowFile, DYNAMODB_KEY_ERROR_NOT_FOUND, DYNAMODB_KEY_ERROR_NOT_FOUND_MESSAGE + key.toString() ); session.transfer(flowFile,REL_NOT_FOUND); keysToFlowFileMap.remove(key); } - } catch(AmazonServiceException exception) { + } catch(final AmazonServiceException exception) { getLogger().error("Could not process flowFiles due to service exception : " + exception.getMessage()); List<FlowFile> failedFlowFiles = processServiceException(session, flowFiles, exception); session.transfer(failedFlowFiles, REL_FAILURE); - } catch(AmazonClientException exception) { + } catch(final AmazonClientException exception) { getLogger().error("Could not process flowFiles due to client exception : " + exception.getMessage()); List<FlowFile> failedFlowFiles = processClientException(session, flowFiles, exception); session.transfer(failedFlowFiles, REL_FAILURE); - } catch(Exception exception) { + } catch(final Exception exception) { getLogger().error("Could not process flowFiles due to exception : " + exception.getMessage()); List<FlowFile> failedFlowFiles = processException(session, flowFiles, exception); session.transfer(failedFlowFiles, REL_FAILURE); } } + + private Map<ItemKeys, FlowFile> getKeysToFlowFileMap(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) { + final Map<ItemKeys,FlowFile> keysToFlowFileMap = new HashMap<>(); + + final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue(); + final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue(); + + for (final FlowFile flowFile : flowFiles) { + final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes()); + final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes()); + + if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) { + continue; + } + + if (!isRangeKeyValueConsistent(rangeKeyName, rangeKeyValue, session, flowFile)) { + continue; + } + + keysToFlowFileMap.put(new ItemKeys(hashKeyValue, rangeKeyValue), flowFile); + } + return keysToFlowFileMap; + } + + private TableKeysAndAttributes getTableKeysAndAttributes(final ProcessContext context, final Map<String, String>... attributes) { + final String table = context.getProperty(TABLE).evaluateAttributeExpressions().getValue(); + final TableKeysAndAttributes tableKeysAndAttributes = new TableKeysAndAttributes(table); + + final String hashKeyName = context.getProperty(HASH_KEY_NAME).evaluateAttributeExpressions().getValue(); + final String rangeKeyName = context.getProperty(RANGE_KEY_NAME).evaluateAttributeExpressions().getValue(); + + for (final Map<String, String> attributeMap : attributes) { + final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, attributeMap); + final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, attributeMap); + + validateHashKeyValue(hashKeyValue); + validateRangeKeyValue(rangeKeyName, rangeKeyValue); + + if (rangeKeyValue == null || StringUtils.isBlank(rangeKeyValue.toString())) { + tableKeysAndAttributes.addHashOnlyPrimaryKey(hashKeyName, hashKeyValue); + } else { + tableKeysAndAttributes.addHashAndRangePrimaryKey(hashKeyName, hashKeyValue, rangeKeyName, rangeKeyValue); + } + } + return tableKeysAndAttributes; + } + } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java index 92e552a..23b8bab 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/dynamodb/PutDynamoDB.java @@ -118,8 +118,8 @@ public class PutDynamoDB extends AbstractWriteDynamoDBProcessor { TableWriteItems tableWriteItems = new TableWriteItems(table); for (FlowFile flowFile : flowFiles) { - final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile); - final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile); + final Object hashKeyValue = getValue(context, HASH_KEY_VALUE_TYPE, HASH_KEY_VALUE, flowFile.getAttributes()); + final Object rangeKeyValue = getValue(context, RANGE_KEY_VALUE_TYPE, RANGE_KEY_VALUE, flowFile.getAttributes()); if (!isHashKeyValueConsistent(hashKeyName, hashKeyValue, session, flowFile)) { continue; diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java index 719f1cf..b9de5fe 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java @@ -16,16 +16,12 @@ */ package org.apache.nifi.processors.aws.s3; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; - +import com.amazonaws.AmazonClientException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.SSEAlgorithm; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -37,23 +33,30 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.SeeAlso; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.util.StandardValidators; -import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; @SupportsBatching @SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class}) @@ -149,6 +152,36 @@ public class FetchS3Object extends AbstractS3Processor { } @Override + public List<ConfigVerificationResult> verify(ProcessContext context, ComponentLog verificationLogger, Map<String, String> attributes) { + final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes)); + + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue(); + + final AmazonS3 client = getConfiguration(context).getClient(); + final GetObjectMetadataRequest request = createGetObjectMetadataRequest(context, attributes); + + try { + final ObjectMetadata objectMetadata = client.getObjectMetadata(request); + final long byteCount = objectMetadata.getContentLength(); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("HEAD S3 Object") + .outcome(Outcome.SUCCESSFUL) + .explanation(String.format("Successfully performed HEAD on [%s] (%s bytes) from Bucket [%s]", key, byteCount, bucket)) + .build()); + } catch (final Exception e) { + getLogger().error(String.format("Failed to fetch [%s] from Bucket [%s]", key, bucket), e); + results.add(new ConfigVerificationResult.Builder() + .verificationStepName("HEAD S3 Object") + .outcome(Outcome.FAILED) + .explanation(String.format("Failed to perform HEAD on [%s] from Bucket [%s]: %s", key, bucket, e.getMessage())) + .build()); + } + + return results; + } + + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) { FlowFile flowFile = session.get(); if (flowFile == null) { @@ -156,48 +189,18 @@ public class FetchS3Object extends AbstractS3Processor { } final long startNanos = System.nanoTime(); - final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); - final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); - final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(flowFile).getValue(); - final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean(); - final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : 0L); - final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B).longValue() : null); - - final AmazonS3 client = getClient(); - final GetObjectRequest request; - if (versionId == null) { - request = new GetObjectRequest(bucket, key); - } else { - request = new GetObjectRequest(bucket, key, versionId); - } - request.setRequesterPays(requesterPays); - - // tl;dr don't setRange(0) on GetObjectRequest because it results in - // InvalidRange errors on zero byte objects. - // - // Amazon S3 sets byte ranges using HTTP Range headers as described in - // https://datatracker.ietf.org/doc/html/rfc2616#section-14.35 and - // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1. There - // isn't a satisfiable byte range specification for zero length objects - // so 416 (Request range not satisfiable) is returned. - // - // Since the effect of the byte range 0- is equivalent to not sending a - // byte range and works for both zero and non-zero length objects, - // the single argument setRange() only needs to be called when the - // first byte position is greater than zero. - if (rangeLength != null) { - request.setRange(rangeStart, rangeStart + rangeLength - 1); - } else if (rangeStart > 0) { - request.setRange(rangeStart); - } final Map<String, String> attributes = new HashMap<>(); - AmazonS3EncryptionService encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class); + final AmazonS3EncryptionService encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class); if (encryptionService != null) { - encryptionService.configureGetObjectRequest(request, new ObjectMetadata()); attributes.put("s3.encryptionStrategy", encryptionService.getStrategyName()); } + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(flowFile).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); + + final AmazonS3 client = getClient(); + final GetObjectRequest request = createGetObjectRequest(context, flowFile.getAttributes()); try (final S3Object s3Object = client.getObject(request)) { if (s3Object == null) { @@ -272,6 +275,64 @@ public class FetchS3Object extends AbstractS3Processor { session.getProvenanceReporter().fetch(flowFile, "http://" + bucket + ".amazonaws.com/" + key, transferMillis); } + private GetObjectMetadataRequest createGetObjectMetadataRequest(final ProcessContext context, final Map<String, String> attributes) { + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue(); + final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(attributes).getValue(); + final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean(); + + final GetObjectMetadataRequest request; + if (versionId == null) { + request = new GetObjectMetadataRequest(bucket, key); + } else { + request = new GetObjectMetadataRequest(bucket, key, versionId); + } + request.setRequesterPays(requesterPays); + return request; + } + + private GetObjectRequest createGetObjectRequest(final ProcessContext context, final Map<String, String> attributes) { + final String bucket = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue(); + final String key = context.getProperty(KEY).evaluateAttributeExpressions(attributes).getValue(); + final String versionId = context.getProperty(VERSION_ID).evaluateAttributeExpressions(attributes).getValue(); + final boolean requesterPays = context.getProperty(REQUESTER_PAYS).asBoolean(); + final long rangeStart = (context.getProperty(RANGE_START).isSet() ? context.getProperty(RANGE_START).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : 0L); + final Long rangeLength = (context.getProperty(RANGE_LENGTH).isSet() ? context.getProperty(RANGE_LENGTH).evaluateAttributeExpressions(attributes).asDataSize(DataUnit.B).longValue() : null); + + final GetObjectRequest request; + if (versionId == null) { + request = new GetObjectRequest(bucket, key); + } else { + request = new GetObjectRequest(bucket, key, versionId); + } + request.setRequesterPays(requesterPays); + + // tl;dr don't setRange(0) on GetObjectRequest because it results in + // InvalidRange errors on zero byte objects. + // + // Amazon S3 sets byte ranges using HTTP Range headers as described in + // https://datatracker.ietf.org/doc/html/rfc2616#section-14.35 and + // https://datatracker.ietf.org/doc/html/rfc7233#section-2.1. There + // isn't a satisfiable byte range specification for zero length objects + // so 416 (Request range not satisfiable) is returned. + // + // Since the effect of the byte range 0- is equivalent to not sending a + // byte range and works for both zero and non-zero length objects, + // the single argument setRange() only needs to be called when the + // first byte position is greater than zero. + if (rangeLength != null) { + request.setRange(rangeStart, rangeStart + rangeLength - 1); + } else if (rangeStart > 0) { + request.setRange(rangeStart); + } + + final AmazonS3EncryptionService encryptionService = context.getProperty(ENCRYPTION_SERVICE).asControllerService(AmazonS3EncryptionService.class); + if (encryptionService != null) { + encryptionService.configureGetObjectRequest(request, new ObjectMetadata()); + } + return request; + } + protected void setFilePathAttributes(Map<String, String> attributes, String filePathName) { final int lastSlash = filePathName.lastIndexOf("/"); if (lastSlash > -1 && lastSlash < filePathName.length() - 1) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java index 5446650..5ee1a83 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/ListS3.java @@ -53,7 +53,6 @@ import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; import org.apache.nifi.components.state.Scope; import org.apache.nifi.components.state.StateMap; -import org.apache.nifi.controller.ControllerService; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -901,14 +900,10 @@ public class ListS3 extends AbstractS3Processor implements VerifiableProcessor { @Override public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog logger, final Map<String, String> attributes) { - final ControllerService service = context.getProperty(AWS_CREDENTIALS_PROVIDER_SERVICE).asControllerService(); - final AmazonS3Client client = service != null ? createClient(context, getCredentialsProvider(context), createConfiguration(context)) - : createClient(context, getCredentials(context), createConfiguration(context)); + final AmazonS3Client client = getConfiguration(context).getClient(); - getRegionAndInitializeEndpoint(context, client); - - final List<ConfigVerificationResult> results = new ArrayList<>(); - final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions().getValue(); + final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, logger, attributes)); + final String bucketName = context.getProperty(BUCKET).evaluateAttributeExpressions(attributes).getValue(); final long minAgeMilliseconds = context.getProperty(MIN_AGE).asTimePeriod(TimeUnit.MILLISECONDS); if (bucketName == null || bucketName.trim().isEmpty()) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java index d138dcc..8c6fc77 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/wag/InvokeAWSGatewayApi.java @@ -26,6 +26,8 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; @@ -41,6 +43,7 @@ import org.apache.nifi.processors.aws.wag.client.GenericApiGatewayResponse; import org.apache.nifi.stream.io.StreamUtils; import java.io.ByteArrayInputStream; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -70,6 +73,8 @@ import java.util.concurrent.TimeUnit; + "of the Dynamic Property.") public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { + private static final Set<String> IDEMPOTENT_METHODS = new HashSet<>(Arrays.asList("GET", "HEAD", "OPTIONS")); + public static final List<PropertyDescriptor> properties = Collections.unmodifiableList(Arrays .asList( PROP_METHOD, @@ -151,8 +156,8 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { } @Override - public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { - ComponentLog logger = getLogger(); + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final ComponentLog logger = getLogger(); FlowFile requestFlowFile = session.get(); // Checking to see if the property to put the body of the response in an attribute was set @@ -179,41 +184,20 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { final GenericApiGatewayClient client = getClient(); - final GenericApiGatewayRequest request = configureRequest(context, session, - resourceName, - requestFlowFile); - - logRequest(logger, client.getEndpoint(), request); final long startNanos = System.nanoTime(); - GenericApiGatewayResponse response = null; - GenericApiGatewayException exception = null; - try { - response = client.execute(request); - logResponse(logger, response); - } catch (GenericApiGatewayException gag) { - // ERROR response codes may come back as exceptions, 404 for example - exception = gag; - } + final Map<String, String> attributes = requestFlowFile == null ? Collections.emptyMap() : requestFlowFile.getAttributes(); + final GatewayResponse gatewayResponse = invokeGateway(client, context, session, requestFlowFile, attributes, logger); - final int statusCode; - if (exception != null) { - statusCode = exception.getStatusCode(); - } else { - statusCode = response.getHttpResponse().getStatusCode(); - } + final GenericApiGatewayResponse response = gatewayResponse.response; + final GenericApiGatewayException exception = gatewayResponse.exception; + final int statusCode = gatewayResponse.statusCode; - if (statusCode == 0) { - throw new IllegalStateException( - "Status code unknown, connection hasn't been attempted."); - } final String endpoint = context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue(); - boolean outputRegardless = context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS) + final boolean outputRegardless = context.getProperty(PROP_OUTPUT_RESPONSE_REGARDLESS) .asBoolean(); - boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute - || outputRegardless); - boolean outputBodyToRequestAttribute = - (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; + boolean outputBodyToResponseContent = (isSuccess(statusCode) && !putToAttribute || outputRegardless); + boolean outputBodyToRequestAttribute = (!isSuccess(statusCode) || putToAttribute) && requestFlowFile != null; boolean bodyExists = response != null && response.getBody() != null; final String statusExplanation; @@ -246,9 +230,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { if (context.getProperty(PROP_ADD_HEADERS_TO_REQUEST).asBoolean()) { // write the response headers as attributes // this will overwrite any existing flowfile attributes - requestFlowFile = session.putAllAttributes(requestFlowFile, - convertAttributesFromHeaders( - response)); + requestFlowFile = session.putAllAttributes(requestFlowFile, convertAttributesFromHeaders(response)); } } else { responseFlowFile = session.create(); @@ -282,8 +264,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { responseFlowFile); // emit provenance event - final long millis = TimeUnit.NANOSECONDS - .toMillis(System.nanoTime() - startNanos); + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); if (requestFlowFile != null) { session.getProvenanceReporter().fetch(responseFlowFile, endpoint, millis); } else { @@ -317,7 +298,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { if (attributeKey == null) { attributeKey = RESPONSE_BODY; } - byte[] outputBuffer; + final byte[] outputBuffer; int size = 0; outputBuffer = new byte[maxAttributeSize]; if (bodyExists) { @@ -340,17 +321,13 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { requestFlowFile = session.putAllAttributes(requestFlowFile, statusAttributes); final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); - session.getProvenanceReporter().modifyAttributes(requestFlowFile, - "The " + attributeKey - + " has been added. The value of which is the body of a http call to " - + endpoint + resourceName - + ". It took " + millis - + "millis,"); + session.getProvenanceReporter().modifyAttributes(requestFlowFile, String + .format("The %s has been added. The value of which is the body of a http call to %s%s. It took %s millis,", attributeKey, endpoint, resourceName, millis)); } route(requestFlowFile, responseFlowFile, session, context, statusCode, getRelationships()); - } catch (Exception e) { + } catch (final Exception e) { // penalize or yield if (requestFlowFile != null) { logger.error("Routing to {} due to exception: {}", @@ -364,8 +341,7 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { session.transfer(requestFlowFile, getRelationshipForName(REL_FAILURE_NAME, getRelationships())); } else { - logger.error( - "Yielding processor due to exception encountered as a source processor: {}", e); + logger.error("Yielding processor due to exception encountered as a source processor: {}", e); context.yield(); } @@ -380,4 +356,92 @@ public class InvokeAWSGatewayApi extends AbstractAWSGatewayApiProcessor { } } } + + @Override + public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) { + final List<ConfigVerificationResult> results = new ArrayList<>(super.verify(context, verificationLogger, attributes)); + + final String method = context.getProperty(PROP_METHOD).getValue(); + + if (!IDEMPOTENT_METHODS.contains(method)) { + return results; + } + + final String endpoint = context.getProperty(PROP_AWS_GATEWAY_API_ENDPOINT).getValue(); + final String resource = context.getProperty(PROP_RESOURCE_NAME).getValue(); + try { + final GenericApiGatewayClient client = getConfiguration(context).getClient(); + + final GatewayResponse gatewayResponse = invokeGateway(client, context, null, null, attributes, verificationLogger); + + final String explanation; + if (gatewayResponse.exception != null) { + final String statusExplanation = EnglishReasonPhraseCatalog.INSTANCE.getReason(gatewayResponse.statusCode, null); + explanation = String.format("Successfully invoked AWS Gateway API [%s %s/%s] with blank request body, receiving error response [%s] with status code [%s]", + method, endpoint, resource, statusExplanation, gatewayResponse.statusCode); + } else { + final String statusExplanation = gatewayResponse.response.getHttpResponse().getStatusText(); + explanation = String.format("Successfully invoked AWS Gateway API [%s %s%/s] with blank request body, receiving success response [%s] with status code [%s]", + method, endpoint, resource, statusExplanation, gatewayResponse.statusCode); + } + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.SUCCESSFUL) + .verificationStepName("Invoke AWS Gateway API") + .explanation(explanation) + .build()); + + } catch (final Exception e) { + verificationLogger.error("Failed to invoke AWS Gateway API " + endpoint, e); + results.add(new ConfigVerificationResult.Builder() + .outcome(Outcome.FAILED) + .verificationStepName("Invoke AWS Gateway API") + .explanation(String.format("Failed to invoke AWS Gateway API [%s %s/%s]: %s", method, endpoint, resource, e.getMessage())) + .build()); + } + + return results; + } + + private GatewayResponse invokeGateway(final GenericApiGatewayClient client, final ProcessContext context, final ProcessSession session, + final FlowFile requestFlowFile, final Map<String, String> attributes, final ComponentLog logger) { + final String resourceName = context.getProperty(PROP_RESOURCE_NAME).getValue(); + + final GenericApiGatewayRequest request = configureRequest(context, session, resourceName, requestFlowFile, attributes); + + logRequest(logger, client.getEndpoint(), request); + GenericApiGatewayResponse response = null; + GenericApiGatewayException exception = null; + try { + response = client.execute(request); + logResponse(logger, response); + } catch (final GenericApiGatewayException gag) { + // ERROR response codes may come back as exceptions, 404 for example + exception = gag; + } + + final int statusCode; + if (exception != null) { + statusCode = exception.getStatusCode(); + } else { + statusCode = response.getHttpResponse().getStatusCode(); + } + + if (statusCode == 0) { + throw new IllegalStateException( + "Status code unknown, connection hasn't been attempted."); + } + return new GatewayResponse(response, exception, statusCode); + } + + private class GatewayResponse { + private final GenericApiGatewayResponse response; + private final GenericApiGatewayException exception; + private final int statusCode; + + private GatewayResponse(final GenericApiGatewayResponse response, final GenericApiGatewayException exception, final int statusCode) { + this.response = response; + this.exception = exception; + this.statusCode = statusCode; + } + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java index 42d56a8..c380248 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/GetDynamoDBTest.java @@ -16,35 +16,42 @@ */ package org.apache.nifi.processors.aws.dynamodb; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION; -import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.util.MockFlowFile; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Before; -import org.junit.Test; -import org.mockito.ArgumentMatchers; -import org.mockito.Mockito; - import com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.document.BatchGetItemOutcome; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.TableKeysAndAttributes; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.BatchGetItemResult; import com.amazonaws.services.dynamodbv2.model.KeysAndAttributes; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.ConfigVerificationResult.Outcome; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.REGION; +import static org.apache.nifi.processors.aws.dynamodb.ITAbstractDynamoDBTest.stringHashStringRangeTableName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class GetDynamoDBTest extends AbstractDynamoDBTest { protected GetDynamoDB getDynamoDB; @@ -79,13 +86,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { }; - getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDB; - } - }; - + getDynamoDB = mockDynamoDB(mockDynamoDB); } @Test @@ -105,6 +106,9 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + // No actual items returned + assertVerificationResults(getRunner, 0, 0); + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_UNPROCESSED, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED); @@ -144,12 +148,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { }; - getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDB; - } - }; + getDynamoDB = mockDynamoDB(mockDynamoDB); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); @@ -165,6 +164,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + assertVerificationResults(getRunner, 1, 0); + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_SUCCESS); @@ -205,12 +206,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { }; - getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDB; - } - }; + getDynamoDB = mockDynamoDB(mockDynamoDB); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); @@ -225,6 +221,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.enqueue(new byte[] {}); getRunner.run(1); + assertVerificationResults(getRunner, 1, 1); getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_SUCCESS, 1); } @@ -239,12 +236,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { }; - final GetDynamoDB getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDB; - } - }; + final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); @@ -261,6 +253,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + assertVerificationFailure(getRunner); getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); for (MockFlowFile flowFile : flowFiles) { @@ -281,12 +274,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { }; - final GetDynamoDB getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDB; - } - }; + final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); @@ -303,6 +291,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + assertVerificationFailure(getRunner); + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); for (MockFlowFile flowFile : flowFiles) { @@ -322,12 +312,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { }; - final GetDynamoDB getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDB; - } - }; + final GetDynamoDB getDynamoDB = mockDynamoDB(mockDynamoDB); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); @@ -344,6 +329,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + assertVerificationFailure(getRunner); + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); for (MockFlowFile flowFile : flowFiles) { @@ -370,12 +357,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { } }; - final GetDynamoDB getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return notFoundMockDynamoDB; - } - }; + final GetDynamoDB getDynamoDB = mockDynamoDB(notFoundMockDynamoDB); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); getRunner.setProperty(AbstractDynamoDBProcessor.ACCESS_KEY,"abcd"); @@ -391,6 +373,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + assertVerificationResults(getRunner, 0, 0); + getRunner.assertAllFlowFilesTransferred(GetDynamoDB.REL_NOT_FOUND, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_UNPROCESSED); @@ -407,12 +391,7 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { // When writing, mock thrown service exception from AWS Mockito.when(mockDynamoDb.batchGetItem(ArgumentMatchers.<TableKeysAndAttributes>any())).thenThrow(getSampleAwsServiceException()); - getDynamoDB = new GetDynamoDB() { - @Override - protected DynamoDB getDynamoDB() { - return mockDynamoDb; - } - }; + getDynamoDB = mockDynamoDB(mockDynamoDb); final TestRunner getRunner = TestRunners.newTestRunner(getDynamoDB); @@ -427,6 +406,8 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { getRunner.run(1); + assertVerificationFailure(getRunner); + getRunner.assertAllFlowFilesTransferred(AbstractDynamoDBProcessor.REL_FAILURE, 1); List<MockFlowFile> flowFiles = getRunner.getFlowFilesForRelationship(AbstractDynamoDBProcessor.REL_FAILURE); @@ -540,4 +521,41 @@ public class GetDynamoDBTest extends AbstractDynamoDBTest { assertNotNull(flowFile.getAttribute(AbstractDynamoDBProcessor.DYNAMODB_KEY_ERROR_UNPROCESSED)); } } + + private GetDynamoDB mockDynamoDB(final DynamoDB mockDynamoDB) { + return new GetDynamoDB() { + @Override + protected DynamoDB getDynamoDB() { + return mockDynamoDB; + } + @Override + protected DynamoDB getDynamoDB(final AmazonDynamoDBClient client) { + return mockDynamoDB; + } + + @Override + protected AbstractAWSProcessor<AmazonDynamoDBClient>.AWSConfiguration getConfiguration(final ProcessContext context) { + final AmazonDynamoDBClient client = Mockito.mock(AmazonDynamoDBClient.class); + return new AWSConfiguration(client, region); + } + }; + } + + private void assertVerificationFailure(final TestRunner runner) { + final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor()) + .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + assertEquals(3, results.size()); + assertEquals(Outcome.SUCCESSFUL, results.get(0).getOutcome()); + assertEquals(Outcome.SUCCESSFUL, results.get(1).getOutcome()); + assertEquals(Outcome.FAILED, results.get(2).getOutcome()); + } + + private void assertVerificationResults(final TestRunner runner, final int expectedTotalCount, final int expectedJsonDocumentCount) { + final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor()) + .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); + assertEquals(3, results.size()); + results.forEach(result -> assertEquals(Outcome.SUCCESSFUL, result.getOutcome())); + assertTrue(results.get(2).getExplanation().contains("retrieved " + expectedTotalCount + " items")); + assertTrue(results.get(2).getExplanation().contains(expectedJsonDocumentCount + " JSON")); + } } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java index bbd341c..e98fc55 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/dynamodb/ITPutGetDeleteGetDynamoDBTest.java @@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.util.Collections; import java.util.List; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -79,6 +81,11 @@ public class ITPutGetDeleteGetDynamoDBTest extends ITAbstractDynamoDBTest { assertEquals(document, new String(flowFile.toByteArray())); } + final GetDynamoDB getDynamoDB = (GetDynamoDB) getRunner.getProcessor(); + final List<ConfigVerificationResult> results = getDynamoDB.verify(getRunner.getProcessContext(), getRunner.getLogger(), Collections.emptyMap()); + assertEquals(2, results.size()); + assertEquals("Successfully retrieved 1 items, including 1 JSON documents, from DynamoDB", results.get(1).getExplanation()); + final TestRunner deleteRunner = TestRunners.newTestRunner(DeleteDynamoDB.class); deleteRunner.setProperty(DeleteDynamoDB.CREDENTIALS_FILE, CREDENTIALS_FILE); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java index 25a1e4d..71915a1 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java @@ -22,9 +22,12 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.nifi.components.ConfigVerificationResult; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.FlowFileAccessException; +import org.apache.nifi.processors.aws.AbstractAWSProcessor; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -45,6 +48,9 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestFetchS3Object { @@ -56,12 +62,17 @@ public class TestFetchS3Object { @Before public void setUp() { - mockS3Client = Mockito.mock(AmazonS3Client.class); + mockS3Client = mock(AmazonS3Client.class); mockFetchS3Object = new FetchS3Object() { protected AmazonS3Client getClient() { actualS3Client = client; return mockS3Client; } + + @Override + protected AbstractAWSProcessor<AmazonS3Client>.AWSConfiguration getConfiguration(ProcessContext context) { + return new AWSConfiguration(mockS3Client, null); + } }; runner = TestRunners.newTestRunner(mockFetchS3Object); } @@ -90,12 +101,21 @@ public class TestFetchS3Object { userMetadata.put("userKey2", "userValue2"); metadata.setUserMetadata(userMetadata); metadata.setSSEAlgorithm("testAlgorithm"); - Mockito.when(metadata.getETag()).thenReturn("test-etag"); + when(metadata.getETag()).thenReturn("test-etag"); s3ObjectResponse.setObjectMetadata(metadata); - Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + + final long mockSize = 20L; + final ObjectMetadata objectMetadata = mock(ObjectMetadata.class); + when(objectMetadata.getContentLength()).thenReturn(mockSize); + when(mockS3Client.getObjectMetadata(any())).thenReturn(objectMetadata); runner.run(1); + final List<ConfigVerificationResult> results = mockFetchS3Object.verify(runner.getProcessContext(), runner.getLogger(), attrs); + assertEquals(2, results.size()); + results.forEach(result -> assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, result.getOutcome())); + ArgumentCaptor<GetObjectRequest> captureRequest = ArgumentCaptor.forClass(GetObjectRequest.class); Mockito.verify(mockS3Client, Mockito.times(1)).getObject(captureRequest.capture()); GetObjectRequest request = captureRequest.getValue(); @@ -148,9 +168,9 @@ public class TestFetchS3Object { userMetadata.put("userKey2", "userValue2"); metadata.setUserMetadata(userMetadata); metadata.setSSEAlgorithm("testAlgorithm"); - Mockito.when(metadata.getETag()).thenReturn("test-etag"); + when(metadata.getETag()).thenReturn("test-etag"); s3ObjectResponse.setObjectMetadata(metadata); - Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); runner.run(1); @@ -196,9 +216,9 @@ public class TestFetchS3Object { s3ObjectResponse.setObjectContent(new StringInputStream("Some Content")); ObjectMetadata metadata = Mockito.spy(ObjectMetadata.class); metadata.setContentDisposition("key/path/to/file.txt"); - Mockito.when(metadata.getVersionId()).thenReturn("response-version"); + when(metadata.getVersionId()).thenReturn("response-version"); s3ObjectResponse.setObjectMetadata(metadata); - Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); + when(mockS3Client.getObject(Mockito.any())).thenReturn(s3ObjectResponse); runner.run(1); @@ -242,7 +262,7 @@ public class TestFetchS3Object { final Map<String, String> attrs = new HashMap<>(); attrs.put("filename", "request-key"); runner.enqueue(new byte[0], attrs); - Mockito.when(mockS3Client.getObject(Mockito.any())).thenReturn(null); + when(mockS3Client.getObject(Mockito.any())).thenReturn(null); runner.run(1); diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java index 2942f23..275f0cc 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestListS3.java @@ -128,7 +128,9 @@ public class TestListS3 { final List<ConfigVerificationResult> results = ((VerifiableProcessor) runner.getProcessor()) .verify(runner.getProcessContext(), runner.getLogger(), Collections.emptyMap()); - assertTrue(results.get(0).getExplanation().contains("finding 3 objects")); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(0).getOutcome()); + assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL, results.get(1).getOutcome()); + assertTrue(results.get(1).getExplanation().contains("finding 3 objects")); } @Test diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java index 1179671..76a5bf9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/wag/TestInvokeAmazonGatewayApiMock.java @@ -16,17 +16,9 @@ */ package org.apache.nifi.processors.aws.wag; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; -import static org.mockito.Mockito.times; - import com.amazonaws.ClientConfiguration; import com.amazonaws.http.AmazonHttpClient; import com.amazonaws.http.apache.client.impl.SdkHttpClient; -import java.io.ByteArrayInputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.http.HttpResponse; import org.apache.http.HttpVersion; import org.apache.http.client.methods.HttpUriRequest; @@ -43,6 +35,15 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import java.io.ByteArrayInputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.times; + public class TestInvokeAmazonGatewayApiMock { private TestRunner runner = null;