This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0d49371 NIFI-7371 Added FlowFile attributes for exceptions in S3
processors
0d49371 is described below
commit 0d49371c53e1a6da80fdb33ecfbad13eb5b8cad1
Author: Paul Grey <[email protected]>
AuthorDate: Wed Dec 15 18:54:28 2021 -0500
NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
This closes #5606
Signed-off-by: David Handermann <[email protected]>
---
.../processors/aws/s3/AbstractS3Processor.java | 21 +++++++
.../nifi/processors/aws/s3/DeleteS3Object.java | 13 +++-
.../nifi/processors/aws/s3/FetchS3Object.java | 11 +++-
.../apache/nifi/processors/aws/s3/PutS3Object.java | 6 ++
.../apache/nifi/processors/aws/s3/TagS3Object.java | 10 +++-
.../nifi/processors/aws/s3/TestFetchS3Object.java | 70 ++++++++++++++++++++++
6 files changed, 125 insertions(+), 6 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 4bd0769..dc29b78 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.processors.aws.s3;
+import com.amazonaws.AmazonServiceException;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
@@ -24,6 +25,7 @@ import com.amazonaws.regions.Region;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.EmailAddressGrantee;
@@ -36,6 +38,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
@@ -315,6 +318,24 @@ public abstract class AbstractS3Processor extends
AbstractAWSCredentialsProvider
return acl;
}
+ protected FlowFile extractExceptionDetails(final Exception e, final
ProcessSession session, FlowFile flowFile) {
+ flowFile = session.putAttribute(flowFile, "s3.exception",
e.getClass().getName());
+ if (e instanceof AmazonS3Exception) {
+ flowFile = putAttribute(session, flowFile, "s3.additionalDetails",
((AmazonS3Exception) e).getAdditionalDetails());
+ }
+ if (e instanceof AmazonServiceException) {
+ final AmazonServiceException ase = (AmazonServiceException) e;
+ flowFile = putAttribute(session, flowFile, "s3.statusCode",
ase.getStatusCode());
+ flowFile = putAttribute(session, flowFile, "s3.errorCode",
ase.getErrorCode());
+ flowFile = putAttribute(session, flowFile, "s3.errorMessage",
ase.getErrorMessage());
+ }
+ return flowFile;
+ }
+
+ private FlowFile putAttribute(final ProcessSession session, final FlowFile
flowFile, final String key, final Object value) {
+ return (value == null) ? flowFile : session.putAttribute(flowFile,
key, value.toString());
+ }
+
/**
* Create CannedAccessControlList if {@link #CANNED_ACL} property
specified.
*
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7d2cce9..ff4249b 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.s3.model.DeleteVersionRequest;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@@ -41,6 +43,12 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
+@WritesAttributes({
+ @WritesAttribute(attribute = "s3.exception", description = "The class
name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description =
"The S3 supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP
error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3
error code returned by the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3
exception message from the failed operation")})
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -92,7 +100,8 @@ public class DeleteS3Object extends AbstractS3Processor {
s3.deleteVersion(r);
}
} catch (final AmazonServiceException ase) {
- getLogger().error("Failed to delete S3 Object for {}; routing to
failure", new Object[]{flowFile, ase});
+ flowFile = extractExceptionDetails(ase, session, flowFile);
+ getLogger().error("Failed to delete S3 Object for {}; routing to
failure", flowFile, ase);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
@@ -100,6 +109,6 @@ public class DeleteS3Object extends AbstractS3Processor {
session.transfer(flowFile, REL_SUCCESS);
final long transferMillis =
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
- getLogger().info("Successfully delete S3 Object for {} in {} millis;
routing to success", new Object[]{flowFile, transferMillis});
+        getLogger().info("Successfully deleted S3 Object for {} in {} millis;
routing to success", flowFile, transferMillis);
}
}
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index b9de5fe..f639b54 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -72,6 +72,11 @@ import java.util.concurrent.TimeUnit;
@WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
@WritesAttribute(attribute = "mime.type", description = "If S3 provides
the content type/MIME type, this attribute will hold that file"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag that can
be used to see if the file has changed"),
+ @WritesAttribute(attribute = "s3.exception", description = "The class name
of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3
supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP
error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3 error
code returned by the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3
exception message from the failed operation"),
@WritesAttribute(attribute = "s3.expirationTime", description = "If the
file has an expiration date, this attribute will be set, containing the
milliseconds since epoch in UTC time"),
@WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The
ID of the rule that dictates this object's expiration time"),
@WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server
side encryption algorithm of the object"),
@@ -251,13 +256,15 @@ public class FetchS3Object extends AbstractS3Processor {
attributes.put("s3.version", metadata.getVersionId());
}
} catch (final IOException | AmazonClientException ioe) {
- getLogger().error("Failed to retrieve S3 Object for {}; routing to
failure", new Object[]{flowFile, ioe});
+ flowFile = extractExceptionDetails(ioe, session, flowFile);
+ getLogger().error("Failed to retrieve S3 Object for {}; routing to
failure", flowFile, ioe);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
} catch (final FlowFileAccessException ffae) {
if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class)
!= -1) {
- getLogger().error("Failed to retrieve S3 Object for {};
routing to failure", new Object[]{flowFile, ffae});
+ getLogger().error("Failed to retrieve S3 Object for {};
routing to failure", flowFile, ffae);
+ flowFile = extractExceptionDetails(ffae, session, flowFile);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 0fed5ee..40f086c 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -117,6 +117,11 @@ expressionLanguageScope =
ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
@WritesAttribute(attribute = "s3.key", description = "The S3 key within
where the Object was put in S3"),
@WritesAttribute(attribute = "s3.contenttype", description = "The S3
content type of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.version", description = "The version of
the S3 Object that was put to S3"),
+ @WritesAttribute(attribute = "s3.exception", description = "The class name
of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3
supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP
error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3 error
code returned by the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3
exception message from the failed operation"),
@WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3
Object"),
@WritesAttribute(attribute = "s3.contentdisposition", description = "The
content disposition of the S3 Object that put in S3"),
@WritesAttribute(attribute = "s3.cachecontrol", description = "The
cache-control header of the S3 Object"),
@@ -833,6 +838,7 @@ public class PutS3Object extends AbstractS3Processor {
new Object[]{cacheKey, e.getMessage()});
}
} catch (final ProcessException | AmazonClientException pe) {
+ extractExceptionDetails(pe, session, flowFile);
if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
getLogger().info(pe.getMessage());
session.rollback();
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index 6c9d72a..2f01598 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -53,7 +53,12 @@ import java.util.stream.Collectors;
@SupportsBatching
@WritesAttributes({
@WritesAttribute(attribute = "s3.tag.___", description = "The tags
associated with the S3 object will be " +
- "written as part of the FlowFile attributes")})
+ "written as part of the FlowFile attributes"),
+ @WritesAttribute(attribute = "s3.exception", description = "The class
name of the exception thrown during processor execution"),
+ @WritesAttribute(attribute = "s3.additionalDetails", description =
"The S3 supplied detail from the failed operation"),
+ @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP
error code (if available) from the failed operation"),
+        @WritesAttribute(attribute = "s3.errorCode", description = "The S3
error code returned by the failed operation"),
+ @WritesAttribute(attribute = "s3.errorMessage", description = "The S3
exception message from the failed operation")})
@SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
@Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@@ -171,7 +176,8 @@ public class TagS3Object extends AbstractS3Processor {
}
s3.setObjectTagging(r);
} catch (final AmazonServiceException ase) {
- getLogger().error("Failed to tag S3 Object for {}; routing to
failure", new Object[]{flowFile, ase});
+ flowFile = extractExceptionDetails(ase, session, flowFile);
+ getLogger().error("Failed to tag S3 Object for {}; routing to
failure", flowFile, ase);
flowFile = session.penalize(flowFile);
session.transfer(flowFile, REL_FAILURE);
return;
diff --git
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 71915a1..cf005c5 100644
---
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -17,11 +17,14 @@
package org.apache.nifi.processors.aws.s3;
import java.io.IOException;
+import java.net.HttpURLConnection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.amazonaws.SdkClientException;
+import com.google.common.collect.ImmutableMap;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -256,6 +259,72 @@ public class TestFetchS3Object {
}
@Test
+ public void testFetchObject_FailAdditionalAttributesBucketName() {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ final AmazonS3Exception exception = new AmazonS3Exception("The
specified bucket does not exist");
+ exception.setAdditionalDetails(ImmutableMap.of("BucketName",
"us-east-1", "Error", "ABC123"));
+ exception.setErrorCode("NoSuchBucket");
+ exception.setStatusCode(HttpURLConnection.HTTP_NOT_FOUND);
+ Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+ runner.run(1);
+
+ final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.iterator().next();
+ assertEquals("NoSuchBucket", flowFile.getAttribute("s3.errorCode"));
+
assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage")));
+ assertEquals("404", flowFile.getAttribute("s3.statusCode"));
+ assertEquals(exception.getClass().getName(),
flowFile.getAttribute("s3.exception"));
+ }
+
+ @Test
+ public void testFetchObject_FailAdditionalAttributesAuthentication() {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ final AmazonS3Exception exception = new AmazonS3Exception("signature");
+
exception.setAdditionalDetails(ImmutableMap.of("CanonicalRequestBytes", "AA BB
CC DD EE FF"));
+ exception.setErrorCode("SignatureDoesNotMatch");
+ exception.setStatusCode(HttpURLConnection.HTTP_FORBIDDEN);
+ Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+ runner.run(1);
+
+ final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.iterator().next();
+ assertEquals("SignatureDoesNotMatch",
flowFile.getAttribute("s3.errorCode"));
+
assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage")));
+ assertEquals("403", flowFile.getAttribute("s3.statusCode"));
+ assertEquals(exception.getClass().getName(),
flowFile.getAttribute("s3.exception"));
+ }
+
+ @Test
+ public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
+ runner.setProperty(FetchS3Object.REGION, "us-east-1");
+ runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+ final Map<String, String> attrs = new HashMap<>();
+ attrs.put("filename", "request-key");
+ runner.enqueue(new byte[0], attrs);
+
+ final SdkClientException exception = new SdkClientException("message");
+ Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+ runner.run(1);
+
+ final List<MockFlowFile> flowFiles =
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+ assertEquals(1, flowFiles.size());
+ final MockFlowFile flowFile = flowFiles.iterator().next();
+ assertEquals(exception.getClass().getName(),
flowFile.getAttribute("s3.exception"));
+ }
+
+ @Test
public void testGetObjectReturnsNull() throws IOException {
runner.setProperty(FetchS3Object.REGION, "us-east-1");
runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
@@ -284,6 +353,7 @@ public class TestFetchS3Object {
runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
}
+
@Test
public void testGetPropertyDescriptors() throws Exception {
FetchS3Object processor = new FetchS3Object();