http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE index 1848020..f5f0bb1 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE @@ -64,11 +64,14 @@ The following binary components are provided under the Apache Software License v The following NOTICE information applies: Copyright 2011 JSON-SMART authors - (ASLv2) JsonPath - The following NOTICE information applies: - Copyright 2011 JsonPath authors + (ASLv2) JsonPath + The following NOTICE information applies: + Copyright 2011 JsonPath authors - (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3) + (ASLv2) Apache Commons CSV + The following NOTICE information applies: + Apache Commons CSV + Copyright 2005-2016 The Apache Software Foundation (ASLv2) Apache Avro The following NOTICE information applies:
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore deleted file mode 100644 index ae3c172..0000000 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore +++ /dev/null @@ -1 +0,0 @@ -/bin/ http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml index 9b2a56c..d86a8c5 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml @@ -34,6 +34,10 @@ <artifactId>nifi-record-serialization-service-api</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + </dependency> + <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> </dependency> @@ -50,9 +54,13 @@ <artifactId>commons-lang3</artifactId> </dependency> <dependency> - 
<groupId>net.sf.opencsv</groupId> - <artifactId>opencsv</artifactId> - <version>2.3</version> + <groupId>org.apache.commons</groupId> + <artifactId>commons-csv</artifactId> + <version>1.4</version> + </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> </dependency> <dependency> <groupId>io.thekraken</groupId> @@ -62,6 +70,7 @@ <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> + <version>1.8.1</version> </dependency> </dependencies> @@ -72,6 +81,8 @@ <artifactId>apache-rat-plugin</artifactId> <configuration> <excludes combine.children="append"> + <exclude>src/test/resources/avro/datatypes.avsc</exclude> + <exclude>src/test/resources/avro/logical-types.avsc</exclude> <exclude>src/test/resources/csv/extra-white-space.csv</exclude> <exclude>src/test/resources/csv/multi-bank-account.csv</exclude> <exclude>src/test/resources/csv/single-bank-account.csv</exclude> @@ -80,12 +91,14 @@ <exclude>src/test/resources/grok/nifi-log-sample.log</exclude> <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude> <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude> + <exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude> <exclude>src/test/resources/json/bank-account-array.json</exclude> <exclude>src/test/resources/json/json-with-unicode.json</exclude> <exclude>src/test/resources/json/primitive-type-array.json</exclude> <exclude>src/test/resources/json/single-bank-account.json</exclude> <exclude>src/test/resources/json/single-element-nested-array.json</exclude> <exclude>src/test/resources/json/single-element-nested.json</exclude> + <exclude>src/test/resources/json/output/dataTypes.json</exclude> </excludes> </configuration> </plugin> 
http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index fc0c598..f92816f 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -23,18 +23,27 @@ import java.io.InputStream; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.record.RecordSchema; @Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"}) -@CapabilityDescription("Parses Avro data and returns each Avro record as an separate record.") +@CapabilityDescription("Parses Avro data and returns each Avro record as an separate Record object. 
The Avro data must contain " + + "the schema itself.") public class AvroReader extends AbstractControllerService implements RowRecordReaderFactory { @Override - public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException { + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException { return new AvroRecordReader(in); } + @Override + public RecordSchema getSchema(final FlowFile flowFile) throws MalformedRecordException, IOException { + // TODO: Need to support retrieving schema from registry instead of requiring that it be in the Avro file. + return null; + } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java index e98a5ad..d725cbf 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java @@ -24,11 +24,12 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; +import java.util.concurrent.TimeUnit; +import 
org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; -import org.apache.avro.Schema.Type; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericData.Array; @@ -44,8 +45,8 @@ import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; public class AvroRecordReader implements RecordReader { private final InputStream in; @@ -53,6 +54,7 @@ public class AvroRecordReader implements RecordReader { private final DataFileStream<GenericRecord> dataFileStream; private RecordSchema recordSchema; + public AvroRecordReader(final InputStream in) throws IOException, MalformedRecordException { this.in = in; @@ -79,65 +81,78 @@ public class AvroRecordReader implements RecordReader { } final RecordSchema schema = getSchema(); - final Map<String, Object> values = convertRecordToObjectArray(record, schema); + final Map<String, Object> values = convertAvroRecordToMap(record, schema); return new MapRecord(schema, values); } - private Map<String, Object> convertRecordToObjectArray(final GenericRecord record, final RecordSchema schema) { - final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); + private Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) { + final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount()); - for (final String fieldName : schema.getFieldNames()) { - final Object value = record.get(fieldName); + for (final String fieldName : recordSchema.getFieldNames()) { + final Object value = 
avroRecord.get(fieldName); - final Field avroField = record.getSchema().getField(fieldName); + final Field avroField = avroRecord.getSchema().getField(fieldName); if (avroField == null) { values.put(fieldName, null); continue; } final Schema fieldSchema = avroField.schema(); - final DataType dataType = schema.getDataType(fieldName).orElse(null); - final Object converted = convertValue(value, fieldSchema, avroField.name(), dataType); - values.put(fieldName, converted); - } + final Object rawValue = normalizeValue(value, fieldSchema); - return values; - } - - - @Override - public RecordSchema getSchema() throws MalformedRecordException { - if (recordSchema != null) { - return recordSchema; - } - - recordSchema = createSchema(avroSchema); - return recordSchema; - } + final DataType desiredType = recordSchema.getDataType(fieldName).get(); + final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType); - private RecordSchema createSchema(final Schema avroSchema) { - final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); - for (final Field field : avroSchema.getFields()) { - final String fieldName = field.name(); - final DataType dataType = determineDataType(field.schema()); - recordFields.add(new RecordField(fieldName, dataType)); + values.put(fieldName, coercedValue); } - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); - return recordSchema; + return values; } - private Object convertValue(final Object value, final Schema avroSchema, final String fieldName, final DataType desiredType) { + private Object normalizeValue(final Object value, final Schema avroSchema) { if (value == null) { return null; } switch (avroSchema.getType()) { + case INT: { + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType == null) { + return value; + } + + final String logicalName = logicalType.getName(); + if (LogicalTypes.date().getName().equals(logicalName)) { + // date logical name means that 
the value is number of days since Jan 1, 1970 + return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value)); + } else if (LogicalTypes.timeMillis().equals(logicalName)) { + // time-millis logical name means that the value is number of milliseconds since midnight. + return new java.sql.Time((int) value); + } + + break; + } + case LONG: { + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType == null) { + return value; + } + + final String logicalName = logicalType.getName(); + if (LogicalTypes.timeMicros().getName().equals(logicalName)) { + return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value)); + } else if (LogicalTypes.timestampMillis().getName().equals(logicalName)) { + return new java.sql.Timestamp((long) value); + } else if (LogicalTypes.timestampMicros().getName().equals(logicalName)) { + return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value)); + } + break; + } case UNION: if (value instanceof GenericData.Record) { - final GenericData.Record record = (GenericData.Record) value; - return convertValue(value, record.getSchema(), fieldName, desiredType); + final GenericData.Record avroRecord = (GenericData.Record) value; + return normalizeValue(value, avroRecord.getSchema()); } break; case RECORD: @@ -146,19 +161,18 @@ public class AvroRecordReader implements RecordReader { final List<Field> recordFields = recordSchema.getFields(); final Map<String, Object> values = new HashMap<>(recordFields.size()); for (final Field field : recordFields) { - final DataType desiredFieldType = determineDataType(field.schema()); final Object avroFieldValue = record.get(field.name()); - final Object fieldValue = convertValue(avroFieldValue, field.schema(), field.name(), desiredFieldType); + final Object fieldValue = normalizeValue(avroFieldValue, field.schema()); values.put(field.name(), fieldValue); } - final RecordSchema childSchema = createSchema(recordSchema); + final RecordSchema childSchema = 
AvroTypeUtil.createSchema(recordSchema); return new MapRecord(childSchema, values); case BYTES: final ByteBuffer bb = (ByteBuffer) value; - return bb.array(); + return AvroTypeUtil.convertByteArray(bb.array()); case FIXED: final GenericFixed fixed = (GenericFixed) value; - return fixed.bytes(); + return AvroTypeUtil.convertByteArray(fixed.bytes()); case ENUM: return value.toString(); case NULL: @@ -170,7 +184,7 @@ public class AvroRecordReader implements RecordReader { final Object[] valueArray = new Object[array.size()]; for (int i = 0; i < array.size(); i++) { final Schema elementSchema = avroSchema.getElementType(); - valueArray[i] = convertValue(array.get(i), elementSchema, fieldName, determineDataType(elementSchema)); + valueArray[i] = normalizeValue(array.get(i), elementSchema); } return valueArray; case MAP: @@ -182,73 +196,32 @@ public class AvroRecordReader implements RecordReader { obj = obj.toString(); } - map.put(entry.getKey().toString(), obj); + final String key = entry.getKey().toString(); + obj = normalizeValue(obj, avroSchema.getValueType()); + + map.put(key, obj); + } + + final DataType elementType = AvroTypeUtil.determineDataType(avroSchema.getValueType()); + final List<RecordField> mapFields = new ArrayList<>(); + for (final String key : map.keySet()) { + mapFields.add(new RecordField(key, elementType)); } - return map; + final RecordSchema mapSchema = new SimpleRecordSchema(mapFields); + return new MapRecord(mapSchema, map); } return value; } - private DataType determineDataType(final Schema avroSchema) { - final Type avroType = avroSchema.getType(); - - switch (avroType) { - case ARRAY: - case BYTES: - case FIXED: - return RecordFieldType.ARRAY.getDataType(); - case BOOLEAN: - return RecordFieldType.BOOLEAN.getDataType(); - case DOUBLE: - return RecordFieldType.DOUBLE.getDataType(); - case ENUM: - case STRING: - return RecordFieldType.STRING.getDataType(); - case FLOAT: - return RecordFieldType.FLOAT.getDataType(); - case INT: - return 
RecordFieldType.INT.getDataType(); - case LONG: - return RecordFieldType.LONG.getDataType(); - case RECORD: { - final List<Field> avroFields = avroSchema.getFields(); - final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); - - for (final Field field : avroFields) { - final String fieldName = field.name(); - final Schema fieldSchema = field.schema(); - final DataType fieldType = determineDataType(fieldSchema); - recordFields.add(new RecordField(fieldName, fieldType)); - } - - final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); - return RecordFieldType.RECORD.getDataType(recordSchema); - } - case NULL: - case MAP: - return RecordFieldType.RECORD.getDataType(); - case UNION: { - final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); - - if (nonNullSubSchemas.size() == 1) { - return determineDataType(nonNullSubSchemas.get(0)); - } - - final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); - for (final Schema subSchema : nonNullSubSchemas) { - final DataType childDataType = determineDataType(subSchema); - possibleChildTypes.add(childDataType); - } - - return RecordFieldType.CHOICE.getDataType(possibleChildTypes); - } + @Override + public RecordSchema getSchema() throws MalformedRecordException { + if (recordSchema != null) { + return recordSchema; } - return null; + recordSchema = AvroTypeUtil.createSchema(avroSchema); + return recordSchema; } - } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java index d56c716..03d766c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java @@ -25,18 +25,15 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.serialization.AbstractRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -@Tags({"avro", "result", "set", "writer", "serializer", "record", "row"}) -@CapabilityDescription("Writes the contents of a Database ResultSet in Binary Avro format. The data types in the Result Set must match those " - + "specified by the Avro Schema. No type coercion will occur, with the exception of Date, Time, and Timestamps fields because Avro does not provide " - + "support for these types specifically. As a result, they will be converted to String fields using the configured formats. 
In addition, the label" - + "of the column must be a valid Avro field name.") -public class AvroRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory { +@Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", "row"}) +@CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.") +public class AvroRecordSetWriter extends AbstractControllerService implements RecordSetWriterFactory { static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() .name("Avro Schema") .description("The Avro Schema to use when writing out the Result Set") @@ -49,7 +46,7 @@ public class AvroRecordSetWriter extends AbstractRecordSetWriter implements Reco @Override protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + final List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(SCHEMA); return properties; } @@ -61,7 +58,7 @@ public class AvroRecordSetWriter extends AbstractRecordSetWriter implements Reco @Override public RecordSetWriter createWriter(final ComponentLog logger) { - return new WriteAvroResult(schema, getDateFormat(), getTimeFormat(), getTimestampFormat()); + return new WriteAvroResult(schema); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java new file mode 
100644 index 0000000..1810c83 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.avro; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; + +public class AvroTypeUtil { + + public static DataType determineDataType(final Schema avroSchema) { + final Type avroType = avroSchema.getType(); + + switch (avroType) { + case BYTES: + case FIXED: + return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); + case ARRAY: + final DataType elementType = determineDataType(avroSchema.getElementType()); + return RecordFieldType.ARRAY.getArrayDataType(elementType); + case BOOLEAN: + return RecordFieldType.BOOLEAN.getDataType(); + case DOUBLE: + return RecordFieldType.DOUBLE.getDataType(); + case ENUM: + case STRING: + return RecordFieldType.STRING.getDataType(); + case FLOAT: + return RecordFieldType.FLOAT.getDataType(); + case INT: { + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType == null) { + return RecordFieldType.INT.getDataType(); + } + + if (LogicalTypes.date().getName().equals(logicalType.getName())) { + return RecordFieldType.DATE.getDataType(); + } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { + return RecordFieldType.TIME.getDataType(); + } + + return RecordFieldType.INT.getDataType(); + } + case LONG: { + final LogicalType logicalType = avroSchema.getLogicalType(); + if (logicalType == null) { + return 
RecordFieldType.LONG.getDataType(); + } + + if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { + return RecordFieldType.TIMESTAMP.getDataType(); + } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { + return RecordFieldType.TIMESTAMP.getDataType(); + } else if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { + return RecordFieldType.TIME.getDataType(); + } + + return RecordFieldType.LONG.getDataType(); + } + case RECORD: { + final List<Field> avroFields = avroSchema.getFields(); + final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); + + for (final Field field : avroFields) { + final String fieldName = field.name(); + final Schema fieldSchema = field.schema(); + final DataType fieldType = determineDataType(fieldSchema); + recordFields.add(new RecordField(fieldName, fieldType)); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return RecordFieldType.RECORD.getRecordDataType(recordSchema); + } + case NULL: + case MAP: + return RecordFieldType.RECORD.getDataType(); + case UNION: { + final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream() + .filter(s -> s.getType() != Type.NULL) + .collect(Collectors.toList()); + + if (nonNullSubSchemas.size() == 1) { + return determineDataType(nonNullSubSchemas.get(0)); + } + + final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); + for (final Schema subSchema : nonNullSubSchemas) { + final DataType childDataType = determineDataType(subSchema); + possibleChildTypes.add(childDataType); + } + + return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); + } + } + + return null; + } + + public static RecordSchema createSchema(final Schema avroSchema) { + final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); + for (final Field field : avroSchema.getFields()) { + final String fieldName = field.name(); + final DataType 
dataType = AvroTypeUtil.determineDataType(field.schema()); + recordFields.add(new RecordField(fieldName, dataType)); + } + + final RecordSchema recordSchema = new SimpleRecordSchema(recordFields); + return recordSchema; + } + + public static Object[] convertByteArray(final byte[] bytes) { + final Object[] array = new Object[bytes.length]; + for (int i = 0; i < bytes.length; i++) { + array[i] = Byte.valueOf(bytes[i]); + } + return array; + } + + public static ByteBuffer convertByteArray(final Object[] bytes) { + final ByteBuffer bb = ByteBuffer.allocate(bytes.length); + for (final Object o : bytes) { + if (o instanceof Byte) { + bb.put(((Byte) o).byteValue()); + } else { + throw new IllegalTypeConversionException("Cannot convert value " + bytes + " of type " + bytes.getClass() + " to ByteBuffer"); + } + } + bb.flip(); + return bb; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java index d75d86d..b512b82 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java @@ -18,47 +18,41 @@ package org.apache.nifi.avro; import java.io.IOException; -import 
java.io.InputStream; import java.io.OutputStream; -import java.math.BigDecimal; -import java.math.BigInteger; import java.nio.ByteBuffer; -import java.sql.Blob; -import java.sql.Clob; -import java.sql.SQLException; -import java.sql.Time; -import java.sql.Timestamp; -import java.text.DateFormat; -import java.text.SimpleDateFormat; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.avro.LogicalType; +import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.EnumSymbol; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; public class WriteAvroResult implements RecordSetWriter { private final Schema schema; - private final DateFormat dateFormat; - private final DateFormat timeFormat; - private final DateFormat timestampFormat; - public WriteAvroResult(final Schema schema, final String dateFormat, final String timeFormat, final String timestampFormat) { + public WriteAvroResult(final Schema schema) { this.schema = schema; - this.dateFormat = new SimpleDateFormat(dateFormat); - this.timeFormat = new SimpleDateFormat(timeFormat); - this.timestampFormat = new 
SimpleDateFormat(timestampFormat); } @Override @@ -68,34 +62,13 @@ public class WriteAvroResult implements RecordSetWriter { return WriteResult.of(0, Collections.emptyMap()); } - final GenericRecord rec = new GenericData.Record(schema); - int nrOfRows = 0; final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { dataFileWriter.create(schema, outStream); - final RecordSchema recordSchema = rs.getSchema(); - do { - for (final String fieldName : recordSchema.getFieldNames()) { - final Object value = record.getValue(fieldName); - - final Field field = schema.getField(fieldName); - if (field == null) { - continue; - } - - final Object converted; - try { - converted = convert(value, field.schema(), fieldName); - } catch (final SQLException e) { - throw new IOException("Failed to write records to stream", e); - } - - rec.put(fieldName, converted); - } - + final GenericRecord rec = createAvroRecord(record, schema); dataFileWriter.append(rec); nrOfRows++; } while ((record = rs.next()) != null); @@ -104,169 +77,149 @@ public class WriteAvroResult implements RecordSetWriter { return WriteResult.of(nrOfRows, Collections.emptyMap()); } - @Override - public WriteResult write(final Record record, final OutputStream out) throws IOException { - final GenericRecord rec = new GenericData.Record(schema); + private GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException { + final GenericRecord rec = new GenericData.Record(avroSchema); + final RecordSchema recordSchema = record.getSchema(); - final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); - try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { - dataFileWriter.create(schema, out); - final RecordSchema recordSchema = record.getSchema(); - - for (final String fieldName : recordSchema.getFieldNames()) { - 
final Object value = record.getValue(fieldName); - - final Field field = schema.getField(fieldName); - if (field == null) { - continue; - } + for (final String fieldName : recordSchema.getFieldNames()) { + final Object rawValue = record.getValue(fieldName); - final Object converted; - try { - converted = convert(value, field.schema(), fieldName); - } catch (final SQLException e) { - throw new IOException("Failed to write records to stream", e); - } - - rec.put(fieldName, converted); + final Field field = avroSchema.getField(fieldName); + if (field == null) { + continue; } - dataFileWriter.append(rec); + final Object converted = convertToAvroObject(rawValue, field.schema()); + rec.put(fieldName, converted); } - return WriteResult.of(1, Collections.emptyMap()); + return rec; } - - private Object convert(final Object value, final Schema schema, final String fieldName) throws SQLException, IOException { - if (value == null) { + private Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) throws IOException { + if (rawValue == null) { return null; } - // Need to handle CLOB and BLOB before getObject() is called, due to ResultSet's maximum portability statement - if (value instanceof Clob) { - final Clob clob = (Clob) value; - - long numChars = clob.length(); - char[] buffer = new char[(int) numChars]; - InputStream is = clob.getAsciiStream(); - int index = 0; - int c = is.read(); - while (c > 0) { - buffer[index++] = (char) c; - c = is.read(); - } - - clob.free(); - return new String(buffer); - } + switch (fieldSchema.getType()) { + case INT: { + final LogicalType logicalType = fieldSchema.getLogicalType(); + if (logicalType == null) { + return DataTypeUtils.toInteger(rawValue); + } - if (value instanceof Blob) { - final Blob blob = (Blob) value; + if (LogicalTypes.date().getName().equals(logicalType.getName())) { + final long longValue = DataTypeUtils.toLong(rawValue); + final Date date = new Date(longValue); + final Duration duration = 
Duration.between(new Date(0L).toInstant(), date.toInstant()); + final long days = duration.toDays(); + return (int) days; + } else if (LogicalTypes.timeMillis().getName().equals(logicalType.getName())) { + final long longValue = DataTypeUtils.toLong(rawValue); + final Date date = new Date(longValue); + final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); + final long millisSinceMidnight = duration.toMillis(); + return (int) millisSinceMidnight; + } - final long numChars = blob.length(); - final byte[] buffer = new byte[(int) numChars]; - final InputStream is = blob.getBinaryStream(); - int index = 0; - int c = is.read(); - while (c > 0) { - buffer[index++] = (byte) c; - c = is.read(); + return DataTypeUtils.toInteger(rawValue); } + case LONG: { + final LogicalType logicalType = fieldSchema.getLogicalType(); + if (logicalType == null) { + return DataTypeUtils.toLong(rawValue); + } - final ByteBuffer bb = ByteBuffer.wrap(buffer); - blob.free(); - return bb; - } + if (LogicalTypes.timeMicros().getName().equals(logicalType.getName())) { + final long longValue = DataTypeUtils.toLong(rawValue); + final Date date = new Date(longValue); + final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); + return duration.toMillis() * 1000L; + } else if (LogicalTypes.timestampMillis().getName().equals(logicalType.getName())) { + return DataTypeUtils.toLong(rawValue); + } else if (LogicalTypes.timestampMicros().getName().equals(logicalType.getName())) { + return DataTypeUtils.toLong(rawValue) * 1000L; + } - if (value instanceof byte[]) { - // bytes requires little bit different handling - return ByteBuffer.wrap((byte[]) value); - } else if (value instanceof Byte) { - // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT - // But value is returned by JDBC as java.lang.Byte - // (at least H2 JDBC works this way) - // direct put to avro record results: - // 
org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte - return ((Byte) value).intValue(); - } else if (value instanceof Short) { - //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand. - return ((Short) value).intValue(); - } else if (value instanceof BigDecimal) { - // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38" - return value.toString(); - } else if (value instanceof BigInteger) { - // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that. - // It the SQL type is BIGINT and the precision is between 0 and 19 (inclusive); if so, the BigInteger is likely a - // long (and the schema says it will be), so try to get its value as a long. - // Otherwise, Avro can't handle BigInteger as a number - it will throw an AvroRuntimeException - // such as: "Unknown datum type: java.math.BigInteger: 38". In this case the schema is expecting a string. - final BigInteger bigInt = (BigInteger) value; - if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) { - return value.toString(); - } else { - return bigInt.longValue(); + return DataTypeUtils.toLong(rawValue); } - } else if (value instanceof Boolean) { - return value; - } else if (value instanceof Map) { - // TODO: Revisit how we handle a lot of these cases.... 
- switch (schema.getType()) { - case MAP: - return value; - case RECORD: - final GenericData.Record avroRecord = new GenericData.Record(schema); + case BYTES: + case FIXED: + if (rawValue instanceof byte[]) { + return ByteBuffer.wrap((byte[]) rawValue); + } + if (rawValue instanceof Object[]) { + return AvroTypeUtil.convertByteArray((Object[]) rawValue); + } else { + throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer"); + } + case MAP: + if (rawValue instanceof Record) { + final Record recordValue = (Record) rawValue; + final Map<String, Object> map = new HashMap<>(); + for (final String recordFieldName : recordValue.getSchema().getFieldNames()) { + final Object v = recordValue.getValue(recordFieldName); + if (v != null) { + map.put(recordFieldName, v); + } + } - final Record record = (Record) value; - for (final String recordFieldName : record.getSchema().getFieldNames()) { - final Object recordFieldValue = record.getValue(recordFieldName); + return map; + } else { + throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map"); + } + case RECORD: + final GenericData.Record avroRecord = new GenericData.Record(fieldSchema); - final Field field = schema.getField(recordFieldName); - if (field == null) { - continue; - } + final Record record = (Record) rawValue; + for (final String recordFieldName : record.getSchema().getFieldNames()) { + final Object recordFieldValue = record.getValue(recordFieldName); - final Object converted = convert(recordFieldValue, field.schema(), recordFieldName); - avroRecord.put(recordFieldName, converted); + final Field field = fieldSchema.getField(recordFieldName); + if (field == null) { + continue; } - return avroRecord; - } - return value.toString(); + final Object converted = convertToAvroObject(recordFieldValue, field.schema()); + avroRecord.put(recordFieldName, converted); + } + return 
avroRecord; + case ARRAY: + final Object[] objectArray = (Object[]) rawValue; + final List<Object> list = new ArrayList<>(objectArray.length); + for (final Object o : objectArray) { + final Object converted = convertToAvroObject(o, fieldSchema.getElementType()); + list.add(converted); + } + return list; + case BOOLEAN: + return DataTypeUtils.toBoolean(rawValue); + case DOUBLE: + return DataTypeUtils.toDouble(rawValue); + case FLOAT: + return DataTypeUtils.toFloat(rawValue); + case NULL: + return null; + case ENUM: + return new EnumSymbol(fieldSchema, rawValue); + case STRING: + return DataTypeUtils.toString(rawValue, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat()); + } + + return rawValue; + } - } else if (value instanceof List) { - return value; - } else if (value instanceof Object[]) { - final List<Object> list = new ArrayList<>(); - for (final Object o : ((Object[]) value)) { - final Object converted = convert(o, schema.getElementType(), fieldName); - list.add(converted); - } - return list; - } else if (value instanceof Number) { - return value; - } else if (value instanceof java.util.Date) { - final java.util.Date date = (java.util.Date) value; - return dateFormat.format(date); - } else if (value instanceof java.sql.Date) { - final java.sql.Date sqlDate = (java.sql.Date) value; - final java.util.Date date = new java.util.Date(sqlDate.getTime()); - return dateFormat.format(date); - } else if (value instanceof Time) { - final Time time = (Time) value; - final java.util.Date date = new java.util.Date(time.getTime()); - return timeFormat.format(date); - } else if (value instanceof Timestamp) { - final Timestamp time = (Timestamp) value; - final java.util.Date date = new java.util.Date(time.getTime()); - return timestampFormat.format(date); + @Override + public WriteResult write(final Record record, final OutputStream out) throws IOException { + final GenericRecord rec = 
createAvroRecord(record, schema); + + final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) { + dataFileWriter.create(schema, out); + dataFileWriter.append(rec); } - // The different types that we support are numbers (int, long, double, float), - // as well as boolean values and Strings. Since Avro doesn't provide - // timestamp types, we want to convert those to Strings. So we will cast anything other - // than numbers or booleans to strings by using the toString() method. - return value.toString(); + return WriteResult.of(1, Collections.emptyMap()); } @@ -275,7 +228,6 @@ public class WriteAvroResult implements RecordSetWriter { return "application/avro-binary"; } - public static String normalizeNameForAvro(String inputName) { String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_"); if (Character.isDigit(normalizedName.charAt(0))) { http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java index eccad7d..6b06ebf 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java @@ 
-19,31 +19,63 @@ package org.apache.nifi.csv; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; -import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.commons.csv.CSVFormat; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.DateTimeUtils; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RowRecordReaderFactory; -import org.apache.nifi.serialization.UserTypeOverrideRowReader; +import org.apache.nifi.serialization.SchemaRegistryRecordReader; +import org.apache.nifi.serialization.record.RecordSchema; @Tags({"csv", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"}) @CapabilityDescription("Parses CSV-formatted data, returning each row in the CSV file as a separate record. " + "This reader assumes that the first line in the content is the column names and all subsequent lines are " - + "the values. By default, the reader will assume that all columns are of 'String' type, but this can be " - + "overridden by adding a user-defined Property where the key is the name of a column and the value is the " - + "type of the column. For example, if a Property has the name \"balance\" with a value of float, it the " - + "reader will attempt to coerce all values in the \"balance\" column into a floating-point number. 
See " - + "Controller Service's Usage for further documentation.") -@DynamicProperty(name = "<name of column in CSV>", value = "<type of column values in CSV>", - description = "User-defined properties are used to indicate that the values of a specific column should be interpreted as a " - + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false) -public class CSVReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory { + + "the values. See Controller Service's Usage for further documentation.") +public class CSVReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory { + + private volatile CSVFormat csvFormat; + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(DateTimeUtils.DATE_FORMAT); + properties.add(DateTimeUtils.TIME_FORMAT); + properties.add(DateTimeUtils.TIMESTAMP_FORMAT); + properties.add(CSVUtils.CSV_FORMAT); + properties.add(CSVUtils.VALUE_SEPARATOR); + properties.add(CSVUtils.QUOTE_CHAR); + properties.add(CSVUtils.ESCAPE_CHAR); + properties.add(CSVUtils.COMMENT_MARKER); + properties.add(CSVUtils.NULL_STRING); + properties.add(CSVUtils.TRIM_FIELDS); + return properties; + } + + @OnEnabled + public void storeCsvFormat(final ConfigurationContext context) { + this.csvFormat = CSVUtils.createCSVFormat(context); + this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue(); + this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue(); + } @Override - public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException { - return new 
CSVRecordReader(in, logger, getFieldTypeOverrides()); + public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException { + final RecordSchema schema = getSchema(flowFile); + return new CSVRecordReader(in, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java index c2e8963..d02768c 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java @@ -17,200 +17,91 @@ package org.apache.nifi.csv; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; +import java.io.Reader; import java.util.HashMap; -import java.util.List; import java.util.Map; +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.CSVParser; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.io.input.BOMInputStream; import 
org.apache.nifi.logging.ComponentLog; import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.Record; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.util.DataTypeUtils; -import au.com.bytecode.opencsv.CSVReader; public class CSVRecordReader implements RecordReader { - private final ComponentLog logger; - private final CSVReader reader; - private final String[] firstLine; - private final Map<String, DataType> fieldTypeOverrides; - private RecordSchema schema; - - public CSVRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException { - this.logger = logger; - reader = new CSVReader(new InputStreamReader(new BufferedInputStream(in))); - firstLine = reader.readNext(); - this.fieldTypeOverrides = fieldTypeOverrides; + private final CSVParser csvParser; + private final RecordSchema schema; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; + + public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, + final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException { + + final Reader reader = new InputStreamReader(new BOMInputStream(in)); + csvParser = new CSVParser(reader, csvFormat); + + this.schema = schema; + this.dateFormat = dateFormat; + this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; } @Override public Record nextRecord() throws IOException, MalformedRecordException { final 
RecordSchema schema = getSchema(); - while (true) { - final String[] line = reader.readNext(); - if (line == null) { - return null; - } - - final List<DataType> fieldTypes = schema.getDataTypes(); - if (fieldTypes.size() != line.length) { - logger.warn("Found record with incorrect number of fields. Expected {} but found {}; skipping record", new Object[] {fieldTypes.size(), line.length}); - continue; - } - - try { - final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount()); + for (final CSVRecord csvRecord : csvParser) { + final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount()); - int i = 0; - for (final String fieldName : schema.getFieldNames()) { - if (i >= line.length) { - rowValues.put(fieldName, null); - continue; - } - - final String rawValue = line[i++].trim(); - final Object converted = convert(schema.getDataType(fieldName).orElse(null), rawValue); - rowValues.put(fieldName, converted); + for (final String fieldName : schema.getFieldNames()) { + final String rawValue = csvRecord.get(fieldName); + if (rawValue == null) { + rowValues.put(fieldName, null); + continue; } - return new MapRecord(schema, rowValues); - } catch (final Exception e) { - throw new MalformedRecordException("Found invalid CSV record", e); + final Object converted = convert(rawValue, schema.getDataType(fieldName).orElse(null)); + rowValues.put(fieldName, converted); } + + return new MapRecord(schema, rowValues); } + + return null; } @Override public RecordSchema getSchema() { - if (schema != null) { - return schema; - } - - final List<RecordField> recordFields = new ArrayList<>(); - for (final String element : firstLine) { - - final String name = element.trim(); - final DataType dataType; - - final DataType overriddenDataType = fieldTypeOverrides.get(name); - if (overriddenDataType != null) { - dataType = overriddenDataType; - } else { - dataType = RecordFieldType.STRING.getDataType(); - } - - final RecordField field = new RecordField(name, dataType); 
- recordFields.add(field); - } - - if (recordFields.isEmpty()) { - recordFields.add(new RecordField("line", RecordFieldType.STRING.getDataType())); - } - - schema = new SimpleRecordSchema(recordFields); return schema; } - protected Object convert(final DataType dataType, final String value) { - if (dataType == null) { + protected Object convert(final String value, final DataType dataType) { + if (dataType == null || value == null) { return value; } - switch (dataType.getFieldType()) { - case BOOLEAN: - if (value.length() == 0) { - return null; - } - return Boolean.parseBoolean(value); - case BYTE: - if (value.length() == 0) { - return null; - } - return Byte.parseByte(value); - case SHORT: - if (value.length() == 0) { - return null; - } - return Short.parseShort(value); - case INT: - if (value.length() == 0) { - return null; - } - return Integer.parseInt(value); - case LONG: - case BIGINT: - if (value.length() == 0) { - return null; - } - return Long.parseLong(value); - case FLOAT: - if (value.length() == 0) { - return null; - } - return Float.parseFloat(value); - case DOUBLE: - if (value.length() == 0) { - return null; - } - return Double.parseDouble(value); - case DATE: - if (value.length() == 0) { - return null; - } - try { - final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value); - return new java.sql.Date(date.getTime()); - } catch (final ParseException e) { - logger.warn("Found invalid value for DATE field: " + value + " does not match expected format of " - + dataType.getFormat() + "; will substitute a NULL value for this field"); - return null; - } - case TIME: - if (value.length() == 0) { - return null; - } - try { - final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value); - return new java.sql.Time(date.getTime()); - } catch (final ParseException e) { - logger.warn("Found invalid value for TIME field: " + value + " does not match expected format of " - + dataType.getFormat() + "; will substitute a NULL value for this 
field"); - return null; - } - case TIMESTAMP: - if (value.length() == 0) { - return null; - } - try { - final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value); - return new java.sql.Timestamp(date.getTime()); - } catch (final ParseException e) { - logger.warn("Found invalid value for TIMESTAMP field: " + value + " does not match expected format of " - + dataType.getFormat() + "; will substitute a NULL value for this field"); - return null; - } - case STRING: - default: - return value; + final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value; + + if (trimmed.isEmpty()) { + return null; } + + return DataTypeUtils.convertType(trimmed, dataType, dateFormat, timeFormat, timestampFormat); } @Override public void close() throws IOException { - reader.close(); + csvParser.close(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java index 906e9c4..6a7b758 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java @@ -17,21 +17,51 @@ package org.apache.nifi.csv; +import 
java.util.ArrayList; +import java.util.List; + +import org.apache.commons.csv.CSVFormat; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.serialization.AbstractRecordSetWriter; +import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; -@Tags({"csv", "result", "set", "writer", "serializer", "record", "row"}) -@CapabilityDescription("Writes the contents of a Database ResultSet as CSV data. The first line written " +@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"}) +@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written " + "will be the column names. 
All subsequent lines will be the values corresponding to those columns.") -public class CSVRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory { +public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { + + private volatile CSVFormat csvFormat; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(CSVUtils.CSV_FORMAT); + properties.add(CSVUtils.VALUE_SEPARATOR); + properties.add(CSVUtils.QUOTE_CHAR); + properties.add(CSVUtils.ESCAPE_CHAR); + properties.add(CSVUtils.COMMENT_MARKER); + properties.add(CSVUtils.NULL_STRING); + properties.add(CSVUtils.TRIM_FIELDS); + properties.add(CSVUtils.QUOTE_MODE); + properties.add(CSVUtils.RECORD_SEPARATOR); + properties.add(CSVUtils.TRAILING_DELIMITER); + return properties; + } + + @OnEnabled + public void storeCsvFormat(final ConfigurationContext context) { + this.csvFormat = CSVUtils.createCSVFormat(context); + } @Override public RecordSetWriter createWriter(final ComponentLog logger) { - return new WriteCSVResult(getDateFormat(), getTimeFormat(), getTimestampFormat()); + return new WriteCSVResult(csvFormat, getDateFormat(), getTimeFormat(), getTimestampFormat()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java new 
file mode 100644 index 0000000..e23b6e1 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import org.apache.commons.csv.CSVFormat; +import org.apache.commons.csv.QuoteMode; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; + +public class CSVUtils { + + static final AllowableValue CUSTOM = new AllowableValue("custom", "Custom Format", + "The format of the CSV is configured by using the properties of this Controller Service, such as Value Separator"); + static final AllowableValue RFC_4180 = new AllowableValue("rfc-4180", "RFC 4180", "CSV data follows the RFC 4180 Specification defined at https://tools.ietf.org/html/rfc4180"); + static final AllowableValue EXCEL = new AllowableValue("excel", "Microsoft Excel", "CSV data follows the format used by Microsoft Excel"); + static final AllowableValue TDF = new AllowableValue("tdf", "Tab-Delimited", "CSV data is Tab-Delimited instead of Comma Delimited"); + static final AllowableValue INFORMIX_UNLOAD = new AllowableValue("informix-unload", "Informix Unload", "The format used by Informix when issuing the UNLOAD TO file_name command"); + static final AllowableValue INFORMIX_UNLOAD_CSV = new AllowableValue("informix-unload-csv", "Informix Unload Escape Disabled", + "The format used by Informix when issuing the UNLOAD TO file_name command with escaping disabled"); + static final AllowableValue MYSQL = new AllowableValue("mysql", "MySQL Format", "CSV data follows the format used by MySQL"); + + static final PropertyDescriptor CSV_FORMAT = new PropertyDescriptor.Builder() + .name("CSV Format") + .description("Specifies which \"format\" the CSV data is in, or specifies if custom formatting should be used.") + .expressionLanguageSupported(false) + .allowableValues(CUSTOM, RFC_4180, EXCEL, TDF, MYSQL, INFORMIX_UNLOAD, INFORMIX_UNLOAD_CSV) + 
.defaultValue(CUSTOM.getValue()) + .required(true) + .build(); + static final PropertyDescriptor VALUE_SEPARATOR = new PropertyDescriptor.Builder() + .name("Value Separator") + .description("The character that is used to separate values/fields in a CSV Record") + .addValidator(new SingleCharacterValidator()) + .expressionLanguageSupported(false) + .defaultValue(",") + .required(true) + .build(); + static final PropertyDescriptor QUOTE_CHAR = new PropertyDescriptor.Builder() + .name("Quote Character") + .description("The character that is used to quote values so that escape characters do not have to be used") + .addValidator(new SingleCharacterValidator()) + .expressionLanguageSupported(false) + .defaultValue("\"") + .required(true) + .build(); + static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder() + .name("Comment Marker") + .description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.") + .addValidator(new SingleCharacterValidator()) + .expressionLanguageSupported(false) + .required(false) + .build(); + static final PropertyDescriptor ESCAPE_CHAR = new PropertyDescriptor.Builder() + .name("Escape Character") + .description("The character that is used to escape characters that would otherwise have a specific meaning to the CSV Parser.") + .addValidator(new SingleCharacterValidator()) + .expressionLanguageSupported(false) + .defaultValue("\\") + .required(true) + .build(); + static final PropertyDescriptor NULL_STRING = new PropertyDescriptor.Builder() + .name("Null String") + .description("Specifies a String that, if present as a value in the CSV, should be considered a null field instead of using the literal value.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .required(false) + .build(); + static final PropertyDescriptor TRIM_FIELDS = new PropertyDescriptor.Builder() + .name("Trim Fields") + .description("Whether or 
not white space should be removed from the beginning and end of fields") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("true") + .required(true) + .build(); + + // CSV Format fields for writers only + static final AllowableValue QUOTE_ALL = new AllowableValue("ALL", "Quote All Values", "All values will be quoted using the configured quote character."); + static final AllowableValue QUOTE_MINIMAL = new AllowableValue("MINIMAL", "Quote Minimal", + "Values will be quoted only if they contain special characters such as newline characters or field separators."); + static final AllowableValue QUOTE_NON_NUMERIC = new AllowableValue("NON_NUMERIC", "Quote Non-Numeric Values", "Values will be quoted unless the value is a number."); + static final AllowableValue QUOTE_NONE = new AllowableValue("NONE", "Do Not Quote Values", + "Values will not be quoted. Instead, all special characters will be escaped using the configured escape character."); + + static final PropertyDescriptor QUOTE_MODE = new PropertyDescriptor.Builder() + .name("Quote Mode") + .description("Specifies how fields should be quoted when they are written") + .expressionLanguageSupported(false) + .allowableValues(QUOTE_ALL, QUOTE_MINIMAL, QUOTE_NON_NUMERIC, QUOTE_NONE) + .defaultValue(QUOTE_MINIMAL.getValue()) + .required(true) + .build(); + static final PropertyDescriptor TRAILING_DELIMITER = new PropertyDescriptor.Builder() + .name("Include Trailing Delimiter") + .description("If true, a trailing delimiter will be added to each CSV Record that is written. 
If false, the trailing delimiter will be omitted.") + .expressionLanguageSupported(false) + .allowableValues("true", "false") + .defaultValue("false") + .required(true) + .build(); + static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder() + .name("Record Separator") + .description("Specifies the characters to use in order to separate CSV Records") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(false) + .defaultValue("\\n") + .required(true) + .build(); + + + static CSVFormat createCSVFormat(final ConfigurationContext context) { + final String formatName = context.getProperty(CSV_FORMAT).getValue(); + if (formatName.equalsIgnoreCase(CUSTOM.getValue())) { + return buildCustomFormat(context); + } + if (formatName.equalsIgnoreCase(RFC_4180.getValue())) { + return CSVFormat.RFC4180; + } else if (formatName.equalsIgnoreCase(EXCEL.getValue())) { + return CSVFormat.EXCEL; + } else if (formatName.equalsIgnoreCase(TDF.getValue())) { + return CSVFormat.TDF; + } else if (formatName.equalsIgnoreCase(MYSQL.getValue())) { + return CSVFormat.MYSQL; + } else if (formatName.equalsIgnoreCase(INFORMIX_UNLOAD.getValue())) { + return CSVFormat.INFORMIX_UNLOAD; + } else if (formatName.equalsIgnoreCase(INFORMIX_UNLOAD_CSV.getValue())) { + return CSVFormat.INFORMIX_UNLOAD_CSV; + } else { + return CSVFormat.DEFAULT; + } + } + + private static char getChar(final ConfigurationContext context, final PropertyDescriptor property) { + return CSVUtils.unescape(context.getProperty(property).getValue()).charAt(0); + } + + private static CSVFormat buildCustomFormat(final ConfigurationContext context) { + final char valueSeparator = getChar(context, VALUE_SEPARATOR); + CSVFormat format = CSVFormat.newFormat(valueSeparator) + .withAllowMissingColumnNames() + .withIgnoreEmptyLines() + .withFirstRecordAsHeader(); + + format = format.withQuote(getChar(context, QUOTE_CHAR)); + format = format.withEscape(getChar(context, ESCAPE_CHAR)); + 
format = format.withTrim(context.getProperty(TRIM_FIELDS).asBoolean()); + + if (context.getProperty(COMMENT_MARKER).isSet()) { + format = format.withCommentMarker(getChar(context, COMMENT_MARKER)); + } + if (context.getProperty(NULL_STRING).isSet()) { + format = format.withNullString(CSVUtils.unescape(context.getProperty(NULL_STRING).getValue())); + } + + final PropertyValue quoteValue = context.getProperty(QUOTE_MODE); + if (quoteValue != null) { + final QuoteMode quoteMode = QuoteMode.valueOf(quoteValue.getValue()); + format = format.withQuoteMode(quoteMode); + } + + final PropertyValue trailingDelimiterValue = context.getProperty(TRAILING_DELIMITER); + if (trailingDelimiterValue != null) { + final boolean trailingDelimiter = trailingDelimiterValue.asBoolean(); + format = format.withTrailingDelimiter(trailingDelimiter); + } + + final PropertyValue recordSeparator = context.getProperty(RECORD_SEPARATOR); + if (recordSeparator != null) { + final String separator = unescape(recordSeparator.getValue()); + format = format.withRecordSeparator(separator); + } + + return format; + } + + + public static String unescape(final String input) { + if (input == null) { + return input; + } + + return input.replace("\\t", "\t") + .replace("\\n", "\n") + .replace("\\r", "\r"); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/68c592ea/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java new file mode 100644 
index 0000000..b24dea9 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/SingleCharacterValidator.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +public class SingleCharacterValidator implements Validator { + private static final Set<String> illegalChars = new HashSet<>(); + static { + illegalChars.add("\r"); + illegalChars.add("\n"); + } + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + final String unescaped = CSVUtils.unescape(input); + if (unescaped.length() != 1) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Value must be exactly 1 character but was " + input.length() + " in length") + .build(); + } + + if (illegalChars.contains(input)) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation(input + " is not a valid character for this property") + .build(); + } + + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .build(); + } + +}