// Extracted from patch: http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.json;

import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ArrayNode;


/**
 * Base class for record readers that pull JSON objects from a stream one at a time.
 *
 * <p>The input may be a single JSON object or a JSON array of objects. The first object is read
 * eagerly in the constructor and kept so that subclasses can inspect it (see
 * {@link #getFirstJsonNode()}); subsequent objects are read lazily by {@link #nextRecord()}.
 * Subclasses decide how a parsed {@link JsonNode} becomes a {@link Record} by implementing
 * {@link #convertJsonNodeToRecord(JsonNode, RecordSchema)}.</p>
 */
public abstract class AbstractJsonRowRecordReader implements RecordReader {
    private final ComponentLog logger;
    private final JsonParser jsonParser;
    private final JsonFactory jsonFactory;
    private final boolean array;
    private final JsonNode firstJsonNode;

    private boolean firstObjectConsumed = false;

    private static final TimeZone gmt = TimeZone.getTimeZone("GMT");


    /**
     * Creates the reader and eagerly parses the first JSON object, if there is one.
     *
     * @param in the stream to read JSON from
     * @param logger the logger used to report conversion problems
     * @throws IOException if the stream cannot be read
     * @throws MalformedRecordException if the content cannot be parsed as JSON
     */
    public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
        this.logger = logger;

