NIFI-738: Improve error handling for CSV and JSON conversion.

Changes:
* Send bad record information on the "incompatible" relationship
* Use an attribute, "errors" for summarized error messages
* Send no error content for now, original content can't be accessed
* Summarize similar error messages with "N similar failures"
* Add similar error handling to CSV conversion

Signed-off-by: joewitt <joew...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2e32e0e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2e32e0e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2e32e0e2

Branch: refs/heads/master
Commit: 2e32e0e2b82e071734919589ed680513edfd39ed
Parents: 4dc2ea6
Author: Ryan Blue <b...@apache.org>
Authored: Sat Jun 27 17:36:42 2015 -0700
Committer: joewitt <joew...@apache.org>
Committed: Mon Jun 29 22:00:22 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/kite/ConvertCSVToAvro.java  |  76 ++++++++++----
 .../nifi/processors/kite/ConvertJSONToAvro.java |  95 ++++++++++-------
 .../nifi/processors/kite/FailureTracker.java    |  83 +++++++++++++++
 .../processors/kite/TestCSVToAvroProcessor.java |  96 +++++++++++++++++-
 .../kite/TestJSONToAvroProcessor.java           | 101 +++++++++++++++++--
 5 files changed, 384 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index 564a203..6c20a8f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -44,6 +44,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.LongHolder;
 import org.kitesdk.data.DatasetException;
 import org.kitesdk.data.DatasetIOException;
 import org.kitesdk.data.DatasetRecordException;
