NIFI-738: Improve error handling for CSV and JSON conversion.
Changes:
* Send bad-record information on the "incompatible" relationship
* Use an attribute, "errors", for summarized error messages
* Send no error content for now, because the original content can't be accessed
* Summarize similar error messages with "N similar failures"
* Add similar error handling to CSV conversion
Signed-off-by: joewitt <joew...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2e32e0e2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2e32e0e2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2e32e0e2 Branch: refs/heads/master Commit: 2e32e0e2b82e071734919589ed680513edfd39ed Parents: 4dc2ea6 Author: Ryan Blue <b...@apache.org> Authored: Sat Jun 27 17:36:42 2015 -0700 Committer: joewitt <joew...@apache.org> Committed: Mon Jun 29 22:00:22 2015 -0400 ---------------------------------------------------------------------- .../nifi/processors/kite/ConvertCSVToAvro.java | 76 ++++++++++---- .../nifi/processors/kite/ConvertJSONToAvro.java | 95 ++++++++++------- .../nifi/processors/kite/FailureTracker.java | 83 +++++++++++++++ .../processors/kite/TestCSVToAvroProcessor.java | 96 +++++++++++++++++- .../kite/TestJSONToAvroProcessor.java | 101 +++++++++++++++++-- 5 files changed, 384 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index 564a203..6c20a8f 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@ -44,6 +44,7 @@ import 
org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.StreamCallback; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.LongHolder; import org.kitesdk.data.DatasetException; import org.kitesdk.data.DatasetIOException; import org.kitesdk.data.DatasetRecordException; @@ -76,12 +77,17 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { private static final Relationship SUCCESS = new Relationship.Builder() .name("success") - .description("FlowFile content has been successfully saved") + .description("Avro content that was converted successfully from CSV") .build(); private static final Relationship FAILURE = new Relationship.Builder() .name("failure") - .description("FlowFile content could not be processed") + .description("CSV content that could not be processed") + .build(); + + private static final Relationship INCOMPATIBLE = new Relationship.Builder() + .name("incompatible") + .description("CSV content that could not be converted") .build(); @VisibleForTesting @@ -164,6 +170,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { = ImmutableSet.<Relationship>builder() .add(SUCCESS) .add(FAILURE) + .add(INCOMPATIBLE) .build(); // Immutable configuration @@ -197,20 +204,20 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { @Override public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { + FlowFile incomingCSV = session.get(); + if (incomingCSV == null) { return; } String schemaProperty = context.getProperty(SCHEMA) - .evaluateAttributeExpressions(flowFile) + .evaluateAttributeExpressions(incomingCSV) .getValue(); final Schema schema; try { schema = getSchema(schemaProperty, DefaultConfiguration.get()); } catch (SchemaNotFoundException e) { getLogger().error("Cannot find schema: " + schemaProperty); - 
session.transfer(flowFile, FAILURE); + session.transfer(incomingCSV, FAILURE); return; } @@ -219,11 +226,13 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { writer.setCodec(CodecFactory.snappyCodec()); try { - flowFile = session.write(flowFile, new StreamCallback() { + final LongHolder written = new LongHolder(0L); + final FailureTracker failures = new FailureTracker(); + + FlowFile badRecords = session.clone(incomingCSV); + FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { - long written = 0L; - long errors = 0L; try (CSVFileReader<Record> reader = new CSVFileReader<>( in, props, schema, Record.class)) { reader.initialize(); @@ -232,29 +241,58 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor { try { Record record = reader.next(); w.append(record); - written += 1; + written.incrementAndGet(); } catch (DatasetRecordException e) { - errors += 1; + failures.add(e); } } } } - session.adjustCounter("Converted records", written, - false /* update only if file transfer is successful */); - session.adjustCounter("Conversion errors", errors, - false /* update only if file transfer is successful */); } }); - session.transfer(flowFile, SUCCESS); + long errors = failures.count(); + + session.adjustCounter("Converted records", written.get(), + false /* update only if file transfer is successful */); + session.adjustCounter("Conversion errors", errors, + false /* update only if file transfer is successful */); + + if (written.get() > 0L) { + session.transfer(outgoingAvro, SUCCESS); + + if (errors > 0L) { + getLogger().warn("Failed to convert {}/{} records from CSV to Avro", + new Object[] { errors, errors + written.get() }); + badRecords = session.putAttribute( + badRecords, "errors", failures.summary()); + session.transfer(badRecords, INCOMPATIBLE); + } else { + session.remove(badRecords); + } + + } else { + session.remove(outgoingAvro); + + 
if (errors > 0L) { + getLogger().warn("Failed to convert {}/{} records from CSV to Avro", + new Object[] { errors, errors }); + badRecords = session.putAttribute( + badRecords, "errors", failures.summary()); + } else { + badRecords = session.putAttribute( + badRecords, "errors", "No incoming records"); + } + + session.transfer(badRecords, FAILURE); + } - //session.getProvenanceReporter().send(flowFile, target.getUri().toString()); } catch (ProcessException | DatasetIOException e) { getLogger().error("Failed reading or writing", e); - session.transfer(flowFile, FAILURE); + session.transfer(incomingCSV, FAILURE); } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); - session.transfer(flowFile, FAILURE); + session.transfer(incomingCSV, FAILURE); } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java index 7969a8f..ec1503c 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java @@ -38,8 +38,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.io.StreamCallback; +import 
org.apache.nifi.util.LongHolder; import org.kitesdk.data.DatasetException; import org.kitesdk.data.DatasetIOException; import org.kitesdk.data.DatasetRecordException; @@ -54,12 +54,17 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { private static final Relationship SUCCESS = new Relationship.Builder() .name("success") - .description("FlowFile content has been successfully saved") + .description("Avro content that was converted successfully from JSON") .build(); private static final Relationship FAILURE = new Relationship.Builder() .name("failure") - .description("FlowFile content could not be processed") + .description("JSON content that could not be processed") + .build(); + + private static final Relationship INCOMPATIBLE = new Relationship.Builder() + .name("incompatible") + .description("JSON content that could not be converted") .build(); @VisibleForTesting @@ -82,6 +87,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { = ImmutableSet.<Relationship>builder() .add(SUCCESS) .add(FAILURE) + .add(INCOMPATIBLE) .build(); public ConvertJSONToAvro() { @@ -100,20 +106,20 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor { @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile successfulRecords = session.get(); - if (successfulRecords == null) { + FlowFile incomingJSON = session.get(); + if (incomingJSON == null) { return; } String schemaProperty = context.getProperty(SCHEMA) - .evaluateAttributeExpressions(successfulRecords) + .evaluateAttributeExpressions(incomingJSON) .getValue(); final Schema schema; try { schema = getSchema(schemaProperty, DefaultConfiguration.get()); } catch (SchemaNotFoundException e) { getLogger().error("Cannot find schema: " + schemaProperty); - session.transfer(successfulRecords, FAILURE); + session.transfer(incomingJSON, FAILURE); return; } @@ -122,59 +128,74 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor { writer.setCodec(CodecFactory.snappyCodec()); try { - successfulRecords = session.write(successfulRecords, new StreamCallback() { + final LongHolder written = new LongHolder(0L); + final FailureTracker failures = new FailureTracker(); + + FlowFile badRecords = session.clone(incomingJSON); + FlowFile outgoingAvro = session.write(incomingJSON, new StreamCallback() { @Override public void process(InputStream in, OutputStream out) throws IOException { - FlowFile failedRecords = session.create(); - long written = 0L; - long errors = 0L; - long total = 0L; try (JSONFileReader<Record> reader = new JSONFileReader<>( in, schema, Record.class)) { reader.initialize(); try (DataFileWriter<Record> w = writer.create(schema, out)) { while (reader.hasNext()) { - total += 1; try { Record record = reader.next(); w.append(record); - written += 1; + written.incrementAndGet(); + } catch (final DatasetRecordException e) { - failedRecords = session.append(failedRecords, new OutputStreamCallback() { - @Override - public void process(OutputStream out) throws IOException { - out.write((e.getMessage() + " [" - + e.getCause().getMessage() + "]\n").getBytes()); - } - }); - errors += 1; + failures.add(e); } } } - session.adjustCounter("Converted records", written, - false /* update only if file transfer is successful */); - session.adjustCounter("Conversion errors", errors, - false /* update only if file transfer is successful */); - - if (errors > 0L) { - getLogger().warn("Failed to convert " + errors + '/' + total + " records from JSON to Avro"); - } } - session.transfer(failedRecords, FAILURE); } }); - session.transfer(successfulRecords, SUCCESS); + long errors = failures.count(); + + session.adjustCounter("Converted records", written.get(), + false /* update only if file transfer is successful */); + session.adjustCounter("Conversion errors", errors, + false /* update only if file transfer is successful */); + + if (written.get() > 0L) { + 
session.transfer(outgoingAvro, SUCCESS); + + if (errors > 0L) { + getLogger().warn("Failed to convert {}/{} records from JSON to Avro", + new Object[] { errors, errors + written.get() }); + badRecords = session.putAttribute( + badRecords, "errors", failures.summary()); + session.transfer(badRecords, INCOMPATIBLE); + } else { + session.remove(badRecords); + } + + } else { + session.remove(outgoingAvro); + + if (errors > 0L) { + getLogger().warn("Failed to convert {}/{} records from JSON to Avro", + new Object[] { errors, errors }); + badRecords = session.putAttribute( + badRecords, "errors", failures.summary()); + } else { + badRecords = session.putAttribute( + badRecords, "errors", "No incoming records"); + } + + session.transfer(badRecords, FAILURE); + } - //session.getProvenanceReporter().send(flowFile, target.getUri().toString()); } catch (ProcessException | DatasetIOException e) { getLogger().error("Failed reading or writing", e); - session.transfer(successfulRecords, FAILURE); - + session.transfer(incomingJSON, FAILURE); } catch (DatasetException e) { getLogger().error("Failed to read FlowFile", e); - session.transfer(successfulRecords, FAILURE); - + session.transfer(incomingJSON, FAILURE); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java new file mode 100644 index 0000000..de86761 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/FailureTracker.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.nifi.processors.kite; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; +import java.util.Map; + +class FailureTracker { + private static final Splitter REASON_SEPARATOR = Splitter.on(':').limit(2); + + private final Map<String, String> examples = Maps.newLinkedHashMap(); + private final Map<String, Integer> occurrences = Maps.newLinkedHashMap(); + long count = 0L; + + public void add(Throwable throwable) { + add(reason(throwable)); + } + + public void add(String reason) { + count += 1; + String problem = Iterators.getNext(REASON_SEPARATOR.split(reason).iterator(), "Unknown"); + if (examples.containsKey(problem)) { + occurrences.put(problem, occurrences.get(problem) + 1); + } else { + examples.put(problem, reason); + occurrences.put(problem, 1); + } + } + + public long count() { + return count; + } + + public String summary() { + boolean first = true; + StringBuilder sb = new StringBuilder(); + for (String problem : examples.keySet()) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(examples.get(problem)); + int similar = occurrences.get(problem) - 1; + if (similar == 1) 
{ + sb.append(" (1 similar failure)"); + } else if (similar > 1) { + sb.append(" (").append(similar).append(" similar failures)"); + } + } + return sb.toString(); + } + + private static String reason(Throwable t) { + StringBuilder sb = new StringBuilder(); + for (Throwable current = t; current != null; current = current.getCause()) { + if (current != t) { + sb.append(": "); + } + sb.append(current.getMessage()); + } + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java index 753b72b..43dea6e 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java @@ -20,9 +20,11 @@ package org.apache.nifi.processors.kite; import java.io.IOException; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -43,6 +45,12 @@ public class TestCSVToAvroProcessor { + ",blue,\n" + // invalid, ID is missing "2,grey,12.95"; + public static final String FAILURE_CONTENT = "" + + ",blue,\n"; // invalid, ID is missing + + public static final String 
FAILURE_SUMMARY = "" + + "Field id: cannot make \"long\" value: '': Field id type:LONG pos:0 not set and has no default value"; + @Test public void testBasicConversion() throws IOException { TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); @@ -58,7 +66,17 @@ public class TestCSVToAvroProcessor { Assert.assertEquals("Should convert 2 rows", 2, converted); Assert.assertEquals("Should reject 1 row", 1, errors); - runner.assertAllFlowFilesTransferred("success", 1); + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + String failureContent = new String(runner.getContentAsByteArray(incompatible), + StandardCharsets.UTF_8); + Assert.assertEquals("Should reject an invalid string and double", + CSV_CONTENT, failureContent); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); } @Test @@ -76,7 +94,61 @@ public class TestCSVToAvroProcessor { Assert.assertEquals("Should convert 2 rows", 2, converted); Assert.assertEquals("Should reject 1 row", 1, errors); - runner.assertAllFlowFilesTransferred("success", 1); + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + + @Test + public void testOnlyErrors() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); + + runner.enqueue(streamFor(FAILURE_CONTENT)); + runner.run(); + + long converted = runner.getCounterValue("Converted 
records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 0 rows", 0, converted); + Assert.assertEquals("Should reject 1 row", 1, errors); + + runner.assertTransferCount("success", 0); + runner.assertTransferCount("failure", 1); + runner.assertTransferCount("incompatible", 0); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0); + Assert.assertEquals("Should set an error message", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + + @Test + public void testEmptyContent() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); + + runner.enqueue(streamFor("")); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 0 rows", 0, converted); + Assert.assertEquals("Should reject 0 row", 0, errors); + + runner.assertTransferCount("success", 0); + runner.assertTransferCount("failure", 1); + runner.assertTransferCount("incompatible", 0); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0); + Assert.assertEquals("Should set an error message", + "No incoming records", incompatible.getAttribute("errors")); } @Test @@ -122,4 +194,24 @@ public class TestCSVToAvroProcessor { Assert.assertEquals("Lines to skip should match", 2, processor.props.linesToSkip); } + + @Test + public void testBasicConversionNoErrors() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); + + runner.enqueue(streamFor("1,green\n2,blue,\n3,grey,12.95")); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors 
= runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 3 rows", 3, converted); + Assert.assertEquals("Should reject 0 row", 0, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 0); + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2e32e0e2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java index 704d998..e0b4a6f 100644 --- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java +++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java @@ -19,10 +19,11 @@ package org.apache.nifi.processors.kite; import java.io.IOException; +import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; -import org.apache.hadoop.hbase.util.Bytes; +import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; import org.junit.Assert; @@ -41,11 +42,18 @@ public class TestJSONToAvroProcessor { public static final String JSON_CONTENT = "" + "{\"id\": 1,\"color\": \"green\"}" + "{\"id\": \"120V\", \"color\": \"blue\"}\n" // invalid, ID is a string - + "{\"id\": 10, \"color\": 15.23}\n" + // invalid, color as double - "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }"; + + "{\"id\": 10, \"color\": 15.23}\n" // invalid, color as double + + "{\"id\": 
20, \"color\": 34}\n" // invalid, color as int + + "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }"; - public static final String FAILURE_CONTENT = "Cannot convert field id [Cannot convert to long: \"120V\"]\n" - + "Cannot convert field color [Cannot convert to string: 15.23]\n"; + public static final String FAILURE_CONTENT = "" + + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + + "{\"id\": 10, \"color\": 15.23}\n" + + "{\"id\": 20, \"color\": 34}\n"; + + public static final String FAILURE_SUMMARY = "" + + "Cannot convert field id: Cannot convert to long: \"120V\", " + + "Cannot convert field color: Cannot convert to string: 15.23 (1 similar failure)"; @Test public void testBasicConversion() throws IOException { @@ -60,13 +68,88 @@ public class TestJSONToAvroProcessor { long converted = runner.getCounterValue("Converted records"); long errors = runner.getCounterValue("Conversion errors"); Assert.assertEquals("Should convert 2 rows", 2, converted); - Assert.assertEquals("Should reject 2 rows", 2, errors); + Assert.assertEquals("Should reject 3 rows", 3, errors); runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 1); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0); + String failureContent = new String(runner.getContentAsByteArray(incompatible), + StandardCharsets.UTF_8); + Assert.assertEquals("Should reject an invalid string and double", + JSON_CONTENT, failureContent); + Assert.assertEquals("Should accumulate error messages", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + + @Test + public void testOnlyErrors() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); + + runner.enqueue(streamFor(FAILURE_CONTENT)); + runner.run(); + + long converted = 
runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 0 rows", 0, converted); + Assert.assertEquals("Should reject 1 row", 3, errors); + + runner.assertTransferCount("success", 0); runner.assertTransferCount("failure", 1); + runner.assertTransferCount("incompatible", 0); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0); + Assert.assertEquals("Should set an error message", + FAILURE_SUMMARY, incompatible.getAttribute("errors")); + } + + @Test + public void testEmptyContent() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); - String failureContent = Bytes.toString(runner.getContentAsByteArray( - runner.getFlowFilesForRelationship("failure").get(0))); - Assert.assertEquals("Should reject an invalid string and double", FAILURE_CONTENT, failureContent); + runner.enqueue(streamFor("")); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 0 rows", 0, converted); + Assert.assertEquals("Should reject 0 row", 0, errors); + + runner.assertTransferCount("success", 0); + runner.assertTransferCount("failure", 1); + runner.assertTransferCount("incompatible", 0); + + MockFlowFile incompatible = runner.getFlowFilesForRelationship("failure").get(0); + Assert.assertEquals("Should set an error message", + "No incoming records", incompatible.getAttribute("errors")); + } + + @Test + public void testBasicConversionNoErrors() throws IOException { + TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class); + runner.assertNotValid(); + runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString()); + runner.assertValid(); + + runner.enqueue(streamFor( + "{\"id\": 
1,\"color\": \"green\"}\n" + + "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }")); + runner.run(); + + long converted = runner.getCounterValue("Converted records"); + long errors = runner.getCounterValue("Conversion errors"); + Assert.assertEquals("Should convert 2 rows", 2, converted); + Assert.assertEquals("Should reject 0 row", 0, errors); + + runner.assertTransferCount("success", 1); + runner.assertTransferCount("failure", 0); + runner.assertTransferCount("incompatible", 0); } }