This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bb8ce7  NIFI-7139 Add record.error.message on failure of a record 
reader or writer
0bb8ce7 is described below

commit 0bb8ce7438d9855dcca6bf89e3a672d1f9477593
Author: Shawn Weeks <swe...@weeksconsulting.us>
AuthorDate: Thu Feb 13 08:39:49 2020 -0600

    NIFI-7139 Add record.error.message on failure of a record reader or writer
    
    Handle scenario where message might be null.
    
    Update to test case that was failing because adding attributes modified a 
flow file even if you don't change the contents.
    
    Fixed Style Issues and Updated WritesAttributes.
    
    Added Test Case for Error Message
    
    Signed-off-by: Matthew Burgess <mattyb...@apache.org>
    
    This closes #4052
---
 .../nifi/processors/standard/AbstractRecordProcessor.java     |  9 +++++++++
 .../org/apache/nifi/processors/standard/ConvertRecord.java    |  3 ++-
 .../org/apache/nifi/processors/standard/UpdateRecord.java     |  6 +++++-
 .../apache/nifi/processors/standard/TestConvertRecord.java    | 11 ++++++-----
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
index 8ccea5a..1ea70e2 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java
@@ -174,6 +174,15 @@ public abstract class AbstractRecordProcessor extends 
AbstractProcessor {
             });
         } catch (final Exception e) {
             getLogger().error("Failed to process {}; will route to failure", 
new Object[] {flowFile, e});
+            // Since we are wrapping the exceptions above there should always 
be a cause
+            // but it's possible it might not have a message. This handles 
that by logging
+            // the name of the class thrown.
+            Throwable c = e.getCause();
+            if (c != null) {
+                session.putAttribute(flowFile, "record.error.message", 
(c.getLocalizedMessage() != null) ? c.getLocalizedMessage() : 
c.getClass().getCanonicalName() + " Thrown");
+            } else {
+                session.putAttribute(flowFile, "record.error.message", 
e.getClass().getCanonicalName() + " Thrown");
+            }
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
index 1be1794..a1e6f99 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -41,7 +41,8 @@ import java.util.List;
 @Tags({"convert", "record", "generic", "schema", "json", "csv", "avro", "log", 
"logs", "freeform", "text"})
 @WritesAttributes({
     @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
-    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile"),
+    @WritesAttribute(attribute = "record.error.message", description = "This 
attribute provides on failure the error message encountered by the Reader or 
Writer.")
 })
 @CapabilityDescription("Converts records from one data format to another using 
configured Record Reader and Record Write Controller Services. "
     + "The Reader and Writer must be configured with \"matching\" schemas. By 
this, we mean the schemas must have the same field names. The types of the 
fields "
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
index 8ee5f43..65abdc9 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
@@ -23,6 +23,7 @@ import 
org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -67,7 +68,10 @@ import java.util.stream.Stream;
     + "This Processor requires that at least one user-defined Property be 
added. The name of the Property should indicate a RecordPath that determines 
the field that should "
     + "be updated. The value of the Property is either a replacement value 
(optionally making use of the Expression Language) or is itself a RecordPath 
that extracts a value from "
     + "the Record. Whether the Property value is determined to be a RecordPath 
or a literal value depends on the configuration of the <Replacement Value 
Strategy> Property.")
-@WritesAttribute(attribute = "record.index", description = "This attribute 
provides the current row index and is only available inside the literal value 
expression.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.index", description = "This attribute 
provides the current row index and is only available inside the literal value 
expression."),
+    @WritesAttribute(attribute = "record.error.message", description = "This 
attribute provides on failure the error message encountered by the Reader or 
Writer.")
+})
 @SeeAlso({ConvertRecord.class})
 public class UpdateRecord extends AbstractRecordProcessor {
     private static final String FIELD_NAME = "field.name";
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 822f664..7870a04 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -120,7 +119,7 @@ public class TestConvertRecord {
     }
 
     @Test
-    public void testReadFailure() throws InitializationException {
+    public void testReadFailure() throws InitializationException, IOException {
         final MockRecordParser readerService = new MockRecordParser(2);
         final MockRecordWriter writerService = new MockRecordWriter("header", 
false);
 
@@ -146,12 +145,13 @@ public class TestConvertRecord {
         // Original FlowFile should be routed to 'failure' relationship 
without modification
         runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
-        assertTrue(original == out);
+        out.assertContentEquals(original.toByteArray());
+        out.assertAttributeEquals("record.error.message","Intentional Unit 
Test Exception because 2 records have been read");
     }
 
 
     @Test
-    public void testWriteFailure() throws InitializationException {
+    public void testWriteFailure() throws InitializationException, IOException 
{
         final MockRecordParser readerService = new MockRecordParser();
         final MockRecordWriter writerService = new MockRecordWriter("header", 
false, 2);
 
@@ -177,7 +177,8 @@ public class TestConvertRecord {
         // Original FlowFile should be routed to 'failure' relationship 
without modification
         runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
         final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
-        assertTrue(original == out);
+        out.assertContentEquals(original.toByteArray());
+        out.assertAttributeEquals("record.error.message","Unit Test 
intentionally throwing IOException after 2 records were written");
     }
 
     @Test

Reply via email to