        jsonFactory = new JsonFactory();
        try {
            jsonParser = jsonFactory.createJsonParser(in);
            jsonParser.setCodec(new ObjectMapper());

            JsonToken token = jsonParser.nextToken();
            if (token == JsonToken.START_ARRAY) {
                array = true;
                token = jsonParser.nextToken(); // advance to START_OBJECT token
            } else {
                array = false;
            }

            if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
                firstJsonNode = jsonParser.readValueAsTree();
            } else {
                firstJsonNode = null;
            }
        } catch (final JsonParseException e) {
            throw new MalformedRecordException("Could not parse data as JSON", e);
        }
    }

    /**
     * Reads the next JSON object and converts it with
     * {@link #convertJsonNodeToRecord(JsonNode, RecordSchema)}, using the schema returned by
     * {@code getSchema()}.
     *
     * @return the converted record; {@code null} once a non-array input has already yielded its
     *         single object, otherwise whatever the subclass conversion returns
     * @throws IOException if reading from the stream fails
     * @throws MalformedRecordException if the JSON is malformed or cannot be converted
     */
    @Override
    public Record nextRecord() throws IOException, MalformedRecordException {
        // A non-array document holds exactly one object, so there is nothing more to read.
        if (firstObjectConsumed && !array) {
            return null;
        }

        final JsonNode nextNode = getNextJsonNode();
        final RecordSchema schema = getSchema();
        try {
            return convertJsonNodeToRecord(nextNode, schema);
        } catch (final MalformedRecordException mre) {
            throw mre;
        } catch (final IOException ioe) {
            throw ioe;
        } catch (final Exception e) {
            // Any other failure is reported as a malformed record, keeping the original cause.
            logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", new Object[] {nextNode, schema, e.toString(), e});
            throw new MalformedRecordException("Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema", e);
        }
    }

    /**
     * Infers a record data type from the kind of the given JSON node. Nodes that match none of
     * the scalar or array checks are treated as nested records whose schema is inferred via
     * {@link #determineSchema(JsonNode)}.
     *
     * @param node the JSON node to inspect
     * @return the inferred data type
     */
    protected DataType determineFieldType(final JsonNode node) {
        if (node.isDouble()) {
            return RecordFieldType.DOUBLE.getDataType();
        }
        if (node.isBoolean()) {
            return RecordFieldType.BOOLEAN.getDataType();
        }
        if (node.isFloatingPointNumber()) {
            return RecordFieldType.FLOAT.getDataType();
        }
        if (node.isBigInteger()) {
            return RecordFieldType.BIGINT.getDataType();
        }
        if (node.isBigDecimal()) {
            return RecordFieldType.DOUBLE.getDataType();
        }
        if (node.isLong()) {
            return RecordFieldType.LONG.getDataType();
        }
        if (node.isInt()) {
            return RecordFieldType.INT.getDataType();
        }
        if (node.isTextual()) {
            return RecordFieldType.STRING.getDataType();
        }
        if (node.isArray()) {
            return RecordFieldType.ARRAY.getDataType();
        }

        final RecordSchema childSchema = determineSchema(node);
        return RecordFieldType.RECORD.getDataType(childSchema);
    }

    /**
     * Builds a schema with one field per entry of the given JSON node, typing each field with
     * {@link #determineFieldType(JsonNode)}.
     *
     * @param jsonNode the JSON node whose fields define the schema
     * @return the inferred schema
     */
    protected RecordSchema determineSchema(final JsonNode jsonNode) {
        final List<RecordField> recordFields = new ArrayList<>();

        final Iterator<Map.Entry<String, JsonNode>> itr = jsonNode.getFields();
        while (itr.hasNext()) {
            final Map.Entry<String, JsonNode> entry = itr.next();
            final String elementName = entry.getKey();
            final JsonNode node = entry.getValue();

            final DataType dataType = determineFieldType(node);
            recordFields.add(new RecordField(elementName, dataType));
        }

        return new SimpleRecordSchema(recordFields);
    }

    /**
     * Converts a JSON node into a Java value of the desired record type.
     *
     * <p>DATE, TIME and TIMESTAMP values are parsed in GMT using the format carried by
     * {@code desiredType} and returned as {@link java.sql.Date}, {@link java.sql.Time} and
     * {@link java.sql.Timestamp} respectively. A value that cannot be parsed is logged at warn
     * level and converted to {@code null}.</p>
     *
     * @param fieldNode the JSON node to convert; may be {@code null}
     * @param fieldName the name of the field, used in log messages
     * @param desiredType the type to convert to
     * @return the converted value, or {@code null} if the node is missing, JSON null, an empty
     *         CHAR/DATE/TIME/TIMESTAMP string, an unparseable date/time, or a RECORD type
     *         without a child schema
     * @throws IOException if a nested record conversion fails while reading
     * @throws MalformedRecordException if a nested record cannot be converted
     */
    protected Object convertField(final JsonNode fieldNode, final String fieldName, final DataType desiredType) throws IOException, MalformedRecordException {
        if (fieldNode == null || fieldNode.isNull()) {
            return null;
        }

        switch (desiredType.getFieldType()) {
            case BOOLEAN:
                return fieldNode.asBoolean();
            case BYTE:
                return (byte) fieldNode.asInt();
            case CHAR: {
                final String text = fieldNode.asText();
                if (text.isEmpty()) {
                    return null;
                }
                return text.charAt(0);
            }
            case DOUBLE:
                return fieldNode.asDouble();
            case FLOAT:
                return (float) fieldNode.asDouble();
            case INT:
                return fieldNode.asInt();
            case LONG:
                return fieldNode.asLong();
            case SHORT:
                return (short) fieldNode.asInt();
            case STRING:
                return fieldNode.asText();
            case DATE: {
                final Long millis = parseEpochMillis(fieldNode, fieldName, desiredType, "Date");
                return millis == null ? null : new java.sql.Date(millis);
            }
            case TIME: {
                // Previously returned java.sql.Date; a TIME field must be a java.sql.Time.
                final Long millis = parseEpochMillis(fieldNode, fieldName, desiredType, "Time");
                return millis == null ? null : new java.sql.Time(millis);
            }
            case TIMESTAMP: {
                // Previously returned java.sql.Date; a TIMESTAMP field must be a java.sql.Timestamp.
                final Long millis = parseEpochMillis(fieldNode, fieldName, desiredType, "Timestamp");
                return millis == null ? null : new java.sql.Timestamp(millis);
            }
            case ARRAY: {
                final ArrayNode arrayNode = (ArrayNode) fieldNode;
                final int numElements = arrayNode.size();
                final Object[] arrayElements = new Object[numElements];
                int count = 0;
                for (final JsonNode node : arrayNode) {
                    // Each element is converted according to its own inferred type.
                    final Object converted = convertField(node, fieldName, determineFieldType(node));
                    arrayElements[count++] = converted;
                }

                return arrayElements;
            }
            case RECORD: {
                if (fieldNode.isObject()) {
                    final Optional<RecordSchema> childSchema = desiredType.getChildRecordSchema();
                    if (!childSchema.isPresent()) {
                        return null;
                    }

                    return convertJsonNodeToRecord(fieldNode, childSchema.get());
                } else {
                    return fieldNode.toString();
                }
            }
        }

        return fieldNode.toString();
    }

    /**
     * Parses the text of a JSON node as a date/time in GMT using the format of the desired type.
     *
     * @param fieldNode the node holding the textual value
     * @param fieldName the field name, used only for logging
     * @param desiredType the type whose format string drives the parsing
     * @param typeLabel the type name inserted into the warning message ("Date", "Time", "Timestamp")
     * @return milliseconds since the epoch, or {@code null} if the text is empty or unparseable
     */
    private Long parseEpochMillis(final JsonNode fieldNode, final String fieldName, final DataType desiredType, final String typeLabel) {
        final String string = fieldNode.asText();
        if (string.isEmpty()) {
            return null;
        }

        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            final DateFormat dateFormat = new SimpleDateFormat(desiredType.getFormat());
            dateFormat.setTimeZone(gmt);
            return dateFormat.parse(string).getTime();
        } catch (final ParseException e) {
            logger.warn("Failed to convert JSON field to " + typeLabel + " for field {} (value {})", new Object[] {fieldName, string, e});
            return null;
        }
    }

    /**
     * Returns the next JSON object from the stream. The first call hands back the object that was
     * parsed in the constructor (possibly {@code null}).
     *
     * @return the next JSON object, or {@code null} when the input is exhausted or an array
     *         boundary is reached
     * @throws MalformedRecordException if a token other than an object or array boundary is found
     */
    private JsonNode getNextJsonNode() throws JsonParseException, IOException, MalformedRecordException {
        if (!firstObjectConsumed) {
            firstObjectConsumed = true;
            return firstJsonNode;
        }

        while (true) {
            final JsonToken token = jsonParser.nextToken();
            if (token == null) {
                return null;
            }

            switch (token) {
                case END_OBJECT:
                    continue;
                case START_OBJECT:
                    return jsonParser.readValueAsTree();
                case END_ARRAY:
                case START_ARRAY:
                    return null;
                default:
                    throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
            }
        }
    }

    /**
     * Closes the underlying JSON parser.
     */
    @Override
    public void close() throws IOException {
        jsonParser.close();
    }

    /** @return the parser used to read the input */
    protected JsonParser getJsonParser() {
        return jsonParser;
    }

    /** @return the factory that created the parser */
    protected JsonFactory getJsonFactory() {
        return jsonFactory;
    }

    /** @return the first JSON object of the input, or empty if the input held no object */
    protected Optional<JsonNode> getFirstJsonNode() {
        return Optional.ofNullable(firstJsonNode);
    }

    /**
     * Converts a parsed JSON node into a record that follows the given schema.
     *
     * @param nextNode the JSON node to convert; may be {@code null} when no object was available
     * @param schema the schema the record should follow
     * @return the resulting record
     * @throws IOException if reading fails during conversion
     * @throws MalformedRecordException if the node cannot be converted
     */
    protected abstract Record convertJsonNodeToRecord(final JsonNode nextNode, final RecordSchema schema) throws IOException, MalformedRecordException;
}
// Extracted from patch: http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.json;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RowRecordReaderFactory;
import org.apache.nifi.serialization.record.DataType;

import com.jayway.jsonpath.JsonPath;

/**
 * Controller service that creates {@link JsonPathRowRecordReader} instances. Every dynamic
 * property names a record field (optionally suffixed with a type) and holds the JSONPath
 * expression used to extract that field's value.
 */
@Tags({"json", "jsonpath", "record", "reader", "parser"})
@CapabilityDescription("Parses JSON records and evaluates user-defined JSON Path's against each JSON object. The root element may be either "
    + "a single JSON object or a JSON array. If a JSON array is found, each JSON object within that array is treated as a separate record. "
    + "User-defined properties define the fields that should be extracted from the JSON in order to form the fields of a Record. Any JSON field "
    + "that is not extracted via a JSONPath will not be returned in the JSON Records.")
@SeeAlso(JsonTreeReader.class)
@DynamicProperty(name = "The field name for the record. If it is desirable to enforce that the value be coerced into a given type, its type can be included "
    + "in the name by using a syntax of <field name>:<field type>. For example, \"balance:double\".",
    value="A JSONPath Expression that will be evaluated against each JSON record. The result of the JSONPath will be the value of the "
    + "field whose name is the same as the property name.",
    description="User-defined properties identifiy how to extract specific fields from a JSON object in order to create a Record",
    supportsExpressionLanguage=false)
