[ https://issues.apache.org/jira/browse/NIFI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599491#comment-16599491 ]
ASF GitHub Bot commented on NIFI-5327:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214502000

    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    +            // Add JSON Objects
    +            generateJSONUtil(results, parser, record++);
    +
    +            recordFlowFile = session.write(recordFlowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                        outputStream.write(mapper.writeValueAsBytes(results));
    +                    }
    +                }
    +            });
    +            // Adjust the FlowFile mime.type attribute
    +            recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +            // Update the provenance for good measure
    +            session.getProvenanceReporter().modifyContent(recordFlowFile, "Replaced content with parsed netflowv5 fields and values");
    +            multipleRecords.add(recordFlowFile);
    +        }
    +    }
    +
    +    private void generateKV(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Map<String, String> attributes, final Netflowv5Parser parser,
    +            final int processedRecord) {
    +        int numberOfRecords = processedRecord;
    +        generateHeaderAttributes(attributes, parser);
    +
    +        final String[] fieldname = getRecordFields();
    +        int record = 0;
    +        FlowFile recordFlowFile = flowFile;
    +        while (numberOfRecords-- > 0) {
    +            // Process KVs of the Flow Record fields
    +            final Object[] fieldvalue = parser.getRecordData()[record++];
    +            for (int i = 0; i < fieldname.length; i++) {
    +                attributes.put("netflowv5.record." + fieldname[i], String.valueOf(fieldvalue[i]));
    +            }
    +            recordFlowFile = session.clone(flowFile);
    +            recordFlowFile = session.putAllAttributes(recordFlowFile, attributes);
    +            multipleRecords.add(recordFlowFile);
    +        }
    +    }
    +
    +    private OptionalInt resolvePort(final FlowFile flowFile) {
    +        final String port;
    +        if ((port = flowFile.getAttribute("udp.port")) != null)
    --- End diff --

    Should have curly brackets.
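For illustration, a braced version of that trailing condition could read as follows; the diff is cut off right after the if statement, so the return values shown are assumptions about the method's intent, not code from the pull request:

    private OptionalInt resolvePort(final FlowFile flowFile) {
        final String port;
        if ((port = flowFile.getAttribute("udp.port")) != null) {
            return OptionalInt.of(Integer.parseInt(port)); // assumed body; truncated in the diff
        }
        return OptionalInt.empty(); // assumed fallback when no udp.port attribute is present
    }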
> NetFlow Processors
> ------------------
>
>                  Key: NIFI-5327
>                  URL: https://issues.apache.org/jira/browse/NIFI-5327
>              Project: Apache NiFi
>           Issue Type: New Feature
>           Components: Core Framework
>     Affects Versions: 1.6.0
>             Reporter: Prashanth Venkatesan
>             Assignee: Prashanth Venkatesan
>             Priority: Major
>
> As network traffic data comes into scope for big data use cases, we would like NiFi to have processors that support parsing these protocols.
> NetFlow is a protocol introduced by Cisco that provides the ability to collect IP network traffic as it enters or exits an interface. It is described in detail here:
> https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html
>
> Currently, I have created the following processor:
> *ParseNetflowv5*: Parses the ingress netflowv5 bytes and ingests them as either NiFi flowfile attributes or as JSON content. It also sends a one-time template.
>
> Further ahead, we can add many processors specific to network protocols in this nar bundle.
> I will create a pull request.
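For context on what the parser consumes, the NetFlow v5 wire format is fixed-layout: a 24-byte header followed by up to thirty 48-byte flow records. Below is a minimal, self-contained sketch of decoding that header with java.nio.ByteBuffer; the field names follow the Cisco format document linked above, and the class is illustrative only, not the Netflowv5Parser code from the pull request.

    import java.nio.ByteBuffer;

    public final class Netflowv5HeaderExample {
        // Decode the fixed 24-byte NetFlow v5 header. ByteBuffer defaults to
        // network (big-endian) byte order, which is what NetFlow uses.
        public static void dumpHeader(final byte[] datagram) {
            final ByteBuffer buf = ByteBuffer.wrap(datagram);
            final int version = Short.toUnsignedInt(buf.getShort());          // always 5 for this format
            final int count = Short.toUnsignedInt(buf.getShort());            // 1..30 flow records follow
            final long sysUptime = Integer.toUnsignedLong(buf.getInt());      // ms since device boot
            final long unixSecs = Integer.toUnsignedLong(buf.getInt());       // export time, seconds
            final long unixNsecs = Integer.toUnsignedLong(buf.getInt());      // residual nanoseconds
            final long flowSequence = Integer.toUnsignedLong(buf.getInt());   // total flows seen by exporter
            final int engineType = Byte.toUnsignedInt(buf.get());             // switching engine type
            final int engineId = Byte.toUnsignedInt(buf.get());               // switching engine slot
            final int samplingInterval = Short.toUnsignedInt(buf.getShort()); // sampling mode and interval
            System.out.printf("v%d: %d records, uptime=%dms, exported=%d.%09d, seq=%d, engine=%d/%d, sampling=%d%n",
                    version, count, sysUptime, unixSecs, unixNsecs, flowSequence, engineType, engineId, samplingInterval);
            // Each of the count 48-byte flow records (srcaddr, dstaddr, ports,
            // packet/octet counters, etc.) follows immediately after the header.
        }
    }

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)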