[ https://issues.apache.org/jira/browse/NIFI-751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619004#comment-14619004 ]

ASF GitHub Bot commented on NIFI-751:
-------------------------------------

Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/incubator-nifi/pull/70#discussion_r34175017
  
    --- Diff: nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java ---
    @@ -0,0 +1,252 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.nifi.processors.kite;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.List;
    +import java.util.Set;
    +
    +import org.apache.avro.Schema;
    +import org.apache.avro.file.CodecFactory;
    +import org.apache.avro.file.DataFileStream;
    +import org.apache.avro.file.DataFileWriter;
    +import org.apache.avro.generic.GenericData.Record;
    +import org.apache.avro.generic.GenericDatumReader;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
    +import org.apache.nifi.util.LongHolder;
    +import org.codehaus.jackson.JsonNode;
    +import org.kitesdk.data.DatasetException;
    +import org.kitesdk.data.DatasetIOException;
    +import org.kitesdk.data.SchemaNotFoundException;
    +import org.kitesdk.data.spi.DefaultConfiguration;
    +
    +import com.google.common.annotations.VisibleForTesting;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +
    +@Tags({ "kite", "avro" })
    +@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
    +public class ConvertAvroSchema extends AbstractKiteProcessor {
    +
    +   private static final Relationship SUCCESS = new Relationship.Builder()
    +                   .name("success")
    +                   .description("Avro content that was converted successfully")
    +                   .build();
    +
    +   private static final Relationship FAILURE = new Relationship.Builder()
    +                   .name("failure")
    +                   .description(
    +                                   "Avro content that could not be converted to output schema")
    +                   .build();
    +
    +   private static final Relationship INCOMPATIBLE = new Relationship.Builder()
    +                   .name("incompatible")
    +                   .description(
    +                                   "Avro content that could not be read with input schema")
    +                   .build();
    +
    +   @VisibleForTesting
    +   static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
    +                   .name("Input Schema").description("Avro Schema of Input Flowfiles")
    +                   .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true)
    +                   .required(true).build();
    +
    +   @VisibleForTesting
    +   static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder()
    +                   .name("Output Schema")
    +                   .description("Avro Schema of Output Flowfiles")
    +                   .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true)
    +                   .required(true).build();
    +
    +   @VisibleForTesting
    +   static final PropertyDescriptor FIELD_MAPPING = new PropertyDescriptor.Builder()
    +                   .name("Field Mapping")
    +                   .description(
    +                                   "Field Mapping Between Schemas. Format is a JSON array of maps with properties "
    +                                                   + "source (name of column in input record, including . notation to step into "
    +                                                   + "records) and target (name of column in output record). For fields not "
    +                                                   + "found in this mapping, the processor tries to match names from the "
    +                                                   + "input to the output record.")
    +                   .addValidator(JSON_VALIDATOR).defaultValue("[]").build();
    +
    +   private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
    +                   .<PropertyDescriptor> builder()
    +                   .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
    +                   .add(OUTPUT_SCHEMA).add(FIELD_MAPPING).build();
    +
    +   private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
    +                   .<Relationship> builder().add(SUCCESS).add(FAILURE)
    +                   .add(INCOMPATIBLE).build();
    +
    +   @Override
    +   protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +           return PROPERTIES;
    +   }
    +
    +   @Override
    +   public Set<Relationship> getRelationships() {
    +           return RELATIONSHIPS;
    +   }
    +
    +   @Override
    +   public void onTrigger(ProcessContext context, final ProcessSession session)
    +                   throws ProcessException {
    +           FlowFile incomingAvro = session.get();
    +           if (incomingAvro == null) {
    +                   return;
    +           }
    +
    +           String inputSchemaProperty = context.getProperty(INPUT_SCHEMA)
    +                           .evaluateAttributeExpressions(incomingAvro).getValue();
    +           final Schema inputSchema;
    +           try {
    +                   inputSchema = getSchema(inputSchemaProperty,
    +                                   DefaultConfiguration.get());
    +           } catch (SchemaNotFoundException e) {
    +                   getLogger().error("Cannot find schema: " + inputSchemaProperty);
    +                   session.transfer(incomingAvro, FAILURE);
    +                   return;
    +           }
    +           String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA)
    +                           .evaluateAttributeExpressions(incomingAvro).getValue();
    +           final Schema outputSchema;
    +           try {
    +                   outputSchema = getSchema(outputSchemaProperty,
    +                                   DefaultConfiguration.get());
    +           } catch (SchemaNotFoundException e) {
    +                   getLogger().error("Cannot find schema: " + outputSchemaProperty);
    +                   session.transfer(incomingAvro, FAILURE);
    +                   return;
    +           }
    +           String fieldMappingProperty = context.getProperty(FIELD_MAPPING)
    +                           .evaluateAttributeExpressions(incomingAvro).getValue();
    +           final JsonNode fieldMapping;
    +           try {
    +                   fieldMapping = getJSON(fieldMappingProperty,
    +                                   DefaultConfiguration.get());
    +           } catch (IllegalArgumentException e) {
    +                   getLogger().error(
    +                                   "Cannot find field mapping: " + fieldMappingProperty);
    +                   session.transfer(incomingAvro, FAILURE);
    +                   return;
    +           }
    +           final AvroRecordConverter converter = new AvroRecordConverter(
    +                           inputSchema, outputSchema, fieldMapping);
    +
    +           final DataFileWriter<Record> writer = new DataFileWriter<>(
    +                           AvroUtil.newDatumWriter(outputSchema, Record.class));
    +           writer.setCodec(CodecFactory.snappyCodec());
    +
    +           try {
    +                   final LongHolder written = new LongHolder(0L);
    +                   final FailureTracker failures = new FailureTracker();
    +
    +                   FlowFile badRecords = session.clone(incomingAvro);
    +                   FlowFile outgoingAvro = session.write(incomingAvro,
    +                                   new StreamCallback() {
    +                                           @Override
    +                                           public void process(InputStream in, OutputStream out)
    +                                                           throws IOException {
    +                                                   try (DataFileStream<Record> stream = new DataFileStream<Record>(
    +                                                                   in, new GenericDatumReader<Record>(
    +                                                                                   converter.getInputSchema()))) {
    +                                                           try (DataFileWriter<Record> w = writer.create(
    +                                                                           outputSchema, out)) {
    +                                                                   for (Record record : stream) {
    +                                                                           try {
    +                                                                                   Record converted = converter.convert(record);
    +                                                                                   w.append(converted);
    +                                                                                   written.incrementAndGet();
    +                                                                           } catch (AvroConversionException e) {
    +                                                                                   failures.add(e);
    +                                                                                   getLogger().error("Error converting data: "
    +                                                                                                   + e.getMessage());
    +                                                                           }
    +                                                                   }
    +                                                           }
    +                                                   }
    +                                           }
    +                                   });
    +
    +                   long errors = failures.count();
    +
    +                   // update only if file transfer is successful
    +                   session.adjustCounter("Converted records", written.get(), false);
    +                   // update only if file transfer is successful
    +                   session.adjustCounter("Conversion errors", errors, false);
    +
    +                   if (written.get() > 0L) {
    +                           session.transfer(outgoingAvro, SUCCESS);
    +
    +                           if (errors > 0L) {
    +                                   getLogger().warn(
    +                                                   "Failed to convert {}/{} records between Avro schemas",
    +                                                   new Object[] { errors, errors + written.get() });
    +                                   badRecords = session.putAttribute(badRecords, "errors",
    +                                                   failures.summary());
    +                                   session.transfer(badRecords, INCOMPATIBLE);
    +                           } else {
    +                                   session.remove(badRecords);
    +                           }
    +
    +                   } else {
    +                           session.remove(outgoingAvro);
    +
    +                           if (errors > 0L) {
    +                                   getLogger().warn(
    +                                                   "Failed to convert {}/{} records between Avro schemas",
    +                                                   new Object[] { errors, errors });
    +                                   badRecords = session.putAttribute(badRecords, "errors",
    +                                                   failures.summary());
    +                           } else {
    +                                   badRecords = session.putAttribute(badRecords, "errors",
    +                                                   "No incoming records");
    +                           }
    +
    +                           session.transfer(badRecords, FAILURE);
    +                   }
    +
    +           } catch (ProcessException | DatasetIOException e) {
    +                   getLogger().error("Failed reading or writing", e);
    +                   session.transfer(incomingAvro, FAILURE);
    +           } catch (DatasetException e) {
    +                   getLogger().error("Failed to read FlowFile", e);
    +                   session.transfer(incomingAvro, FAILURE);
    +           } finally {
    +                   try {
    +                           writer.close();
    --- End diff --
    
    I think this is handled by the try-with-resources block in the 
`StreamCallback`. The writer should be closed correctly after that so there is 
no need to do it here.
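    
    For reference, a minimal standalone sketch of the pattern in question, assuming only Avro on the classpath (the schema and field name are invented for illustration): `DataFileWriter.create(...)` returns the same writer instance, so when try-with-resources closes `w`, the outer `writer` is closed as well, and a later `writer.close()` in a `finally` block is redundant.
    
        import java.io.ByteArrayOutputStream;
        import java.io.IOException;
        
        import org.apache.avro.Schema;
        import org.apache.avro.file.DataFileWriter;
        import org.apache.avro.generic.GenericData.Record;
        import org.apache.avro.generic.GenericDatumWriter;
        
        public class WriterCloseSketch {
            public static void main(String[] args) throws IOException {
                // Invented single-field schema, purely for illustration.
                Schema schema = new Schema.Parser().parse(
                        "{\"type\":\"record\",\"name\":\"Example\","
                        + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"}]}");
                DataFileWriter<Record> writer = new DataFileWriter<>(
                        new GenericDatumWriter<Record>(schema));
                // create() returns this same DataFileWriter, so closing the
                // resource `w` below also closes `writer`.
                try (DataFileWriter<Record> w = writer.create(schema,
                        new ByteArrayOutputStream())) {
                    Record record = new Record(schema);
                    record.put("id", 1L);
                    w.append(record);
                }
                // At this point `writer` is already closed.
            }
        }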


> Add Processor To Convert Avro Formats
> -------------------------------------
>
>                 Key: NIFI-751
>                 URL: https://issues.apache.org/jira/browse/NIFI-751
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 0.1.0
>            Reporter: Alan Jackoway
>
> When working with data from external sources, such as complex WSDL, I 
> frequently wind up with complex nested data that is difficult to work with 
> even when converted to Avro format. Specifically, I often have two needs:
> * Converting types of data, usually from string to long, double, etc. when 
> APIs give only string data back.
> * Flattening data by taking fields out of nested records and putting them on 
> the top level of the Avro file.
> Unfortunately the Kite JSONToAvro processor only supports exact conversions 
> from JSON to a matching Avro schema and will not do data transformations of 
> this type. Proposed processor to come.
> Discussed this with [~rdblue], so tagging him here as I don't have permission 
> to set a CC for some reason.
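
For illustration of the Field Mapping format documented in the diff above, here is a hypothetical example (record and field names invented): flattening a nested `location` record while converting a string `price` field could use an output schema whose `lat`, `lon`, and `price` fields sit at the top level, with `price` typed as double, and a Field Mapping property of

    [{"source": "location.lat", "target": "lat"},
     {"source": "location.lon", "target": "lon"}]

Here `price` is not listed in the mapping, so it is matched by name between the schemas, and the string-to-double conversion follows from the output schema's field type.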



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