public class JsonPathReader extends AbstractControllerService implements RowRecordReaderFactory {

    // Both maps are replaced wholesale when the service is enabled.
    private volatile LinkedHashMap<String, JsonPath> jsonPaths;
    private volatile Map<String, DataType> fieldTypeOverrides;

    /**
     * Describes a dynamic property whose value must be a valid JSONPath expression.
     *
     * @param propertyDescriptorName the name the user gave the property
     * @return an optional, dynamic descriptor validated by {@link JsonPathValidator}
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        final String description = "JsonPath Expression that indicates how to retrieve the value from a JSON Object for the '" + propertyDescriptorName + "' column";

        final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder();
        builder.name(propertyDescriptorName);
        builder.description(description);
        builder.dynamic(true);
        builder.required(false);
        builder.addValidator(new JsonPathValidator());
        return builder.build();
    }

    /**
     * Compiles the JSONPath expression of every dynamic property and records any field type
     * that was declared in the property name.
     *
     * @param context the configuration holding the property values
     */
    @OnEnabled
    public void compileJsonPaths(final ConfigurationContext context) {
        final Map<String, DataType> declaredTypes = new HashMap<>(context.getProperties().size());
        final LinkedHashMap<String, JsonPath> pathsByField = new LinkedHashMap<>();

        for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
            if (descriptor.isDynamic()) {
                final String fieldName = PropertyNameUtil.getFieldName(descriptor.getName());

                // Remember the type only when the property name declares one.
                final Optional<DataType> declaredType = PropertyNameUtil.getDataType(descriptor.getName());
                declaredType.ifPresent(type -> declaredTypes.put(fieldName, type));

                final String expression = context.getProperty(descriptor).getValue();
                pathsByField.put(fieldName, JsonPath.compile(expression));
            }
        }

        jsonPaths = pathsByField;
        fieldTypeOverrides = declaredTypes;
    }

    /**
     * Requires that at least one dynamic (JSONPath) property is configured.
     *
     * @param validationContext the context holding the configured properties
     * @return an empty collection when valid, otherwise a single failed result
     */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        final boolean pathSpecified = validationContext.getProperties().keySet().stream()
            .anyMatch(PropertyDescriptor::isDynamic);

        if (!pathSpecified) {
            final ValidationResult noPaths = new ValidationResult.Builder()
                .subject("JSON Paths")
                .valid(false)
                .explanation("No JSON Paths were specified")
                .build();
            return Collections.singleton(noPaths);
        }

        return Collections.emptyList();
    }

    /**
     * Creates a reader that applies the compiled JSONPath expressions to the given stream.
     *
     * @param in the stream containing the JSON to read
     * @param logger the logger handed to the reader
     * @return a new {@link JsonPathRowRecordReader}
     */
    @Override
    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
        return new JsonPathRowRecordReader(jsonPaths, fieldTypeOverrides, in, logger);
    }

}
// Extracted from patch: http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.json;

import java.io.IOException;
import java.io.InputStream;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.DataTypeUtils;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.codehaus.jackson.JsonNode;

import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.PathNotFoundException;
import com.jayway.jsonpath.spi.json.JacksonJsonProvider;

/**
 * Record reader that builds each record by evaluating a fixed set of JSONPath expressions
 * against every JSON object of the input. The schema is derived lazily from the first JSON
 * object together with any user-supplied field type overrides.
 */
