[ https://issues.apache.org/jira/browse/NIFI-751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14618959#comment-14618959 ]

ASF GitHub Bot commented on NIFI-751:
-------------------------------------

Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/70#discussion_r34173083
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java ---
    @@ -0,0 +1,279 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.kite;
    +
    +import java.io.IOException;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.Schema.Field;
    +import org.apache.avro.generic.GenericData.Record;
    +import org.apache.avro.generic.IndexedRecord;
    +import org.codehaus.jackson.JsonNode;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +/**
    + * Responsible for converting records of one Avro type to another. Supports
    + * syntax like "record.field" to unpack fields and will try to do simple type
    + * conversion.
    + */
    +public class AvroRecordConverter {
    +   private final Schema inputSchema;
    +   private final Schema outputSchema;
    +   private final Map<String, String> fieldMapping;
    +
    +   public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
    +                   JsonNode fieldMapping) {
    +           this.inputSchema = inputSchema;
    +           this.outputSchema = outputSchema;
    +           this.fieldMapping = getFieldMapping(fieldMapping);
    +   }
    +
    +   /**
    +    * Converts one record to another given an input and output schema plus
    +    * explicit mappings for certain target fields.
    +    * 
    +    * @param input
    +    * @return
    +    * @throws AvroConversionException
    +    */
    +   public Record convert(Record input) throws AvroConversionException {
    +           Record result = new Record(outputSchema);
    +           for (Field outputField : outputSchema.getFields()) {
    +                   String inputFieldName = outputField.name();
    +                   if (fieldMapping.containsKey(outputField.name())) {
    +                           inputFieldName = fieldMapping.get(outputField.name());
    +                   }
    +
    +                   List<String> fieldParts = Lists.newArrayList(inputFieldName
    +                                   .split("\\."));
    +                   Record current = input;
    +                   while (fieldParts.size() > 1) {
    +                           // Step into the nested records as far as needed.
    +                           current = (Record) current.get(fieldParts.remove(0));
    +                   }
    +
    +                   // Current should now be in the right place to read the record.
    +                   Field f = getFieldForName(inputFieldName, inputSchema);
    +                   Object content = getContentForName(input, inputFieldName,
    +                                   input.getSchema());
    +                   result.put(outputField.name(),
    +                                   convertData(content, f.schema(), outputField.schema()));
    +           }
    +           return result;
    +   }
    +
    +   /**
    +    * @return the inputSchema
    +    */
    +   public Schema getInputSchema() {
    +           return inputSchema;
    +   }
    +
    +   /**
    +    * @return the outputSchema
    +    */
    +   public Schema getOutputSchema() {
    +           return outputSchema;
    +   }
    +
    +   /**
    +    * Converts the data from one schema to another. If the types are the same,
    +    * no change will be made, but simple conversions will be attempted for
    +    * other types.
    +    * 
    +    * @param content
    +    * @param inputSchema
    +    * @param outputSchema
    +    * @return
    +    * @throws AvroConversionException
    +    */
    +   private Object convertData(Object content, Schema inputSchema,
    +                   Schema outputSchema) throws AvroConversionException {
    +           if (content == null) {
    +                   // No conversion can happen here.
    +                   return null;
    +           }
    +
    +           Schema nonNillInput = undoNillableSchema(inputSchema);
    +           Schema nonNillOutput = undoNillableSchema(outputSchema);
    +           if (nonNillInput.getType().equals(nonNillOutput.getType())) {
    +                   return content;
    +           } else {
    +                   switch (nonNillOutput.getType()) {
    +                   case STRING:
    +                           // This is the easiest conversion case. Converting to string we
    +                           // assume that String.valueOf knows what to do.
    +                           return String.valueOf(content);
    --- End diff --
    
    What's the value of using `String.valueOf` here? I use it to avoid a
    `NullPointerException`, but you've already validated that `content` is not
    `null`. That is the right way to go, because `String.valueOf` would convert
    null to the string `"null"`.
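For reference, the JDK documents `String.valueOf(Object)` as returning `"null"` when its argument is null and `obj.toString()` otherwise, so once `content` has been null-checked the two calls produce the same string. The sketch below illustrates this; `StringConversionSketch` and `toAvroString` are hypothetical names used only for illustration, not code from the pull request.

```java
import java.util.Objects;

// Hypothetical illustration of the review comment; not code from the PR.
class StringConversionSketch {

    // Stand-in for the STRING branch of convertData. The caller has already
    // returned early when content == null, so toString() is safe here.
    static String toAvroString(Object content) {
        Objects.requireNonNull(content, "content should have been null-checked");
        return content.toString();
    }

    public static void main(String[] args) {
        // Declared as Object so the String.valueOf(Object) overload is chosen;
        // a bare null literal would select String.valueOf(char[]) instead.
        Object missing = null;
        System.out.println(String.valueOf(missing)); // prints: null

        Object present = 42L;
        System.out.println(String.valueOf(present)); // prints: 42
        System.out.println(toAvroString(present));   // prints: 42
    }
}
```

With the null check already in place, the choice between the two calls is mostly a matter of making the non-null precondition visible.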


> Add Processor To Convert Avro Formats
> -------------------------------------
>
>                 Key: NIFI-751
>                 URL: https://issues.apache.org/jira/browse/NIFI-751
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 0.1.0
>            Reporter: Alan Jackoway
>
> When working with data from external sources, such as complex WSDL, I 
> frequently wind up with complex nested data that is difficult to work with 
> even when converted to Avro format. Specifically, I often have two needs:
> * Converting types of data, usually from string to long, double, etc. when 
> APIs give only string data back.
> * Flattening data by taking fields out of nested records and putting them on 
> the top level of the Avro file.
> Unfortunately the Kite JSONToAvro processor only supports exact conversions 
> from JSON to a matching Avro schema and will not do data transformations of 
> this type. Proposed processor to come.
> Discussed this with [~rdblue], so tagging him here as I don't have permission 
> to set a CC for some reason.
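Both needs described in the issue, flattening fields out of nested records and converting string values to numeric types, can be illustrated without Avro. This is a minimal sketch under stated assumptions: nested records are modeled as `java.util.Map`s rather than Avro `GenericData.Record`s, and `FlattenSketch`, `lookup`, `convert`, `flatten` and `TargetType` are hypothetical names, not the API of the processor in the pull request. The dotted-path lookup mirrors the "record.field" syntax described in the `AvroRecordConverter` javadoc.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the NIFI-751 transformations; nested records are
// plain Maps here, not Avro records, and none of these names come from the PR.
class FlattenSketch {

    enum TargetType { STRING, LONG, DOUBLE }

    // Follows a dotted path such as "address.zip" through nested maps.
    // Returns null if any step along the path is missing or not a map.
    static Object lookup(Map<String, Object> record, String path) {
        Object current = record;
        for (String part : path.split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(part);
        }
        return current;
    }

    // Simple type conversion for APIs that return everything as strings.
    static Object convert(Object value, TargetType target) {
        if (value == null) {
            return null;
        }
        String text = value.toString().trim();
        switch (target) {
            case LONG:
                return Long.valueOf(text);   // throws NumberFormatException on bad input
            case DOUBLE:
                return Double.valueOf(text); // throws NumberFormatException on bad input
            default:
                return text;
        }
    }

    // Builds a flat output record; fieldMapping maps output names to dotted input paths.
    static Map<String, Object> flatten(Map<String, Object> input,
                                       Map<String, String> fieldMapping,
                                       Map<String, TargetType> outputTypes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, TargetType> field : outputTypes.entrySet()) {
            String outputName = field.getKey();
            // Fall back to the output field's own name when no mapping is given.
            String path = fieldMapping.getOrDefault(outputName, outputName);
            result.put(outputName, convert(lookup(input, path), field.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> address = new LinkedHashMap<>();
        address.put("zip", "94105");
        Map<String, Object> input = new LinkedHashMap<>();
        input.put("id", "17");
        input.put("price", "9.99");
        input.put("address", address);

        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("zip", "address.zip");

        Map<String, TargetType> types = new LinkedHashMap<>();
        types.put("id", TargetType.LONG);
        types.put("price", TargetType.DOUBLE);
        types.put("zip", TargetType.STRING);

        System.out.println(flatten(input, mapping, types)); // {id=17, price=9.99, zip=94105}
    }
}
```

Unlike this sketch, a real processor would also need to decide how to report values that fail to parse, for example by raising a conversion exception as `AvroRecordConverter.convert` declares.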



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
