[ https://issues.apache.org/jira/browse/NIFI-751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14619004#comment-14619004 ]
ASF GitHub Bot commented on NIFI-751: ------------------------------------- Github user rdblue commented on a diff in the pull request: https://github.com/apache/incubator-nifi/pull/70#discussion_r34175017 --- Diff: nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java --- @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.nifi.processors.kite; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Set; + +import org.apache.avro.Schema; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileStream; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException; +import org.apache.nifi.util.LongHolder; +import org.codehaus.jackson.JsonNode; +import org.kitesdk.data.DatasetException; +import org.kitesdk.data.DatasetIOException; +import org.kitesdk.data.SchemaNotFoundException; +import org.kitesdk.data.spi.DefaultConfiguration; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +@Tags({ "kite", "avro" }) +@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions") +public class ConvertAvroSchema extends AbstractKiteProcessor { + + private static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Avro content that was converted successfully") + .build(); + + private static final Relationship FAILURE = new Relationship.Builder() + .name("failure") + .description( + "Avro content that could not be converterted to output schema") + .build(); + + private static final Relationship INCOMPATIBLE = new Relationship.Builder() + .name("incompatible") + .description( + "Avro content that could not be read with input schema") + .build(); + + @VisibleForTesting + static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder() + .name("Input Schema").description("Avro Schema of Input Flowfiles") + .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true) + .required(true).build(); + + @VisibleForTesting + static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder() + .name("Output Schema") + .description("Avro Schema of Output Flowfiles") + .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true) + .required(true).build(); + + @VisibleForTesting + static final PropertyDescriptor FIELD_MAPPING = new PropertyDescriptor.Builder() + .name("Field Mapping") + .description( + "Field Mapping Between Schemas. Format is a JSON array of maps with properties " + + "source (name of column in input record, including . notation to step into " + + "records) and target (name of column in output record). For fields not " + + "found in this mapping, the processor tries to match names from the " + + "input to the output record.") + .addValidator(JSON_VALIDATOR).defaultValue("[]").build(); + + private static final List<PropertyDescriptor> PROPERTIES = ImmutableList + .<PropertyDescriptor> builder() + .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA) + .add(OUTPUT_SCHEMA).add(FIELD_MAPPING).build(); + + private static final Set<Relationship> RELATIONSHIPS = ImmutableSet + .<Relationship> builder().add(SUCCESS).add(FAILURE) + .add(INCOMPATIBLE).build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public void onTrigger(ProcessContext context, final ProcessSession session) + throws ProcessException { + FlowFile incomingAvro = session.get(); + if (incomingAvro == null) { + return; + } + + String inputSchemaProperty = context.getProperty(INPUT_SCHEMA) + .evaluateAttributeExpressions(incomingAvro).getValue(); + final Schema inputSchema; + try { + inputSchema = getSchema(inputSchemaProperty, + DefaultConfiguration.get()); + } catch (SchemaNotFoundException e) { + getLogger().error("Cannot find schema: " + inputSchemaProperty); + session.transfer(incomingAvro, FAILURE); + return; + } + String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA) + .evaluateAttributeExpressions(incomingAvro).getValue(); + final Schema outputSchema; + try { + outputSchema = getSchema(outputSchemaProperty, + DefaultConfiguration.get()); + } catch (SchemaNotFoundException e) { + getLogger().error("Cannot find schema: " + outputSchemaProperty); + session.transfer(incomingAvro, FAILURE); + return; + } + String fieldMappingProperty = context.getProperty(FIELD_MAPPING) + .evaluateAttributeExpressions(incomingAvro).getValue(); + final JsonNode fieldMapping; + try { + fieldMapping = getJSON(fieldMappingProperty, + DefaultConfiguration.get()); + } catch (IllegalArgumentException e) { + getLogger().error( + "Cannot find field mapping: " + fieldMappingProperty); + session.transfer(incomingAvro, FAILURE); + return; + } + final AvroRecordConverter converter = new AvroRecordConverter( + inputSchema, outputSchema, fieldMapping); + + final DataFileWriter<Record> writer = new DataFileWriter<>( + AvroUtil.newDatumWriter(outputSchema, Record.class)); + writer.setCodec(CodecFactory.snappyCodec()); + + try { + final LongHolder written = new LongHolder(0L); + final FailureTracker failures = new FailureTracker(); + + FlowFile badRecords = session.clone(incomingAvro); + FlowFile outgoingAvro = session.write(incomingAvro, + new StreamCallback() { + @Override + public void process(InputStream in, OutputStream out) + throws IOException { + try (DataFileStream<Record> stream = new DataFileStream<Record>( + in, new GenericDatumReader<Record>( + converter.getInputSchema()))) { + try (DataFileWriter<Record> w = writer.create( + outputSchema, out)) { + for (Record record : stream) { + try { + Record converted = converter + .convert(record); + w.append(converted); + written.incrementAndGet(); + } catch (AvroConversionException e) { + failures.add(e); + getLogger().error( + "Error converting data: " + + e.getMessage()); + } + } + } + } + } + }); + + long errors = failures.count(); + + // update only if file transfer is successful + session.adjustCounter("Converted records", written.get(), false); + // update only if file transfer is successful + session.adjustCounter("Conversion errors", errors, false); + + if (written.get() > 0L) { + session.transfer(outgoingAvro, SUCCESS); + + if (errors > 0L) { + getLogger().warn( + "Failed to convert {}/{} records from CSV to Avro", + new Object[] { errors, errors + written.get() }); + badRecords = session.putAttribute(badRecords, "errors", + failures.summary()); + session.transfer(badRecords, INCOMPATIBLE); + } else { + session.remove(badRecords); + } + + } else { + session.remove(outgoingAvro); + + if (errors > 0L) { + getLogger().warn( + "Failed to convert {}/{} records from CSV to Avro", + new Object[] { errors, errors }); + badRecords = session.putAttribute(badRecords, "errors", + failures.summary()); + } else { + badRecords = session.putAttribute(badRecords, "errors", + "No incoming records"); + } + + session.transfer(badRecords, FAILURE); + } + + } catch (ProcessException | DatasetIOException e) { + getLogger().error("Failed reading or writing", e); + session.transfer(incomingAvro, FAILURE); + } catch (DatasetException e) { + getLogger().error("Failed to read FlowFile", e); + session.transfer(incomingAvro, FAILURE); + } finally { + try { + writer.close(); --- End diff -- I think this is handled by the try-with-resources block in the `StreamCallback`. The writer should be closed correctly after that so there is no need to do it here. > Add Processor To Convert Avro Formats > ------------------------------------- > > Key: NIFI-751 > URL: https://issues.apache.org/jira/browse/NIFI-751 > Project: Apache NiFi > Issue Type: Improvement > Components: Extensions > Affects Versions: 0.1.0 > Reporter: Alan Jackoway > > When working with data from external sources, such as complex WSDL, I > frequently wind up with complex nested data that is difficult to work with > even when converted to Avro format. Specifically, I often have two needs: > * Converting types of data, usually from string to long, double, etc. when > APIs give only string data back. > * Flattening data by taking fields out of nested records and putting them on > the top level of the Avro file. > Unfortunately the Kite JSONToAvro processor only supports exact conversions > from JSON to a matching Avro schema and will not do data transformations of > this type. Proposed processor to come. > Discussed this with [~rdblue], so tagging him here as I don't have permission > to set a CC for some reason. -- This message was sent by Atlassian JIRA (v6.3.4#6332)