public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
    private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();

    private static final String TIME_FORMAT_DATE = "yyyy-MM-dd";
    private static final String TIME_FORMAT_TIME = "HH:mm:ss";
    private static final String TIME_FORMAT_TIMESTAMP = "yyyy-MM-dd HH:mm:ss";
    private static final TimeZone gmt = TimeZone.getTimeZone("GMT");

    private final LinkedHashMap<String, JsonPath> jsonPaths;
    private final Map<String, DataType> fieldTypeOverrides;
    private final InputStream in;
    private RecordSchema schema;

    /**
     * @param jsonPaths compiled JSONPath expressions keyed by field name, in field order
     * @param fieldTypeOverrides explicitly requested data types keyed by field name
     * @param in the stream to read JSON from
     * @param logger the logger passed to the base reader
     * @throws MalformedRecordException if the content cannot be parsed as JSON
     * @throws IOException if the stream cannot be read
     */
    public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final Map<String, DataType> fieldTypeOverrides, final InputStream in, final ComponentLog logger)
        throws MalformedRecordException, IOException {
        super(in, logger);

        this.jsonPaths = jsonPaths;
        this.fieldTypeOverrides = fieldTypeOverrides;
        this.in = in;
    }

    /**
     * Closes the JSON parser held by the base class and then the input stream. The stream is
     * closed even if closing the parser fails.
     */
    @Override
    public void close() throws IOException {
        try {
            // The original override skipped the parent and so never closed the parser.
            super.close();
        } finally {
            in.close();
        }
    }

    /**
     * Returns the record schema, computing and caching it on first use.
     *
     * <p>For each configured JSONPath the field type is the user override if present; otherwise
     * it is inferred from the value found in the first JSON object, falling back to STRING when
     * the path yields nothing. Overrides for fields that were not otherwise added are appended
     * at the end.</p>
     *
     * @return the schema used for all records produced by this reader
     */
    @Override
    public RecordSchema getSchema() {
        if (schema != null) {
            return schema;
        }

        final Optional<JsonNode> firstNodeOption = getFirstJsonNode();

        final List<RecordField> recordFields = new ArrayList<>();
        if (firstNodeOption.isPresent()) {
            final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(firstNodeOption.get().toString());
            for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
                final String fieldName = PropertyNameUtil.getFieldName(entry.getKey());
                final JsonPath jsonPath = entry.getValue();

                final DataType dataType;
                final DataType dataTypeOverride = fieldTypeOverrides.get(fieldName);
                if (dataTypeOverride == null) {
                    final Object value = readOrNull(ctx, jsonPath);
                    if (value == null) {
                        dataType = RecordFieldType.STRING.getDataType();
                    } else {
                        dataType = DataTypeUtils.inferDataType(value);
                    }
                } else {
                    dataType = dataTypeOverride;
                }

                recordFields.add(new RecordField(fieldName, dataType));
            }
        }

        // If there are any overridden field types that we didn't find, add as the last fields.
        final Set<String> knownFieldNames = recordFields.stream()
            .map(RecordField::getFieldName)
            .collect(Collectors.toSet());

        for (final Map.Entry<String, DataType> entry : fieldTypeOverrides.entrySet()) {
            if (!knownFieldNames.contains(entry.getKey())) {
                recordFields.add(new RecordField(entry.getKey(), entry.getValue()));
            }
        }

        schema = new SimpleRecordSchema(recordFields);
        return schema;
    }


    /**
     * Evaluates every configured JSONPath against the given JSON object and assembles the
     * results into a record, coercing values to the schema's types where they differ.
     *
     * @param jsonNode the JSON object to read from; {@code null} yields {@code null}
     * @param schema the schema of the record to create
     * @return the record, or {@code null} if {@code jsonNode} is {@code null}
     */
    @Override
    @SuppressWarnings("unchecked")
    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException {
        if (jsonNode == null) {
            return null;
        }

        final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());

        for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
            final String fieldName = entry.getKey();
            Object value = readOrNull(ctx, entry.getValue());

            if (value != null) {
                final DataType determinedType = DataTypeUtils.inferDataType(value);
                // May be null when the schema does not know this field.
                final DataType desiredType = schema.getDataType(fieldName).orElse(null);

                if (value instanceof List) {
                    value = ((List<Object>) value).toArray();
                } else if (desiredType != null && value instanceof Map && desiredType.getFieldType() == RecordFieldType.RECORD) {
                    // The null check avoids the NPE the original hit for Map values of unknown fields.
                    value = convert(desiredType, value);
                } else if (desiredType != null && !determinedType.equals(desiredType) && shouldConvert(value, determinedType.getFieldType())) {
                    value = convert(desiredType, value);
                }
            }

            values.put(fieldName, value);
        }

        return new MapRecord(schema, values);
    }

    /**
     * Evaluates a JSONPath, treating a missing path as an absent value.
     *
     * @return the value read, or {@code null} if the path does not exist in the document
     */
    private static Object readOrNull(final DocumentContext ctx, final JsonPath jsonPath) {
        try {
            return ctx.read(jsonPath);
        } catch (final PathNotFoundException pnfe) {
            return null;
        }
    }

    /**
     * Tells whether a value whose inferred type differs from the schema should be coerced.
     * Arrays and values with no inferred type are left untouched.
     */
    private boolean shouldConvert(final Object value, final RecordFieldType determinedType) {
        return determinedType != null
            && determinedType != RecordFieldType.ARRAY;
    }


    /**
     * Converts a value to the given type: a Map becomes a {@link MapRecord} when the type is a
     * RECORD with a child schema; anything else is coerced from its string form.
     *
     * @param dataType the desired type
     * @param value the non-null value to convert
     * @return the converted value
     */
    protected Object convert(final DataType dataType, final Object value) {
        if (dataType.getFieldType() == RecordFieldType.RECORD && dataType.getChildRecordSchema().isPresent() && value instanceof Map) {
            @SuppressWarnings("unchecked")
            final Map<String, Object> map = (Map<String, Object>) value;
            return new MapRecord(dataType.getChildRecordSchema().get(), map);
        } else {
            return convertString(dataType, value.toString());
        }
    }

    /**
     * Coerces the given string into the provided data type, if possible
     *
     * <p>For boolean, numeric and date/time types an empty string yields {@code null}. Date/time
     * values that do not match the expected fixed format also yield {@code null}. Numeric parsing
     * failures propagate as {@link NumberFormatException}.</p>
     *
     * @param dataType the desired type
     * @param string the string representation of the value
     * @return an Object representing the same value as the given string but in the requested data type
     */
    protected Object convertString(final DataType dataType, final String string) {
        if (dataType == null) {
            return string;
        }

        switch (dataType.getFieldType()) {
            case BOOLEAN:
                if (string.isEmpty()) {
                    return null;
                }
                return Boolean.parseBoolean(string);
            case BYTE:
                if (string.isEmpty()) {
                    return null;
                }
                return Byte.parseByte(string);
            case SHORT:
                if (string.isEmpty()) {
                    return null;
                }
                return Short.parseShort(string);
            case INT:
                if (string.isEmpty()) {
                    return null;
                }
                return Integer.parseInt(string);
            case LONG:
                if (string.isEmpty()) {
                    return null;
                }
                return Long.parseLong(string);
            case FLOAT:
                if (string.isEmpty()) {
                    return null;
                }
                return Float.parseFloat(string);
            case DOUBLE:
                if (string.isEmpty()) {
                    return null;
                }
                return Double.parseDouble(string);
            case DATE: {
                final Date date = parseGmt(TIME_FORMAT_DATE, string);
                return date == null ? null : new java.sql.Date(date.getTime());
            }
            case TIME: {
                final Date date = parseGmt(TIME_FORMAT_TIME, string);
                return date == null ? null : new java.sql.Time(date.getTime());
            }
            case TIMESTAMP: {
                final Date date = parseGmt(TIME_FORMAT_TIMESTAMP, string);
                return date == null ? null : new java.sql.Timestamp(date.getTime());
            }
            case STRING:
            default:
                return string;
        }
    }

    /**
     * Parses text with the given pattern in GMT.
     *
     * @param pattern a {@link SimpleDateFormat} pattern
     * @param text the text to parse
     * @return the parsed date, or {@code null} if the text is empty or does not match the pattern
     */
    private static Date parseGmt(final String pattern, final String text) {
        if (text.isEmpty()) {
            return null;
        }

        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            final DateFormat format = new SimpleDateFormat(pattern);
            format.setTimeZone(gmt);
            return format.parse(text);
        } catch (final ParseException ignored) {
            // Best effort by design: an unparseable value becomes null rather than failing the record.
            return null;
        }
    }
}
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.json; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +import com.jayway.jsonpath.JsonPath; + +public class JsonPathValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (PropertyNameUtil.hasFieldType(subject) && !PropertyNameUtil.isFieldTypeValid(subject)) { + final String fieldType = PropertyNameUtil.getFieldTypeName(subject).get(); + + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Invalid field type. 
If property name contains a colon (:) it must use syntax of " + + "<field name>:<field type> but the specified field type ('" + fieldType + "') is not a valid field type") + .build(); + } + + try { + JsonPath.compile(input); + } catch (final Exception e) { + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(false) + .explanation("Invalid JSON Path Expression: " + e.getMessage()) + .build(); + } + + return new ValidationResult.Builder() + .subject(subject) + .input(input) + .valid(true) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java new file mode 100644 index 0000000..dc75a51 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
// Extracted from patch: http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.json;

import java.util.ArrayList;
import java.util.List;

import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;

/**
 * Controller service that creates JSON record set writers, optionally pretty printing the output.
 */
@Tags({"json", "resultset", "writer", "serialize", "record", "row"})
@CapabilityDescription("Writes the results of a Database ResultSet as a JSON Array. Even if the ResultSet "
    + "consists of a single row, it will be written as an array with a single element.")
public class JsonRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {

    static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
        .name("Pretty Print JSON")
        .description("Specifies whether or not the JSON should be pretty printed")
        .expressionLanguageSupported(false)
        .allowableValues("true", "false")
        .defaultValue("false")
        .required(true)
        .build();

    // Written in onEnabled and read in createWriter; volatile to match how JsonPathReader
    // publishes the state it sets in its own @OnEnabled method.
    // NOTE(review): assumes createWriter may run on a different thread than onEnabled - confirm.
    private volatile boolean prettyPrint;

    /**
     * @return the descriptors supported by the parent class plus {@link #PRETTY_PRINT_JSON}
     */
    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
        properties.add(PRETTY_PRINT_JSON);
        return properties;
    }

    /**
     * Captures the configured pretty-print setting when the service is enabled.
     *
     * @param context the configuration holding the property values
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) {
        prettyPrint = context.getProperty(PRETTY_PRINT_JSON).asBoolean();
    }

    /**
     * Creates a writer using the captured pretty-print setting and the date, time and timestamp
     * formats supplied by the parent class.
     *
     * @param logger the logger handed to the writer
     * @return a new JSON record set writer
     */
    @Override
    public RecordSetWriter createWriter(final ComponentLog logger) {
        return new WriteJsonResult(logger, prettyPrint, getDateFormat(), getTimeFormat(), getTimestampFormat());
    }

}
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.json;