@@ -76,12 +77,17 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor 
{
 
     private static final Relationship SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("FlowFile content has been successfully saved")
+            .description("Avro content that was converted successfully from 
CSV")
             .build();
 
     private static final Relationship FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("FlowFile content could not be processed")
+            .description("CSV content that could not be processed")
+            .build();
+
+    private static final Relationship INCOMPATIBLE = new Relationship.Builder()
+            .name("incompatible")
+            .description("CSV content that could not be converted")
             .build();
 
     @VisibleForTesting
@@ -164,6 +170,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor 
{
             = ImmutableSet.<Relationship>builder()
             .add(SUCCESS)
             .add(FAILURE)
+            .add(INCOMPATIBLE)
             .build();
 
     // Immutable configuration
@@ -197,20 +204,20 @@ public class ConvertCSVToAvro extends 
AbstractKiteProcessor {
     @Override
     public void onTrigger(ProcessContext context, final ProcessSession session)
             throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
+        FlowFile incomingCSV = session.get();
+        if (incomingCSV == null) {
             return;
         }
 
         String schemaProperty = context.getProperty(SCHEMA)
-                .evaluateAttributeExpressions(flowFile)
+                .evaluateAttributeExpressions(incomingCSV)
                 .getValue();
         final Schema schema;
         try {
             schema = getSchema(schemaProperty, DefaultConfiguration.get());
         } catch (SchemaNotFoundException e) {
             getLogger().error("Cannot find schema: " + schemaProperty);
-            session.transfer(flowFile, FAILURE);
+            session.transfer(incomingCSV, FAILURE);
             return;
         }
 
@@ -219,11 +226,13 @@ public class ConvertCSVToAvro extends 
AbstractKiteProcessor {
         writer.setCodec(CodecFactory.snappyCodec());
 
         try {
-            flowFile = session.write(flowFile, new StreamCallback() {
+            final LongHolder written = new LongHolder(0L);
+            final FailureTracker failures = new FailureTracker();
+
+            FlowFile badRecords = session.clone(incomingCSV);
+            FlowFile outgoingAvro = session.write(incomingCSV, new 
StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws 
IOException {
-                    long written = 0L;
-                    long errors = 0L;
                     try (CSVFileReader<Record> reader = new CSVFileReader<>(
                             in, props, schema, Record.class)) {
                         reader.initialize();
@@ -232,29 +241,58 @@ public class ConvertCSVToAvro extends 
AbstractKiteProcessor {
                                 try {
                                     Record record = reader.next();
                                     w.append(record);
-                                    written += 1;
+                                    written.incrementAndGet();
                                 } catch (DatasetRecordException e) {
-                                    errors += 1;
+                                    failures.add(e);
                                 }
                             }
                         }
                     }
-                    session.adjustCounter("Converted records", written,
-                            false /* update only if file transfer is 
successful */);
-                    session.adjustCounter("Conversion errors", errors,
-                            false /* update only if file transfer is 
successful */);
                 }
             });
 
-            session.transfer(flowFile, SUCCESS);
+            long errors = failures.count();
+
+            session.adjustCounter("Converted records", written.get(),
+                    false /* update only if file transfer is successful */);
+            session.adjustCounter("Conversion errors", errors,
+                    false /* update only if file transfer is successful */);
+
+            if (written.get() > 0L) {
+                session.transfer(outgoingAvro, SUCCESS);
+
+                if (errors > 0L) {
+                    getLogger().warn("Failed to convert {}/{} records from CSV 
to Avro",
+                            new Object[] { errors, errors + written.get() });
+                    badRecords = session.putAttribute(
+                            badRecords, "errors", failures.summary());
+                    session.transfer(badRecords, INCOMPATIBLE);
+                } else {
+                    session.remove(badRecords);
+                }
+
+            } else {
+                session.remove(outgoingAvro);
+
+                if (errors > 0L) {
+                    getLogger().warn("Failed to convert {}/{} records from CSV 
to Avro",
+                            new Object[] { errors, errors });
+                    badRecords = session.putAttribute(
+                            badRecords, "errors", failures.summary());
+                } else {
+                    badRecords = session.putAttribute(
+                            badRecords, "errors", "No incoming records");
+                }
+
+                session.transfer(badRecords, FAILURE);
+            }
 
-            //session.getProvenanceReporter().send(flowFile, 
target.getUri().toString());
         } catch (ProcessException | DatasetIOException e) {
             getLogger().error("Failed reading or writing", e);
-            session.transfer(flowFile, FAILURE);
+            session.transfer(incomingCSV, FAILURE);
         } catch (DatasetException e) {
             getLogger().error("Failed to read FlowFile", e);
-            session.transfer(flowFile, FAILURE);
+            session.transfer(incomingCSV, FAILURE);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 7969a8f..ec1503c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -38,8 +38,8 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.util.LongHolder;
 import org.kitesdk.data.DatasetException;
 import org.kitesdk.data.DatasetIOException;
 import org.kitesdk.data.DatasetRecordException;
@@ -54,12 +54,17 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
 
     private static final Relationship SUCCESS = new Relationship.Builder()
             .name("success")
-            .description("FlowFile content has been successfully saved")
+            .description("Avro content that was converted successfully from 
JSON")
             .build();
 
     private static final Relationship FAILURE = new Relationship.Builder()
             .name("failure")
-            .description("FlowFile content could not be processed")
+            .description("JSON content that could not be processed")
+            .build();
+
+    private static final Relationship INCOMPATIBLE = new Relationship.Builder()
+            .name("incompatible")
+            .description("JSON content that could not be converted")
             .build();
 
     @VisibleForTesting
@@ -82,6 +87,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
             = ImmutableSet.<Relationship>builder()
             .add(SUCCESS)
             .add(FAILURE)
+            .add(INCOMPATIBLE)
             .build();
 
     public ConvertJSONToAvro() {
@@ -100,20 +106,20 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session)
             throws ProcessException {
-        FlowFile successfulRecords = session.get();
-        if (successfulRecords == null) {
+        FlowFile incomingJSON = session.get();
+        if (incomingJSON == null) {
             return;
         }
 
         String schemaProperty = context.getProperty(SCHEMA)
-                .evaluateAttributeExpressions(successfulRecords)
+                .evaluateAttributeExpressions(incomingJSON)
                 .getValue();
         final Schema schema;
         try {
             schema = getSchema(schemaProperty, DefaultConfiguration.get());
         } catch (SchemaNotFoundException e) {
             getLogger().error("Cannot find schema: " + schemaProperty);
-            session.transfer(successfulRecords, FAILURE);
+            session.transfer(incomingJSON, FAILURE);
             return;
         }
 
@@ -122,59 +128,74 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
         writer.setCodec(CodecFactory.snappyCodec());
 
         try {
-            successfulRecords = session.write(successfulRecords, new 
StreamCallback() {
+            final LongHolder written = new LongHolder(0L);
+            final FailureTracker failures = new FailureTracker();
+
+            FlowFile badRecords = session.clone(incomingJSON);
+            FlowFile outgoingAvro = session.write(incomingJSON, new 
StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws 
IOException {
-                    FlowFile failedRecords = session.create();
-                    long written = 0L;
-                    long errors = 0L;
-                    long total = 0L;
                     try (JSONFileReader<Record> reader = new JSONFileReader<>(
                             in, schema, Record.class)) {
                         reader.initialize();
                         try (DataFileWriter<Record> w = writer.create(schema, 
out)) {
                             while (reader.hasNext()) {
-                                total += 1;
                                 try {
                                     Record record = reader.next();
                                     w.append(record);
-                                    written += 1;
+                                    written.incrementAndGet();
+
                                 } catch (final DatasetRecordException e) {
-                                    failedRecords = 
session.append(failedRecords, new OutputStreamCallback() {
-                                        @Override
-                                        public void process(OutputStream out) 
throws IOException {
-                                            out.write((e.getMessage() + " ["
-                                                    + 
e.getCause().getMessage() + "]\n").getBytes());
-                                        }
-                                    });
-                                    errors += 1;
+                                    failures.add(e);
                                 }
                             }
                         }
-                        session.adjustCounter("Converted records", written,
-                                false /* update only if file transfer is 
successful */);
-                        session.adjustCounter("Conversion errors", errors,
-                                false /* update only if file transfer is 
successful */);
-
-                        if (errors > 0L) {
-                            getLogger().warn("Failed to convert " + errors + 
'/' + total + " records from JSON to Avro");
-                        }
                     }
-                    session.transfer(failedRecords, FAILURE);
                 }
             });
 
-            session.transfer(successfulRecords, SUCCESS);
+            long errors = failures.count();
+
+            session.adjustCounter("Converted records", written.get(),
+                    false /* update only if file transfer is successful */);
+            session.adjustCounter("Conversion errors", errors,
+                    false /* update only if file transfer is successful */);
+
+            if (written.get() > 0L) {
+                session.transfer(outgoingAvro, SUCCESS);
+
+                if (errors > 0L) {
+                    getLogger().warn("Failed to convert {}/{} records from 
JSON to Avro",
+                            new Object[] { errors, errors + written.get() });
+                    badRecords = session.putAttribute(
+                            badRecords, "errors", failures.summary());
+                    session.transfer(badRecords, INCOMPATIBLE);
+                } else {
+                    session.remove(badRecords);
+                }
+
+            } else {
+                session.remove(outgoingAvro);
+
+                if (errors > 0L) {
+                    getLogger().warn("Failed to convert {}/{} records from 
JSON to Avro",
+                            new Object[] { errors, errors });
+                    badRecords = session.putAttribute(
+                            badRecords, "errors", failures.summary());
+                } else {
+                    badRecords = session.putAttribute(
+                            badRecords, "errors", "No incoming records");
+                }
+
+                session.transfer(badRecords, FAILURE);
+            }
 
-            //session.getProvenanceReporter().send(flowFile, 
target.getUri().toString());
         } catch (ProcessException | DatasetIOException e) {
             getLogger().error("Failed reading or writing", e);
-            session.transfer(successfulRecords, FAILURE);
-
+            session.transfer(incomingJSON, FAILURE);
         } catch (DatasetException e) {
             getLogger().error("Failed to read FlowFile", e);
-            session.transfer(successfulRecords, FAILURE);
-
+            session.transfer(incomingJSON, FAILURE);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java
new file mode 100644
index 0000000..de86761
--- /dev/null
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.nifi.processors.kite;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
+import java.util.Map;
+
+class FailureTracker {
+    private static final Splitter REASON_SEPARATOR = Splitter.on(':').limit(2);
+
+    private final Map<String, String> examples = Maps.newLinkedHashMap();
+    private final Map<String, Integer> occurrences = Maps.newLinkedHashMap();
+    long count = 0L;
+
+    public void add(Throwable throwable) {
+        add(reason(throwable));
+    }
+
+    public void add(String reason) {
+        count += 1;
+        String problem = 
Iterators.getNext(REASON_SEPARATOR.split(reason).iterator(), "Unknown");
+        if (examples.containsKey(problem)) {
+            occurrences.put(problem, occurrences.get(problem) + 1);
+        } else {
+            examples.put(problem, reason);
+            occurrences.put(problem, 1);
+        }
+    }
+
+    public long count() {
+        return count;
+    }
+
+    public String summary() {
+        boolean first = true;
+        StringBuilder sb = new StringBuilder();
+        for (String problem : examples.keySet()) {
+            if (first) {
+                first = false;
+            } else {
+                sb.append(", ");
+            }
+            sb.append(examples.get(problem));
+            int similar = occurrences.get(problem) - 1;
+            if (similar == 1) {
+                sb.append(" (1 similar failure)");
+            } else if (similar > 1) {
+                sb.append(" (").append(similar).append(" similar failures)");
+            }
+        }
+        return sb.toString();
+    }
+
+    private static String reason(Throwable t) {
+        StringBuilder sb = new StringBuilder();
+        for (Throwable current = t; current != null; current = 
current.getCause()) {
+            if (current != t) {
+                sb.append(": ");
+            }
+            sb.append(current.getMessage());
+        }
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 753b72b..43dea6e 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -20,9 +20,11 @@ package org.apache.nifi.processors.kite;
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -43,6 +45,12 @@ public class TestCSVToAvroProcessor {
             + ",blue,\n" + // invalid, ID is missing
             "2,grey,12.95";
 
+    public static final String FAILURE_CONTENT = ""
+            + ",blue,\n"; // invalid, ID is missing
+
+    public static final String FAILURE_SUMMARY = "" +
+            "Field id: cannot make \"long\" value: '': Field id type:LONG 
pos:0 not set and has no default value";
+
     @Test
     public void testBasicConversion() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
@@ -58,7 +66,17 @@ public class TestCSVToAvroProcessor {
         Assert.assertEquals("Should convert 2 rows", 2, converted);
         Assert.assertEquals("Should reject 1 row", 1, errors);
 
-        runner.assertAllFlowFilesTransferred("success", 1);
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 1);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("incompatible").get(0);
+        String failureContent = new 
String(runner.getContentAsByteArray(incompatible),
+                StandardCharsets.UTF_8);
+        Assert.assertEquals("Should reject an invalid string and double",
+                CSV_CONTENT, failureContent);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
     }
 
     @Test
@@ -76,7 +94,61 @@ public class TestCSVToAvroProcessor {
         Assert.assertEquals("Should convert 2 rows", 2, converted);
         Assert.assertEquals("Should reject 1 row", 1, errors);
 
-        runner.assertAllFlowFilesTransferred("success", 1);
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 1);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("incompatible").get(0);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+    @Test
+    public void testOnlyErrors() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(FAILURE_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 0 rows", 0, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertTransferCount("success", 0);
+        runner.assertTransferCount("failure", 1);
+        runner.assertTransferCount("incompatible", 0);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("failure").get(0);
+        Assert.assertEquals("Should set an error message",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+    @Test
+    public void testEmptyContent() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(""));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 0 rows", 0, converted);
+        Assert.assertEquals("Should reject 0 row", 0, errors);
+
+        runner.assertTransferCount("success", 0);
+        runner.assertTransferCount("failure", 1);
+        runner.assertTransferCount("incompatible", 0);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("failure").get(0);
+        Assert.assertEquals("Should set an error message",
+                "No incoming records", incompatible.getAttribute("errors"));
     }
 
     @Test
@@ -122,4 +194,24 @@ public class TestCSVToAvroProcessor {
         Assert.assertEquals("Lines to skip should match",
                 2, processor.props.linesToSkip);
     }
+
+    @Test
+    public void testBasicConversionNoErrors() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor("1,green\n2,blue,\n3,grey,12.95"));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 3 rows", 3, converted);
+        Assert.assertEquals("Should reject 0 row", 0, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
index 704d998..e0b4a6f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -19,10 +19,11 @@
 package org.apache.nifi.processors.kite;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -41,11 +42,18 @@ public class TestJSONToAvroProcessor {
     public static final String JSON_CONTENT = ""
             + "{\"id\": 1,\"color\": \"green\"}"
             + "{\"id\": \"120V\", \"color\": \"blue\"}\n" // invalid, ID is a 
string
-            + "{\"id\": 10, \"color\": 15.23}\n" + // invalid, color as double
-            "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
+            + "{\"id\": 10, \"color\": 15.23}\n" // invalid, color as double
+            + "{\"id\": 20, \"color\": 34}\n" // invalid, color as int
+            + "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
 
-    public static final String FAILURE_CONTENT = "Cannot convert field id 
[Cannot convert to long: \"120V\"]\n"
-            + "Cannot convert field color [Cannot convert to string: 15.23]\n";
+    public static final String FAILURE_CONTENT = ""
+            + "{\"id\": \"120V\", \"color\": \"blue\"}\n"
+            + "{\"id\": 10, \"color\": 15.23}\n"
+            + "{\"id\": 20, \"color\": 34}\n";
+
+    public static final String FAILURE_SUMMARY = ""
+            + "Cannot convert field id: Cannot convert to long: \"120V\", "
+            + "Cannot convert field color: Cannot convert to string: 15.23 (1 
similar failure)";
 
     @Test
     public void testBasicConversion() throws IOException {
@@ -60,13 +68,88 @@ public class TestJSONToAvroProcessor {
         long converted = runner.getCounterValue("Converted records");
         long errors = runner.getCounterValue("Conversion errors");
         Assert.assertEquals("Should convert 2 rows", 2, converted);
-        Assert.assertEquals("Should reject 2 rows", 2, errors);
+        Assert.assertEquals("Should reject 3 rows", 3, errors);
 
         runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 1);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("incompatible").get(0);
+        String failureContent = new 
String(runner.getContentAsByteArray(incompatible),
+                StandardCharsets.UTF_8);
+        Assert.assertEquals("Should reject an invalid string and double",
+                JSON_CONTENT, failureContent);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+    @Test
+    public void testOnlyErrors() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(FAILURE_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 0 rows", 0, converted);
+        Assert.assertEquals("Should reject 3 rows", 3, errors);
+
+        runner.assertTransferCount("success", 0);
         runner.assertTransferCount("failure", 1);
+        runner.assertTransferCount("incompatible", 0);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("failure").get(0);
+        Assert.assertEquals("Should set an error message",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+    @Test
+    public void testEmptyContent() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
 
-        String failureContent = Bytes.toString(runner.getContentAsByteArray(
-                runner.getFlowFilesForRelationship("failure").get(0)));
-        Assert.assertEquals("Should reject an invalid string and double", 
FAILURE_CONTENT, failureContent);
+        runner.enqueue(streamFor(""));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 0 rows", 0, converted);
+        Assert.assertEquals("Should reject 0 row", 0, errors);
+
+        runner.assertTransferCount("success", 0);
+        runner.assertTransferCount("failure", 1);
+        runner.assertTransferCount("incompatible", 0);
+
+        MockFlowFile incompatible = 
runner.getFlowFilesForRelationship("failure").get(0);
+        Assert.assertEquals("Should set an error message",
+                "No incoming records", incompatible.getAttribute("errors"));
+    }
+
+    @Test
+    public void testBasicConversionNoErrors() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(
+                "{\"id\": 1,\"color\": \"green\"}\n" +
+                        "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 
}"));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 0 row", 0, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 0);
     }
 }

Reply via email to