[ 
https://issues.apache.org/jira/browse/NIFI-751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618977#comment-14618977
 ] 

ASF GitHub Bot commented on NIFI-751:
-------------------------------------

Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/70#discussion_r34174088
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.kite;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.generic.GenericData.Record;
    +import org.apache.avro.generic.IndexedRecord;
    +import org.codehaus.jackson.JsonNode;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +/**
    + * Responsible for converting records of one Avro type to another. Supports
    + * syntax like "record.field" to unpack fields and will try to do simple 
type
    + * conversion.
    + */
    +public class AvroRecordConverter {
    +   private final Schema inputSchema;
    +   private final Schema outputSchema;
    +   private final Map<String, String> fieldMapping;
    +
    +   public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
    +                   JsonNode fieldMapping) {
    +           this.inputSchema = inputSchema;
    +           this.outputSchema = outputSchema;
    +           this.fieldMapping = getFieldMapping(fieldMapping);
    +   }
    +
    +   /**
    +    * Converts one record to another given a input and output schema plus
    +    * explicit mappings for certain target fields.
    +    * 
    +    * @param record
    +    * @param inputSchema
    +    * @param outputSchema
    +    * @param fieldMapping
    +    * @return
    +    * @throws AvroConversionException
    +    */
    +   public Record convert(Record input) throws AvroConversionException {
    +           Record result = new Record(outputSchema);
    +           for (Field outputField : outputSchema.getFields()) {
    +                   String inputFieldName = outputField.name();
    +                   if (fieldMapping.containsKey(outputField.name())) {
    +                           inputFieldName = 
fieldMapping.get(outputField.name());
    +                   }
    +
    +                   List<String> fieldParts = 
Lists.newArrayList(inputFieldName
    +                                   .split("\\."));
    +                   Record current = input;
    +                   while (fieldParts.size() > 1) {
    +                           // Step into the nested records as far as 
needed.
    +                           current = (Record) 
current.get(fieldParts.remove(0));
    +                   }
    +
    +                   // Current should now be in the right place to read the 
record.
    +                   Field f = getFieldForName(inputFieldName, inputSchema);
    +                   Object content = getContentForName(input, 
inputFieldName,
    +                                   input.getSchema());
    +                   result.put(outputField.name(),
    +                                   convertData(content, f.schema(), 
outputField.schema()));
    +           }
    +           return result;
    +   }
    +
    +   /**
    +    * @return the inputSchema
    +    */
    +   public Schema getInputSchema() {
    +           return inputSchema;
    +   }
    +
    +   /**
    +    * @return the outputSchema
    +    */
    +   public Schema getOutputSchema() {
    +           return outputSchema;
    +   }
    +
    +   /**
    +    * Converts the data from one schema to another. If the types are the 
same,
    +    * no change will be made, but simple conversions will be attempted for
    +    * other types.
    +    * 
    +    * @param content
    +    * @param inputSchema
    +    * @param outputSchema
    +    * @return
    +    * @throws AvroConversionException
    +    */
    +   private Object convertData(Object content, Schema inputSchema,
    +                   Schema outputSchema) throws AvroConversionException {
    +           if (content == null) {
    +                   // No conversion can happen here.
    +                   return null;
    +           }
    +
    +           Schema nonNillInput = undoNillableSchema(inputSchema);
    +           Schema nonNillOutput = undoNillableSchema(outputSchema);
    +           if (nonNillInput.getType().equals(nonNillOutput.getType())) {
    +                   return content;
    +           } else {
    +                   switch (nonNillOutput.getType()) {
    +                   case STRING:
    +                           // This is the easiest conversion case. 
Converting to string we
    +                           // assume
    +                           // that String.valueOf knows what to do.
    +                           return String.valueOf(content);
    +
    +                           // For the rest of these, we will try to 
convert through string
    +                           // which
    +                           // isn't super-efficient but should work.
    +                   case LONG:
    +                           try {
    +                                   return 
Long.parseLong(String.valueOf(content));
    +                           } catch (NumberFormatException e) {
    +                                   throw new 
AvroConversionException("Cannot convert "
    +                                                   + content + " to long");
    +                           }
    +                   case INT:
    +                           try {
    +                                   return 
Integer.parseInt(String.valueOf(content));
    +                           } catch (NumberFormatException e) {
    +                                   throw new 
AvroConversionException("Cannot convert "
    +                                                   + content + " to int");
    +                           }
    +                   case DOUBLE:
    +                           try {
    +                                   return 
Double.parseDouble(String.valueOf(content));
    +                           } catch (NumberFormatException e) {
    +                                   throw new 
AvroConversionException("Cannot convert "
    +                                                   + content + " to 
double");
    +                           }
    +                   case FLOAT:
    +                           try {
    +                                   return 
Float.parseFloat(String.valueOf(content));
    +                           } catch (NumberFormatException e) {
    +                                   throw new 
AvroConversionException("Cannot convert "
    +                                                   + content + " to 
float");
    +                           }
    +                   default:
    +                           throw new AvroConversionException("Cannot 
convert to type "
    +                                           + nonNillOutput.getType());
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * Get a mapping from output column to input column definition.
    +    * 
    +    * @param mappingStr
    +    * @return
    +    * @throws AvroConversionException
    +    */
    +   private Map<String, String> getFieldMapping(JsonNode listNode) {
    +           Map<String, String> result = Maps.newHashMap();
    +           for (JsonNode mappingNode : listNode) {
    +                   result.put(mappingNode.get("target").getTextValue(), 
mappingNode
    +                                   .get("source").getTextValue());
    +           }
    +
    +           return result;
    +   }
    +
    +   /**
    +    * Gets the field for a given name in a schema, handling . notation to 
step
    +    * into records.
    +    * 
    +    * @param fieldName
    +    * @param s
    +    * @return
    +    */
    +   private static Field getFieldForName(String fieldName, Schema s) {
    +           while (fieldName.contains(".")) {
    +                   // Recurse down the schema to find the right field.
    +                   int dotIndex = fieldName.indexOf('.');
    +                   String entityName = fieldName.substring(0, dotIndex);
    +                   s = undoNillableSchema(s);
    +                   s = s.getField(entityName).schema();
    +                   fieldName = fieldName.substring(dotIndex + 1);
    +           }
    +           s = undoNillableSchema(s);
    +           return s.getField(fieldName);
    +   }
    +
    +   protected Object getContentForName(Record entity, String fieldName, 
Schema s) {
    +           IndexedRecord currentRecord = entity;
    +           while (fieldName.contains(".")) {
    +                   // Recurse down the schema to find the right field.
    +                   int dotIndex = fieldName.indexOf('.');
    +                   String entityName = fieldName.substring(0, dotIndex);
    +                   Object innerRecord = currentRecord
    +                                   .get(s.getField(entityName).pos());
    +                   // This may have been a null union type. Bail out now.
    +                   if (innerRecord == null) {
    +                           return null;
    +                   }
    +                   Preconditions.checkArgument(innerRecord instanceof 
IndexedRecord,
    +                                   "Cannot access subfield " + fieldName
    +                                                   + " because it is not a 
record");
    +                   currentRecord = (IndexedRecord) innerRecord;
    +                   s = undoNillableSchema(s);
    +                   s = s.getField(entityName).schema();
    +                   fieldName = fieldName.substring(dotIndex + 1);
    +           }
    +
    +           s = undoNillableSchema(s);
    +           return currentRecord.get(s.getField(fieldName).pos());
    +   }
    +
    +   /**
    +    * If s is a union schema of some type with null, returns that type.
    +    * Otherwise just return schema itself.
    +    * 
    +    * Does not handle unions of schemas with anything except null and one 
type.
    +    * 
    +    * @param s
    +    * @return
    +    */
    +   protected static Schema undoNillableSchema(Schema s) {
    +           // Handle the case where s is a union type. Assert that this 
must be a
    +           // union
    +           // that only includes one record/map
    +           if (s.getType() == Schema.Type.UNION) {
    +                   List<Schema> types = s.getTypes();
    +                   boolean foundOne = false;
    +                   Schema result = s;
    +                   for (Schema type : types) {
    +                           if (!type.getType().equals(Schema.Type.NULL)) {
    +                                   Preconditions.checkArgument(foundOne == 
false,
    +                                                   "Cannot handle union of 
two non-null types");
    +                                   foundOne = true;
    +                                   result = type;
    +                           }
    +                   }
    +                   return result;
    +           } else {
    +                   return s;
    +           }
    +   }
    +
    +   /**
    +    * Exception thrown when Avro conversion fails.
    +    */
    +   public class AvroConversionException extends Exception {
    +           private static final long serialVersionUID = 1L;
    --- End diff --
    
    Is it necessary to have a serialVersionUID? It seems useless, especially if it
    is set to 1.


> Add Processor To Convert Avro Formats
> -------------------------------------
>
>                 Key: NIFI-751
>                 URL: https://issues.apache.org/jira/browse/NIFI-751
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 0.1.0
>            Reporter: Alan Jackoway
>
> When working with data from external sources, such as complex WSDL, I 
> frequently wind up with complex nested data that is difficult to work with 
> even when converted to Avro format. Specifically, I often have two needs:
> * Converting types of data, usually from string to long, double, etc. when 
> APIs give only string data back.
> * Flattening data by taking fields out of nested records and putting them on 
> the top level of the Avro file.
> Unfortunately, the Kite JSONToAvro processor only supports exact conversions 
> from JSON to a matching Avro schema and will not do data transformations of 
> this type. A proposed processor will follow.
> Discussed this with [~rdblue], so tagging him here as I don't have permission 
> to set a CC for some reason.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to