import java.io.IOException;
import java.io.InputStream;

import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RowRecordReaderFactory;
import org.apache.nifi.serialization.UserTypeOverrideRowReader;

/**
 * Reader factory for tree-shaped JSON: every call to
 * {@link #createRecordReader(InputStream, ComponentLog)} wraps the supplied stream in a
 * {@link JsonTreeRowRecordReader} that honours the field type overrides returned by
 * {@code getFieldTypeOverrides()}.
 */
@Tags({"json", "tree", "record", "reader", "parser"})
@CapabilityDescription("Parses JSON into individual Record objects. The Record that is produced will contain all top-level "
    + "elements of the corresponding JSON Object. If the JSON has nested arrays, those values will be represented as an Object array for that field. "
    + "Nested JSON objects will be represented as a Map. "
    + "The root JSON element can be either a single element or an array of JSON elements, and each "
    + "element in that array will be treated as a separate record. If any of the elements has a nested array or a nested "
    + "element, they will be returned as OBJECT or ARRAY types (respectively), not flattened out into individual fields. "
    + "The schema for the record is determined by the first JSON element in the array, if the incoming FlowFile is a JSON array. "
    + "This means that if a field does not exist in the first JSON object, then it will be skipped in all subsequent JSON objects. "
    + "The data type of a field can be overridden by adding a property to "
    + "the controller service where the name of the property matches the JSON field name and the value of the property is "
    + "the data type to use. If that field does not exist in a JSON element, the field will be assumed to be null. "
    + "See the Usage of the Controller Service for more information.")
@SeeAlso(JsonPathReader.class)
@DynamicProperty(name = "<name of JSON field>", value = "<data type of JSON field>",
    description = "User-defined properties are used to indicate that the values of a specific field should be interpreted as a "
        + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false)
public class JsonTreeReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {

    /**
     * Builds a reader over the given JSON stream.
     *
     * @param in the stream containing the JSON to parse
     * @param logger the logger passed on to the reader
     * @return a new {@link JsonTreeRowRecordReader}
     * @throws IOException propagated from the reader's constructor
     * @throws MalformedRecordException propagated from the reader's constructor
     */
    @Override
    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException, MalformedRecordException {
        final RecordReader treeReader = new JsonTreeRowRecordReader(in, logger, getFieldTypeOverrides());
        return treeReader;
    }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java 
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
new file mode 100644
index 0000000..4a2d212
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -0,0 +1,115 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.json;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.codehaus.jackson.JsonNode;


/**
 * Record reader that maps each JSON element onto a {@link MapRecord}, one value per schema field.
 * The schema is derived lazily from the first JSON node combined with the caller-supplied
 * field type overrides.
 */
public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
    private final Map<String, DataType> fieldTypeOverrides;
    // Built on the first call to getSchema() and reused afterwards.
    private RecordSchema schema;

    /**
     * @param in the JSON stream to read
     * @param logger the logger passed to the superclass
     * @param fieldTypeOverrides data types, keyed by field name, that take precedence over inferred types
     * @throws IOException propagated from the superclass constructor
     * @throws MalformedRecordException propagated from the superclass constructor
     */
    public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException, MalformedRecordException {
        super(in, logger);
        this.fieldTypeOverrides = fieldTypeOverrides;
    }

    /**
     * Converts one JSON node into a record by looking up every schema field by name in the node
     * and converting what is found to the field's declared type.
     *
     * @param jsonNode the node to convert; may be {@code null}
     * @param schema the schema that decides which fields are extracted
     * @return the converted record, or {@code null} when {@code jsonNode} is {@code null}
     */
    @Override
    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException, MalformedRecordException {
        if (jsonNode != null) {
            final Map<String, Object> valuesByName = new HashMap<>(schema.getFieldCount());

            for (int index = 0; index < schema.getFieldCount(); index++) {
                final RecordField recordField = schema.getField(index);
                final String name = recordField.getFieldName();
                valuesByName.put(name, convertField(jsonNode.get(name), name, recordField.getDataType()));
            }

            return new MapRecord(schema, valuesByName);
        }

        return null;
    }


