[ https://issues.apache.org/jira/browse/NIFI-751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618977#comment-14618977 ]
ASF GitHub Bot commented on NIFI-751: ------------------------------------- Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-nifi/pull/70#discussion_r34174088 --- Diff: nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.IndexedRecord; +import org.codehaus.jackson.JsonNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * Responsible for converting records of one Avro type to another. Supports + * syntax like "record.field" to unpack fields and will try to do simple type + * conversion. 
+ */ +public class AvroRecordConverter { + private final Schema inputSchema; + private final Schema outputSchema; + private final Map<String, String> fieldMapping; + + public AvroRecordConverter(Schema inputSchema, Schema outputSchema, + JsonNode fieldMapping) { + this.inputSchema = inputSchema; + this.outputSchema = outputSchema; + this.fieldMapping = getFieldMapping(fieldMapping); + } + + /** + * Converts one record to another given a input and output schema plus + * explicit mappings for certain target fields. + * + * @param record + * @param inputSchema + * @param outputSchema + * @param fieldMapping + * @return + * @throws AvroConversionException + */ + public Record convert(Record input) throws AvroConversionException { + Record result = new Record(outputSchema); + for (Field outputField : outputSchema.getFields()) { + String inputFieldName = outputField.name(); + if (fieldMapping.containsKey(outputField.name())) { + inputFieldName = fieldMapping.get(outputField.name()); + } + + List<String> fieldParts = Lists.newArrayList(inputFieldName + .split("\\.")); + Record current = input; + while (fieldParts.size() > 1) { + // Step into the nested records as far as needed. + current = (Record) current.get(fieldParts.remove(0)); + } + + // Current should now be in the right place to read the record. + Field f = getFieldForName(inputFieldName, inputSchema); + Object content = getContentForName(input, inputFieldName, + input.getSchema()); + result.put(outputField.name(), + convertData(content, f.schema(), outputField.schema())); + } + return result; + } + + /** + * @return the inputSchema + */ + public Schema getInputSchema() { + return inputSchema; + } + + /** + * @return the outputSchema + */ + public Schema getOutputSchema() { + return outputSchema; + } + + /** + * Converts the data from one schema to another. If the types are the same, + * no change will be made, but simple conversions will be attempted for + * other types. 
+ * + * @param content + * @param inputSchema + * @param outputSchema + * @return + * @throws AvroConversionException + */ + private Object convertData(Object content, Schema inputSchema, + Schema outputSchema) throws AvroConversionException { + if (content == null) { + // No conversion can happen here. + return null; + } + + Schema nonNillInput = undoNillableSchema(inputSchema); + Schema nonNillOutput = undoNillableSchema(outputSchema); + if (nonNillInput.getType().equals(nonNillOutput.getType())) { + return content; + } else { + switch (nonNillOutput.getType()) { + case STRING: + // This is the easiest conversion case. Converting to string we + // assume + // that String.valueOf knows what to do. + return String.valueOf(content); + + // For the rest of these, we will try to convert through string + // which + // isn't super-efficient but should work. + case LONG: + try { + return Long.parseLong(String.valueOf(content)); + } catch (NumberFormatException e) { + throw new AvroConversionException("Cannot convert " + + content + " to long"); + } + case INT: + try { + return Integer.parseInt(String.valueOf(content)); + } catch (NumberFormatException e) { + throw new AvroConversionException("Cannot convert " + + content + " to int"); + } + case DOUBLE: + try { + return Double.parseDouble(String.valueOf(content)); + } catch (NumberFormatException e) { + throw new AvroConversionException("Cannot convert " + + content + " to double"); + } + case FLOAT: + try { + return Float.parseFloat(String.valueOf(content)); + } catch (NumberFormatException e) { + throw new AvroConversionException("Cannot convert " + + content + " to float"); + } + default: + throw new AvroConversionException("Cannot convert to type " + + nonNillOutput.getType()); + } + } + } + + /** + * Get a mapping from output column to input column definition. 
+ * + * @param mappingStr + * @return + * @throws AvroConversionException + */ + private Map<String, String> getFieldMapping(JsonNode listNode) { + Map<String, String> result = Maps.newHashMap(); + for (JsonNode mappingNode : listNode) { + result.put(mappingNode.get("target").getTextValue(), mappingNode + .get("source").getTextValue()); + } + + return result; + } + + /** + * Gets the field for a given name in a schema, handling . notation to step + * into records. + * + * @param fieldName + * @param s + * @return + */ + private static Field getFieldForName(String fieldName, Schema s) { + while (fieldName.contains(".")) { + // Recurse down the schema to find the right field. + int dotIndex = fieldName.indexOf('.'); + String entityName = fieldName.substring(0, dotIndex); + s = undoNillableSchema(s); + s = s.getField(entityName).schema(); + fieldName = fieldName.substring(dotIndex + 1); + } + s = undoNillableSchema(s); + return s.getField(fieldName); + } + + protected Object getContentForName(Record entity, String fieldName, Schema s) { + IndexedRecord currentRecord = entity; + while (fieldName.contains(".")) { + // Recurse down the schema to find the right field. + int dotIndex = fieldName.indexOf('.'); + String entityName = fieldName.substring(0, dotIndex); + Object innerRecord = currentRecord + .get(s.getField(entityName).pos()); + // This may have been a null union type. Bail out now. + if (innerRecord == null) { + return null; + } + Preconditions.checkArgument(innerRecord instanceof IndexedRecord, + "Cannot access subfield " + fieldName + + " because it is not a record"); + currentRecord = (IndexedRecord) innerRecord; + s = undoNillableSchema(s); + s = s.getField(entityName).schema(); + fieldName = fieldName.substring(dotIndex + 1); + } + + s = undoNillableSchema(s); + return currentRecord.get(s.getField(fieldName).pos()); + } + + /** + * If s is a union schema of some type with null, returns that type. + * Otherwise just return schema itself. 
+ * + * Does not handle unions of schemas with anything except null and one type. + * + * @param s + * @return + */ + protected static Schema undoNillableSchema(Schema s) { + // Handle the case where s is a union type. Assert that this must be a + // union + // that only includes one record/map + if (s.getType() == Schema.Type.UNION) { + List<Schema> types = s.getTypes(); + boolean foundOne = false; + Schema result = s; + for (Schema type : types) { + if (!type.getType().equals(Schema.Type.NULL)) { + Preconditions.checkArgument(foundOne == false, + "Cannot handle union of two non-null types"); + foundOne = true; + result = type; + } + } + return result; + } else { + return s; + } + } + + /** + * Exception thrown when Avro conversion fails. + */ + public class AvroConversionException extends Exception { + private static final long serialVersionUID = 1L; --- End diff -- Is it necessary to have a serialVersionUID? Seems useless, especially if it is set to 1. > Add Processor To Convert Avro Formats > ------------------------------------- > > Key: NIFI-751 > URL: https://issues.apache.org/jira/browse/NIFI-751 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Affects Versions: 0.1.0 > Reporter: Alan Jackoway > > When working with data from external sources, such as complex WSDL, I > frequently wind up with complex nested data that is difficult to work with > even when converted to Avro format. Specifically, I often have two needs: > * Converting types of data, usually from string to long, double, etc. when > APIs give only string data back. > * Flattening data by taking fields out of nested records and putting them on > the top level of the Avro file. > Unfortunately the Kite JSONToAvro processor only supports exact conversions > from JSON to a matching Avro schema and will not do data transformations of > this type. Proposed processor to come. 
> Discussed this with [~rdblue], so tagging him here as I don't have permission
> to set a CC for some reason.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)