This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new 24a7d48 NIFI-5983: handling parse problems in recordReader implementations 24a7d48 is described below commit 24a7d480c8fde18a7dd64d7de80812d18eb2c5a4 Author: Endre Zoltan Kovacs <ekov...@hortonworks.com> AuthorDate: Thu Jan 31 14:47:21 2019 +0100 NIFI-5983: handling parse problems in recordReader implementations Fixed Checkstyle violation Signed-off-by: Matthew Burgess <mattyb...@apache.org> This closes #3282 --- .../nifi-record-serialization-services/pom.xml | 5 ++ .../org/apache/nifi/avro/AvroRecordReader.java | 25 +++++--- .../java/org/apache/nifi/csv/CSVRecordReader.java | 68 ++++++++++++---------- .../apache/nifi/csv/ITApacheCSVRecordReader.java | 35 ++++++++++- 4 files changed, 93 insertions(+), 40 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 140a74b..b16464e 100755 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -119,6 +119,11 @@ <artifactId>caffeine</artifactId> <version>2.6.2</version> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>27.0.1-jre</version> + </dependency> </dependencies> <build> <plugins> diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java index 
c1d87b6..c9a624f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java @@ -24,6 +24,8 @@ import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import com.google.common.base.Throwables; + import java.io.IOException; import java.util.Map; @@ -33,14 +35,21 @@ public abstract class AvroRecordReader implements RecordReader { @Override public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { - GenericRecord record = nextAvroRecord(); - if (record == null) { - return null; + try { + GenericRecord record = nextAvroRecord(); + if (record == null) { + return null; + } + + final RecordSchema schema = getSchema(); + final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema); + return new MapRecord(schema, values); + } catch (IOException e) { + throw e; + } catch (MalformedRecordException e) { + throw e; + } catch (Exception e) { + throw new MalformedRecordException("Error while getting next record. 
Root cause: " + Throwables.getRootCause(e), e); } - - final RecordSchema schema = getSchema(); - final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(record, schema); - return new MapRecord(schema, values); } - } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java index 299ad05..22a2e8a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java @@ -42,6 +42,8 @@ import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import com.google.common.base.Throwables; + public class CSVRecordReader extends AbstractCSVRecordReader { private final CSVParser csvParser; @@ -72,45 +74,49 @@ public class CSVRecordReader extends AbstractCSVRecordReader { @Override public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException { - final RecordSchema schema = getSchema(); - - final List<RecordField> recordFields = getRecordFields(); - final int numFieldNames = recordFields.size(); - - for (final CSVRecord csvRecord : csvParser) { - final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2); - for (int i = 0; i < csvRecord.size(); i++) { - final String rawValue = csvRecord.get(i); - final String rawFieldName; - final DataType dataType; - if (i >= numFieldNames) { 
- if (!dropUnknownFields) { - values.put("unknown_field_index_" + i, rawValue); + try { + final RecordSchema schema = getSchema(); + + final List<RecordField> recordFields = getRecordFields(); + final int numFieldNames = recordFields.size(); + for (final CSVRecord csvRecord : csvParser) { + final Map<String, Object> values = new LinkedHashMap<>(recordFields.size() * 2); + for (int i = 0; i < csvRecord.size(); i++) { + final String rawValue = csvRecord.get(i); + + final String rawFieldName; + final DataType dataType; + if (i >= numFieldNames) { + if (!dropUnknownFields) { + values.put("unknown_field_index_" + i, rawValue); + } + + continue; + } else { + final RecordField recordField = recordFields.get(i); + rawFieldName = recordField.getFieldName(); + dataType = recordField.getDataType(); } - continue; - } else { - final RecordField recordField = recordFields.get(i); - rawFieldName = recordField.getFieldName(); - dataType = recordField.getDataType(); - } + final Object value; + if (coerceTypes) { + value = convert(rawValue, dataType, rawFieldName); + } else { + // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to + // dictate a field type. As a result, we will use the schema that we have to attempt to convert + // the value into the desired type if it's a simple type. + value = convertSimpleIfPossible(rawValue, dataType, rawFieldName); + } - final Object value; - if (coerceTypes) { - value = convert(rawValue, dataType, rawFieldName); - } else { - // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to - // dictate a field type. As a result, we will use the schema that we have to attempt to convert - // the value into the desired type if it's a simple type. 
- value = convertSimpleIfPossible(rawValue, dataType, rawFieldName); + values.put(rawFieldName, value); } - values.put(rawFieldName, value); + return new MapRecord(schema, values, coerceTypes, dropUnknownFields); } - - return new MapRecord(schema, values, coerceTypes, dropUnknownFields); + } catch (Exception e) { + throw new MalformedRecordException("Error while getting next record. Root cause: " + Throwables.getRootCause(e), e); } return null; diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java index 30c05c0..17649cd 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/ITApacheCSVRecordReader.java @@ -17,6 +17,7 @@ package org.apache.nifi.csv; import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.QuoteMode; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.SimpleRecordSchema; @@ -27,22 +28,30 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.junit.Test; import org.mockito.Mockito; +import com.google.common.base.Throwables; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static 
org.junit.Assert.assertThat; public class ITApacheCSVRecordReader { private final CSVFormat format = CSVFormat.DEFAULT.withFirstRecordAsHeader().withTrim().withQuote('"'); private List<RecordField> getDefaultFields() { + return createStringFields(new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}); + } + + private List<RecordField> createStringFields(String[] fieldNames) { final List<RecordField> fields = new ArrayList<>(); - for (final String fieldName : new String[]{"id", "name", "balance", "address", "city", "state", "zipCode", "country"}) { + for (final String fieldName : fieldNames) { fields.add(new RecordField(fieldName, RecordFieldType.STRING.getDataType())); } return fields; @@ -71,4 +80,28 @@ public class ITApacheCSVRecordReader { assertEquals(NUM_LINES, numRecords); } } + + @Test + public void testExceptionThrownOnParseProblem() throws IOException, MalformedRecordException { + CSVFormat csvFormat = CSVFormat.DEFAULT.withFirstRecordAsHeader().withQuoteMode(QuoteMode.ALL).withTrim().withDelimiter(','); + final int NUM_LINES = 25; + StringBuilder sb = new StringBuilder("\"id\",\"name\",\"balance\""); + for (int i = 0; i < NUM_LINES; i++) { + sb.append(String.format("\"%s\",\"John Doe\",\"4750.89D\"\n", i)); + } + // cause a parse problem + sb.append(String.format("\"%s\"dieParser,\"John Doe\",\"4750.89D\"\n", NUM_LINES )); + sb.append(String.format("\"%s\",\"John Doe\",\"4750.89D\"\n", NUM_LINES + 1)); + final RecordSchema schema = new SimpleRecordSchema(createStringFields(new String[] {"id", "name", "balance"})); + + try (final InputStream bais = new ByteArrayInputStream(sb.toString().getBytes()); + final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, csvFormat, true, false, + RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) { + + while (reader.nextRecord() != null) {} + } catch 
(Exception e) { + assertThat(e, instanceOf(MalformedRecordException.class)); + assertThat(Throwables.getRootCause(e), instanceOf(IOException.class)); + } + } }