This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d49371  NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
0d49371 is described below

commit 0d49371c53e1a6da80fdb33ecfbad13eb5b8cad1
Author: Paul Grey <[email protected]>
AuthorDate: Wed Dec 15 18:54:28 2021 -0500

    NIFI-7371 Added FlowFile attributes for exceptions in S3 processors
    
    This closes #5606
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../processors/aws/s3/AbstractS3Processor.java     | 21 +++++++
 .../nifi/processors/aws/s3/DeleteS3Object.java     | 13 +++-
 .../nifi/processors/aws/s3/FetchS3Object.java      | 11 +++-
 .../apache/nifi/processors/aws/s3/PutS3Object.java |  6 ++
 .../apache/nifi/processors/aws/s3/TagS3Object.java | 10 +++-
 .../nifi/processors/aws/s3/TestFetchS3Object.java  | 70 ++++++++++++++++++++++
 6 files changed, 125 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
index 4bd0769..dc29b78 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-abstract-processors/src/main/java/org/apache/nifi/processors/aws/s3/AbstractS3Processor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.aws.s3;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
@@ -24,6 +25,7 @@ import com.amazonaws.regions.Region;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.model.AccessControlList;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.CanonicalGrantee;
 import com.amazonaws.services.s3.model.EmailAddressGrantee;
@@ -36,6 +38,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
 
@@ -315,6 +318,24 @@ public abstract class AbstractS3Processor extends AbstractAWSCredentialsProvider
         return acl;
     }
 