    /**
     * Returns the schema, computing and caching it on first use. Fields of the first JSON node
     * come first, in the node's own order, each typed by its override when one exists and by
     * {@code determineFieldType} otherwise. Overrides that name a field absent from the first
     * node are appended afterwards.
     *
     * @return the (cached) schema
     */
    @Override
    public RecordSchema getSchema() {
        if (schema == null) {
            final List<RecordField> fields = new ArrayList<>();

            final Optional<JsonNode> firstNode = getFirstJsonNode();
            if (firstNode.isPresent()) {
                final Iterator<Map.Entry<String, JsonNode>> nodeFields = firstNode.get().getFields();
                while (nodeFields.hasNext()) {
                    final Map.Entry<String, JsonNode> nodeField = nodeFields.next();
                    final String name = nodeField.getKey();
                    final JsonNode valueNode = nodeField.getValue();

                    final DataType override = fieldTypeOverrides.get(name);
                    final DataType type = (override != null) ? override : determineFieldType(valueNode);
                    fields.add(new RecordField(name, type));
                }
            }

            // Overrides for fields the first node did not contain still become part of the schema.
            final Set<String> seenNames = fields.stream()
                .map(RecordField::getFieldName)
                .collect(Collectors.toSet());

            fieldTypeOverrides.forEach((name, type) -> {
                if (!seenNames.contains(name)) {
                    fields.add(new RecordField(name, type));
                }
            });

            schema = new SimpleRecordSchema(fields);
        }

        return schema;
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java
new file mode 100644
index 0000000..3b7dcf9
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/PropertyNameUtil.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.json; + +import java.util.Optional; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class PropertyNameUtil { + + public static String getFieldName(final String propertyName) { + final int colonIndex = propertyName.indexOf(":"); + if (colonIndex > -1 && colonIndex < propertyName.length() - 1) { + return propertyName.substring(0, colonIndex); + } + + return propertyName; + } + + public static boolean hasFieldType(final String propertyName) { + final int colonIndex = propertyName.indexOf(":"); + return (colonIndex > -1 && colonIndex < propertyName.length() - 1); + } + + public static Optional<String> getFieldTypeName(final String propertyName) { + if (hasFieldType(propertyName)) { + final String[] splits = propertyName.split("\\:"); + if (splits.length > 1) { + return Optional.of(splits[1]); + } + return Optional.empty(); + } + + return Optional.empty(); + } + + public static 
Optional<String> getFieldFormat(final String propertyName) { + final String[] splits = propertyName.split("\\:"); + if (splits.length != 3) { + return Optional.empty(); + } + + return Optional.of(splits[2]); + } + + public static boolean isFieldTypeValid(final String propertyName) { + final Optional<String> fieldType = getFieldTypeName(propertyName); + if (!fieldType.isPresent()) { + return false; + } + + final String typeName = fieldType.get(); + final RecordFieldType recordFieldType = RecordFieldType.of(typeName); + return recordFieldType != null; + } + + public static Optional<DataType> getDataType(final String propertyName) { + if (isFieldTypeValid(propertyName)) { + final String typeName = getFieldTypeName(propertyName).get(); + final RecordFieldType fieldType = RecordFieldType.of(typeName); + + final Optional<String> format = getFieldFormat(propertyName); + if (format.isPresent()) { + return Optional.of(fieldType.getDataType(format.get())); + } else { + return Optional.of(fieldType.getDataType()); + } + } + + return Optional.empty(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java new file mode 100644 index 0000000..cf72b19 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java @@ -0,0 +1,309 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.json; + +import java.io.IOException; +import java.io.OutputStream; +import java.math.BigInteger; +import java.sql.Array; +import java.sql.SQLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.DataTypeUtils; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.NonCloseableOutputStream; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonGenerator; + +public class WriteJsonResult implements RecordSetWriter { + private final boolean prettyPrint; + + private final ComponentLog logger; + private final JsonFactory 
factory = new JsonFactory(); + private final DateFormat dateFormat; + private final DateFormat timeFormat; + private final DateFormat timestampFormat; + + public WriteJsonResult(final ComponentLog logger, final boolean prettyPrint, final String dateFormat, final String timeFormat, final String timestampFormat) { + this.prettyPrint = prettyPrint; + this.dateFormat = new SimpleDateFormat(dateFormat); + this.timeFormat = new SimpleDateFormat(timeFormat); + this.timestampFormat = new SimpleDateFormat(timestampFormat); + this.logger = logger; + } + + @Override + public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException { + int count = 0; + + try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) { + if (prettyPrint) { + generator.useDefaultPrettyPrinter(); + } + + generator.writeStartArray(); + + Record record; + while ((record = rs.next()) != null) { + count++; + writeRecord(record, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); + } + + generator.writeEndArray(); + } catch (final SQLException e) { + throw new IOException("Failed to serialize Result Set to stream", e); + } + + return WriteResult.of(count, Collections.emptyMap()); + } + + @Override + public WriteResult write(final Record record, final OutputStream rawOut) throws IOException { + try (final JsonGenerator generator = factory.createJsonGenerator(new NonCloseableOutputStream(rawOut))) { + if (prettyPrint) { + generator.useDefaultPrettyPrinter(); + } + + writeRecord(record, generator, g -> g.writeStartObject(), g -> g.writeEndObject()); + } catch (final SQLException e) { + throw new IOException("Failed to write records to stream", e); + } + + return WriteResult.of(1, Collections.emptyMap()); + } + + private void writeRecord(final Record record, final JsonGenerator generator, final GeneratorTask startTask, final GeneratorTask endTask) + throws JsonGenerationException, IOException, SQLException { + + try { + final 
RecordSchema schema = record.getSchema(); + startTask.apply(generator); + for (int i = 0; i < schema.getFieldCount(); i++) { + final String fieldName = schema.getField(i).getFieldName(); + final Object value = record.getValue(fieldName); + if (value == null) { + generator.writeNullField(fieldName); + continue; + } + + generator.writeFieldName(fieldName); + final DataType dataType = schema.getDataType(fieldName).get(); + + writeValue(generator, value, dataType, i < schema.getFieldCount() - 1); + } + + endTask.apply(generator); + } catch (final Exception e) { + logger.error("Failed to write {} with schema {} as a JSON Object due to {}", new Object[] {record, record.getSchema(), e.toString(), e}); + throw e; + } + } + + private String createDate(final Object value, final DateFormat format) { + if (value == null) { + return null; + } + + if (value instanceof Date) { + return format.format((Date) value); + } + if (value instanceof java.sql.Date) { + return format.format(new Date(((java.sql.Date) value).getTime())); + } + if (value instanceof java.sql.Time) { + return format.format(new Date(((java.sql.Time) value).getTime())); + } + if (value instanceof java.sql.Timestamp) { + return format.format(new Date(((java.sql.Timestamp) value).getTime())); + } + + return null; + } + + private void writeValue(final JsonGenerator generator, final Object value, final DataType dataType, final boolean moreCols) + throws JsonGenerationException, IOException, SQLException { + if (value == null) { + generator.writeNull(); + return; + } + + final DataType resolvedDataType; + if (dataType.getFieldType() == RecordFieldType.CHOICE) { + resolvedDataType = DataTypeUtils.inferDataType(value); + } else { + resolvedDataType = dataType; + } + + switch (resolvedDataType.getFieldType()) { + case DATE: + generator.writeString(createDate(value, dateFormat)); + break; + case TIME: + generator.writeString(createDate(value, timeFormat)); + break; + case TIMESTAMP: + 
generator.writeString(createDate(value, timestampFormat)); + break; + case DOUBLE: + generator.writeNumber(DataTypeUtils.toDouble(value, 0D)); + break; + case FLOAT: + generator.writeNumber(DataTypeUtils.toFloat(value, 0F)); + break; + case LONG: + generator.writeNumber(DataTypeUtils.toLong(value, 0L)); + break; + case INT: + case BYTE: + case SHORT: + generator.writeNumber(DataTypeUtils.toInteger(value, 0)); + break; + case CHAR: + case STRING: + generator.writeString(value.toString()); + break; + case BIGINT: + if (value instanceof Long) { + generator.writeNumber(((Long) value).longValue()); + } else { + generator.writeNumber((BigInteger) value); + } + break; + case BOOLEAN: + final String stringValue = value.toString(); + if ("true".equalsIgnoreCase(stringValue)) { + generator.writeBoolean(true); + } else if ("false".equalsIgnoreCase(stringValue)) { + generator.writeBoolean(false); + } else { + generator.writeString(stringValue); + } + break; + case RECORD: { + final Record record = (Record) value; + writeRecord(record, generator, gen -> gen.writeStartObject(), gen -> gen.writeEndObject()); + break; + } + case ARRAY: + default: + if ("null".equals(value.toString())) { + generator.writeNull(); + } else if (value instanceof Map) { + final Map<?, ?> map = (Map<?, ?>) value; + generator.writeStartObject(); + + int i = 0; + for (final Map.Entry<?, ?> entry : map.entrySet()) { + generator.writeFieldName(entry.getKey().toString()); + final boolean moreEntries = ++i < map.size(); + writeValue(generator, entry.getValue(), getColType(entry.getValue()), moreEntries); + } + generator.writeEndObject(); + } else if (value instanceof List) { + final List<?> list = (List<?>) value; + writeArray(list.toArray(), generator); + } else if (value instanceof Array) { + final Array array = (Array) value; + final Object[] values = (Object[]) array.getArray(); + writeArray(values, generator); + } else if (value instanceof Object[]) { + final Object[] values = (Object[]) value; + 
writeArray(values, generator); + } else { + generator.writeString(value.toString()); + } + break; + } + } + + private void writeArray(final Object[] values, final JsonGenerator generator) throws JsonGenerationException, IOException, SQLException { + generator.writeStartArray(); + for (int i = 0; i < values.length; i++) { + final boolean moreEntries = i < values.length - 1; + final Object element = values[i]; + writeValue(generator, element, getColType(element), moreEntries); + } + generator.writeEndArray(); + } + + private DataType getColType(final Object value) { + if (value instanceof String) { + return RecordFieldType.STRING.getDataType(); + } + if (value instanceof Double) { + return RecordFieldType.DOUBLE.getDataType(); + } + if (value instanceof Float) { + return RecordFieldType.FLOAT.getDataType(); + } + if (value instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } + if (value instanceof Long) { + return RecordFieldType.LONG.getDataType(); + } + if (value instanceof BigInteger) { + return RecordFieldType.BIGINT.getDataType(); + } + if (value instanceof Boolean) { + return RecordFieldType.BOOLEAN.getDataType(); + } + if (value instanceof Byte || value instanceof Short) { + return RecordFieldType.INT.getDataType(); + } + if (value instanceof Character) { + return RecordFieldType.STRING.getDataType(); + } + if (value instanceof java.util.Date || value instanceof java.sql.Date) { + return RecordFieldType.DATE.getDataType(); + } + if (value instanceof java.sql.Time) { + return RecordFieldType.TIME.getDataType(); + } + if (value instanceof java.sql.Timestamp) { + return RecordFieldType.TIMESTAMP.getDataType(); + } + if (value instanceof Object[] || value instanceof List || value instanceof Array) { + return RecordFieldType.ARRAY.getDataType(); + } + + return RecordFieldType.RECORD.getDataType(); + } + + @Override + public String getMimeType() { + return "application/json"; + } + + private static interface GeneratorTask { + void 
apply(JsonGenerator generator) throws JsonGenerationException, IOException; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java new file mode 100644 index 0000000..b58a22e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization; + +import java.util.Arrays; +import java.util.List; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.serialization.record.RecordFieldType; + +public abstract class AbstractRecordSetWriter extends AbstractControllerService { + static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder() + .name("Date Format") + .description("Specifies the format to use when writing out Date fields") + .expressionLanguageSupported(false) + .defaultValue(RecordFieldType.DATE.getDefaultFormat()) + .addValidator(new SimpleDateFormatValidator()) + .required(true) + .build(); + + static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder() + .name("Time Format") + .description("Specifies the format to use when writing out Time fields") + .expressionLanguageSupported(false) + .defaultValue(RecordFieldType.TIME.getDefaultFormat()) + .addValidator(new SimpleDateFormatValidator()) + .required(true) + .build(); + + static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder() + .name("Timestamp Format") + .description("Specifies the format to use when writing out Timestamp (date/time) fields") + .expressionLanguageSupported(false) + .defaultValue(RecordFieldType.TIMESTAMP.getDefaultFormat()) + .addValidator(new SimpleDateFormatValidator()) + .required(true) + .build(); + + private volatile String dateFormat; + private volatile String timeFormat; + private volatile String timestampFormat; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return Arrays.asList(DATE_FORMAT, TIME_FORMAT, TIMESTAMP_FORMAT); + } + + @OnEnabled + public void captureValues(final ConfigurationContext context) { + this.dateFormat = context.getProperty(DATE_FORMAT).getValue(); 
+ this.timeFormat = context.getProperty(TIME_FORMAT).getValue(); + this.timestampFormat = context.getProperty(TIMESTAMP_FORMAT).getValue(); + } + + protected String getDateFormat() { + return dateFormat; + } + + protected String getTimeFormat() { + return timeFormat; + } + + protected String getTimestampFormat() { + return timestampFormat; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java new file mode 100644 index 0000000..de207f4 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/DataTypeUtils.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +public class DataTypeUtils { + + public static Double toDouble(final Object value, final Double defaultValue) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } + + if (value instanceof String) { + return Double.parseDouble((String) value); + } + + return defaultValue; + } + + public static Float toFloat(final Object value, final Float defaultValue) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).floatValue(); + } + + if (value instanceof String) { + return Float.parseFloat((String) value); + } + + return defaultValue; + } + + public static Long toLong(final Object value, final Long defaultValue) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).longValue(); + } + + if (value instanceof String) { + return Long.parseLong((String) value); + } + + return defaultValue; + } + + + + public static Integer toInteger(final Object value, final Integer defaultValue) { + if (value == null) { + return null; + } + + if (value instanceof Number) { + return ((Number) value).intValue(); + } + + if (value instanceof String) { + return Integer.parseInt((String) value); + } + + return defaultValue; + } + + + /** + * Deduces the type of RecordFieldType that should be used for a value of the given type, + * or returns <code>null</code> if the value is null + * + * @param value the value whose type should be deduced + * 
@return the type of RecordFieldType that should be used for a value of the given type, + * or <code>null</code> if the value is null + */ + public static DataType inferDataType(final Object value) { + if (value == null) { + return null; + } + + if (value instanceof String) { + return RecordFieldType.STRING.getDataType(); + } + if (value instanceof Long) { + return RecordFieldType.LONG.getDataType(); + } + if (value instanceof Integer) { + return RecordFieldType.INT.getDataType(); + } + if (value instanceof Double) { + return RecordFieldType.DOUBLE.getDataType(); + } + if (value instanceof Float) { + return RecordFieldType.FLOAT.getDataType(); + } + if (value instanceof Boolean) { + return RecordFieldType.BOOLEAN.getDataType(); + } + if (value instanceof Byte) { + return RecordFieldType.BYTE.getDataType(); + } + if (value instanceof Character) { + return RecordFieldType.CHAR.getDataType(); + } + if (value instanceof Short) { + return RecordFieldType.SHORT.getDataType(); + } + if (value instanceof Date) { + return RecordFieldType.DATE.getDataType(); + } + if (value instanceof Object[] || value instanceof List) { + return RecordFieldType.ARRAY.getDataType(); + } + if (value instanceof Map) { + @SuppressWarnings("unchecked") + final Map<String, Object> map = (Map<String, Object>) value; + final RecordSchema childSchema = determineSchema(map); + return RecordFieldType.RECORD.getDataType(childSchema); + } + + return RecordFieldType.RECORD.getDataType(); + } + + public static RecordSchema determineSchema(final Map<String, Object> valueMap) { + final List<RecordField> fields = new ArrayList<>(valueMap.size()); + for (final Map.Entry<String, Object> entry : valueMap.entrySet()) { + final DataType valueType = inferDataType(entry.getValue()); + final String fieldName = entry.getKey(); + final RecordField field = new RecordField(fieldName, valueType); + fields.add(field); + } + return new SimpleRecordSchema(fields); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java new file mode 100644 index 0000000..f25749b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SimpleDateFormatValidator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.serialization; + +import java.text.SimpleDateFormat; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; + +public class SimpleDateFormatValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + new SimpleDateFormat(input); + } catch (final Exception e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Invalid Date format: " + e.getMessage()) + .build(); + } + + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .build(); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java new file mode 100644 index 0000000..be0b8ad --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/UserTypeOverrideRowReader.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.serialization; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.serialization.DataTypeValidator; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordFieldType; + +public abstract class UserTypeOverrideRowReader extends AbstractControllerService { + private volatile Map<String, DataType> fieldTypeOverrides; + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .dynamic(true) + .addValidator(new DataTypeValidator()) + .build(); + } + + @OnEnabled + public void createFieldTypeOverrides(final ConfigurationContext context) { + final Map<String, DataType> overrides = new HashMap<>(context.getProperties().size()); + for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { + if (!entry.getKey().isDynamic()) { + continue; + } + + final String fieldName = entry.getKey().getName(); + final String dataTypeName = entry.getValue(); + if (dataTypeName == 
null) { + continue; + } + + final DataType dataType; + final String[] splits = dataTypeName.split("\\:"); + if (splits.length == 2) { + final RecordFieldType fieldType = RecordFieldType.of(splits[0]); + final String format = splits[1]; + dataType = fieldType.getDataType(format); + } else { + final RecordFieldType fieldType = RecordFieldType.of(dataTypeName); + dataType = fieldType.getDataType(); + } + + overrides.put(fieldName, dataType); + } + + this.fieldTypeOverrides = Collections.unmodifiableMap(overrides); + } + + protected Map<String, DataType> getFieldTypeOverrides() { + return fieldTypeOverrides; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java new file mode 100644 index 0000000..07da00e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/text/FreeFormTextRecordSetWriter.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.text; + +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +@Tags({"text", "freeform", "expression", "language", "el", "resultset", "writer", "serialize"}) +@CapabilityDescription("Writes the contents of a Database ResultSet as free-form text. The configured " + + "text is able to make use of the Expression Language to reference each of the columns that are available " + + "in the ResultSet. Each record in the ResultSet will be separated by a single newline character.") +public class FreeFormTextRecordSetWriter extends AbstractControllerService implements RecordSetWriterFactory { + static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder() + .name("Text") + .description("The text to use when writing the results. This property will evaluate the Expression Language using any of the columns available to the Result Set. 
For example, if the " + + "following SQL Query is used: \"SELECT Name, COUNT(*) AS Count\" then the Expression can reference \"Name\" and \"Count\", such as \"${Name:toUpper()} ${Count:minus(1)}\"") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(true) + .build(); + static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Set") + .description("The Character set to use when writing the data to the FlowFile") + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .expressionLanguageSupported(false) + .required(true) + .build(); + + private volatile PropertyValue textValue; + private volatile Charset characterSet; + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(TEXT); + properties.add(CHARACTER_SET); + return properties; + } + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + textValue = context.getProperty(TEXT); + characterSet = Charset.forName(context.getProperty(CHARACTER_SET).getValue()); + } + + @Override + public RecordSetWriter createWriter(final ComponentLog logger) { + return new FreeFormTextWriter(textValue, characterSet); + } + +}