+    protected FlowFile extractExceptionDetails(final Exception e, final 
ProcessSession session, FlowFile flowFile) {
+        flowFile = session.putAttribute(flowFile, "s3.exception", 
e.getClass().getName());
+        if (e instanceof AmazonS3Exception) {
+            flowFile = putAttribute(session, flowFile, "s3.additionalDetails", 
((AmazonS3Exception) e).getAdditionalDetails());
+        }
+        if (e instanceof AmazonServiceException) {
+            final AmazonServiceException ase = (AmazonServiceException) e;
+            flowFile = putAttribute(session, flowFile, "s3.statusCode", 
ase.getStatusCode());
+            flowFile = putAttribute(session, flowFile, "s3.errorCode", 
ase.getErrorCode());
+            flowFile = putAttribute(session, flowFile, "s3.errorMessage", 
ase.getErrorMessage());
+        }
+        return flowFile;
+    }
+
+    private FlowFile putAttribute(final ProcessSession session, final FlowFile 
flowFile, final String key, final Object value) {
+        return (value == null) ? flowFile : session.putAttribute(flowFile, 
key, value.toString());
+    }
+
     /**
      * Create CannedAccessControlList if {@link #CANNED_ACL} property 
specified.
      *
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
index 7d2cce9..ff4249b 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/DeleteS3Object.java
@@ -29,6 +29,8 @@ import com.amazonaws.services.s3.model.DeleteVersionRequest;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -41,6 +43,12 @@ import org.apache.nifi.processor.util.StandardValidators;
 
 
 @SupportsBatching
+@WritesAttributes({
+        @WritesAttribute(attribute = "s3.exception", description = "The class 
name of the exception thrown during processor execution"),
+        @WritesAttribute(attribute = "s3.additionalDetails", description = 
"The S3 supplied detail from the failed operation"),
+        @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP 
error code (if available) from the failed operation"),
+        @WritesAttribute(attribute = "s3.errorCode", description = "The S3 
moniker of the failed operation"),
+        @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 
exception message from the failed operation")})
 @SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Delete"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -92,7 +100,8 @@ public class DeleteS3Object extends AbstractS3Processor {
                 s3.deleteVersion(r);
             }
         } catch (final AmazonServiceException ase) {
-            getLogger().error("Failed to delete S3 Object for {}; routing to 
failure", new Object[]{flowFile, ase});
+            flowFile = extractExceptionDetails(ase, session, flowFile);
+            getLogger().error("Failed to delete S3 Object for {}; routing to 
failure", flowFile, ase);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
@@ -100,6 +109,6 @@ public class DeleteS3Object extends AbstractS3Processor {
 
         session.transfer(flowFile, REL_SUCCESS);
         final long transferMillis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-        getLogger().info("Successfully delete S3 Object for {} in {} millis; 
routing to success", new Object[]{flowFile, transferMillis});
+        getLogger().info("Successfully delete S3 Object for {} in {} millis; 
routing to success", flowFile, transferMillis);
     }
 }
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
index b9de5fe..f639b54 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/FetchS3Object.java
@@ -72,6 +72,11 @@ import java.util.concurrent.TimeUnit;
     @WritesAttribute(attribute = "hash.algorithm", description = "MD5"),
     @WritesAttribute(attribute = "mime.type", description = "If S3 provides 
the content type/MIME type, this attribute will hold that file"),
     @WritesAttribute(attribute = "s3.etag", description = "The ETag that can 
be used to see if the file has changed"),
+    @WritesAttribute(attribute = "s3.exception", description = "The class name 
of the exception thrown during processor execution"),
+    @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 
supplied detail from the failed operation"),
+    @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP 
error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker 
of the failed operation"),
+    @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 
exception message from the failed operation"),
     @WritesAttribute(attribute = "s3.expirationTime", description = "If the 
file has an expiration date, this attribute will be set, containing the 
milliseconds since epoch in UTC time"),
     @WritesAttribute(attribute = "s3.expirationTimeRuleId", description = "The 
ID of the rule that dictates this object's expiration time"),
     @WritesAttribute(attribute = "s3.sseAlgorithm", description = "The server 
side encryption algorithm of the object"),
@@ -251,13 +256,15 @@ public class FetchS3Object extends AbstractS3Processor {
                 attributes.put("s3.version", metadata.getVersionId());
             }
         } catch (final IOException | AmazonClientException ioe) {
-            getLogger().error("Failed to retrieve S3 Object for {}; routing to 
failure", new Object[]{flowFile, ioe});
+            flowFile = extractExceptionDetails(ioe, session, flowFile);
+            getLogger().error("Failed to retrieve S3 Object for {}; routing to 
failure", flowFile, ioe);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
         } catch (final FlowFileAccessException ffae) {
             if (ExceptionUtils.indexOfType(ffae, AmazonClientException.class) 
!= -1) {
-                getLogger().error("Failed to retrieve S3 Object for {}; 
routing to failure", new Object[]{flowFile, ffae});
+                getLogger().error("Failed to retrieve S3 Object for {}; 
routing to failure", flowFile, ffae);
+                flowFile = extractExceptionDetails(ffae, session, flowFile);
                 flowFile = session.penalize(flowFile);
                 session.transfer(flowFile, REL_FAILURE);
                 return;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
index 0fed5ee..40f086c 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/PutS3Object.java
@@ -117,6 +117,11 @@ expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
     @WritesAttribute(attribute = "s3.key", description = "The S3 key within 
where the Object was put in S3"),
     @WritesAttribute(attribute = "s3.contenttype", description = "The S3 
content type of the S3 Object that put in S3"),
     @WritesAttribute(attribute = "s3.version", description = "The version of 
the S3 Object that was put to S3"),
+    @WritesAttribute(attribute = "s3.exception", description = "The class name 
of the exception thrown during processor execution"),
+    @WritesAttribute(attribute = "s3.additionalDetails", description = "The S3 
supplied detail from the failed operation"),
+    @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP 
error code (if available) from the failed operation"),
+    @WritesAttribute(attribute = "s3.errorCode", description = "The S3 moniker 
of the failed operation"),
+    @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 
exception message from the failed operation"),
     @WritesAttribute(attribute = "s3.etag", description = "The ETag of the S3 
Object"),
     @WritesAttribute(attribute = "s3.contentdisposition", description = "The 
content disposition of the S3 Object that put in S3"),
     @WritesAttribute(attribute = "s3.cachecontrol", description = "The 
cache-control header of the S3 Object"),
@@ -833,6 +838,7 @@ public class PutS3Object extends AbstractS3Processor {
                         new Object[]{cacheKey, e.getMessage()});
             }
         } catch (final ProcessException | AmazonClientException pe) {
+            extractExceptionDetails(pe, session, flowFile);
             if (pe.getMessage().contains(S3_PROCESS_UNSCHEDULED_MESSAGE)) {
                 getLogger().info(pe.getMessage());
                 session.rollback();
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
index 6c9d72a..2f01598 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/s3/TagS3Object.java
@@ -53,7 +53,12 @@ import java.util.stream.Collectors;
 @SupportsBatching
 @WritesAttributes({
         @WritesAttribute(attribute = "s3.tag.___", description = "The tags 
associated with the S3 object will be " +
-                "written as part of the FlowFile attributes")})
+                "written as part of the FlowFile attributes"),
+        @WritesAttribute(attribute = "s3.exception", description = "The class 
name of the exception thrown during processor execution"),
+        @WritesAttribute(attribute = "s3.additionalDetails", description = 
"The S3 supplied detail from the failed operation"),
+        @WritesAttribute(attribute = "s3.statusCode", description = "The HTTP 
error code (if available) from the failed operation"),
+        @WritesAttribute(attribute = "s3.errorCode", description = "The S3 
moniker of the failed operation"),
+        @WritesAttribute(attribute = "s3.errorMessage", description = "The S3 
exception message from the failed operation")})
 @SeeAlso({PutS3Object.class, FetchS3Object.class, ListS3.class})
 @Tags({"Amazon", "S3", "AWS", "Archive", "Tag"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
@@ -171,7 +176,8 @@ public class TagS3Object extends AbstractS3Processor {
             }
             s3.setObjectTagging(r);
         } catch (final AmazonServiceException ase) {
-            getLogger().error("Failed to tag S3 Object for {}; routing to 
failure", new Object[]{flowFile, ase});
+            flowFile = extractExceptionDetails(ase, session, flowFile);
+            getLogger().error("Failed to tag S3 Object for {}; routing to 
failure", flowFile, ase);
             flowFile = session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
             return;
diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
index 71915a1..cf005c5 100644
--- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
+++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/s3/TestFetchS3Object.java
@@ -17,11 +17,14 @@
 package org.apache.nifi.processors.aws.s3;
 
 import java.io.IOException;
+import java.net.HttpURLConnection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import com.amazonaws.SdkClientException;
+import com.google.common.collect.ImmutableMap;
 import org.apache.nifi.components.ConfigVerificationResult;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -256,6 +259,72 @@ public class TestFetchS3Object {
     }
 
     @Test
+    public void testFetchObject_FailAdditionalAttributesBucketName() {
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        final AmazonS3Exception exception = new AmazonS3Exception("The 
specified bucket does not exist");
+        exception.setAdditionalDetails(ImmutableMap.of("BucketName", 
"us-east-1", "Error", "ABC123"));
+        exception.setErrorCode("NoSuchBucket");
+        exception.setStatusCode(HttpURLConnection.HTTP_NOT_FOUND);
+        Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+        runner.run(1);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals("NoSuchBucket", flowFile.getAttribute("s3.errorCode"));
+        
assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage")));
+        assertEquals("404", flowFile.getAttribute("s3.statusCode"));
+        assertEquals(exception.getClass().getName(), 
flowFile.getAttribute("s3.exception"));
+    }
+
+    @Test
+    public void testFetchObject_FailAdditionalAttributesAuthentication() {
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        final AmazonS3Exception exception = new AmazonS3Exception("signature");
+        
exception.setAdditionalDetails(ImmutableMap.of("CanonicalRequestBytes", "AA BB 
CC DD EE FF"));
+        exception.setErrorCode("SignatureDoesNotMatch");
+        exception.setStatusCode(HttpURLConnection.HTTP_FORBIDDEN);
+        Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+        runner.run(1);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals("SignatureDoesNotMatch", 
flowFile.getAttribute("s3.errorCode"));
+        
assertTrue(exception.getMessage().startsWith(flowFile.getAttribute("s3.errorMessage")));
+        assertEquals("403", flowFile.getAttribute("s3.statusCode"));
+        assertEquals(exception.getClass().getName(), 
flowFile.getAttribute("s3.exception"));
+    }
+
+    @Test
+    public void testFetchObject_FailAdditionalAttributesNetworkFailure() {
+        runner.setProperty(FetchS3Object.REGION, "us-east-1");
+        runner.setProperty(FetchS3Object.BUCKET, "request-bucket-bad-name");
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put("filename", "request-key");
+        runner.enqueue(new byte[0], attrs);
+
+        final SdkClientException exception = new SdkClientException("message");
+        Mockito.doThrow(exception).when(mockS3Client).getObject(Mockito.any());
+        runner.run(1);
+
+        final List<MockFlowFile> flowFiles = 
runner.getFlowFilesForRelationship(FetchS3Object.REL_FAILURE);
+        assertEquals(1, flowFiles.size());
+        final MockFlowFile flowFile = flowFiles.iterator().next();
+        assertEquals(exception.getClass().getName(), 
flowFile.getAttribute("s3.exception"));
+    }
+
+    @Test
     public void testGetObjectReturnsNull() throws IOException {
         runner.setProperty(FetchS3Object.REGION, "us-east-1");
         runner.setProperty(FetchS3Object.BUCKET, "request-bucket");
@@ -284,6 +353,7 @@ public class TestFetchS3Object {
 
         runner.assertAllFlowFilesTransferred(FetchS3Object.REL_FAILURE, 1);
     }
+
     @Test
     public void testGetPropertyDescriptors() throws Exception {
         FetchS3Object processor = new FetchS3Object();

Reply via email to