[ https://issues.apache.org/jira/browse/NIFI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16618441#comment-16618441 ]
ASF GitHub Bot commented on NIFI-5327: -------------------------------------- Github user PrashanthVenkatesan commented on a diff in the pull request: https://github.com/apache/nifi/pull/2820#discussion_r218293320 --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.network; + +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields; +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processors.network.parser.Netflowv5Parser; +import org.apache.nifi.stream.io.StreamUtils; 
+ +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; + +@EventDriven +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" }) +@CapabilityDescription("Parses netflowv5 byte ingest and add to NiFi flowfile as attributes or JSON content.") +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") }) +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."), + @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") }) + +public class ParseNetflowv5 extends AbstractProcessor { + private String destination; + // Add mapper + private static final ObjectMapper mapper = new ObjectMapper(); + + public static final String DESTINATION_CONTENT = "flowfile-content"; + public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute"; + public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination") + .description("Indicates whether the results of the parser are written " + "to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES + + ", fields will be populated as attributes. 
If set to " + DESTINATION_CONTENT + ", the netflowv5 field will be converted into a flat JSON object.") + .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build(); + public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") + .description("Any FlowFile that is successfully parsed as a netflowv5 data will be transferred to this Relationship.").build(); + + public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION)); + public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS))); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) { + destination = context.getProperty(FIELDS_DESTINATION).getValue(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final OptionalInt portNumber = resolvePort(flowFile); + final Netflowv5Parser parser = new Netflowv5Parser(portNumber); + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + + @Override + public void process(final InputStream in) throws IOException 
{ + StreamUtils.fillBuffer(in, buffer); + } + }); + + final int processedRecord; + try { + processedRecord = parser.parse(buffer); + getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord }); + } catch (Throwable e) { + getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile }); + session.transfer(flowFile, REL_FAILURE); + return; + } + + try { + final List<FlowFile> multipleRecords = new ArrayList<>(); + switch (destination) { + case DESTINATION_ATTRIBUTES: + final Map<String, String> attributes = new HashMap<>(); + generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord); + break; + case DESTINATION_CONTENT: + generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer); + break; + } + // Create a provenance event recording the routing to success + multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS)); + session.getProvenanceReporter().route(flowFile, REL_ORIGINAL); + // Ready to transfer and commit + session.transfer(flowFile, REL_ORIGINAL); + session.transfer(multipleRecords, REL_SUCCESS); + session.adjustCounter("Records Processed", processedRecord, false); + session.commit(); --- End diff -- Can you please elaborate this comment? What AbstractProcessor will do? > NetFlow Processors > ------------------ > > Key: NIFI-5327 > URL: https://issues.apache.org/jira/browse/NIFI-5327 > Project: Apache NiFi > Issue Type: New Feature > Components: Core Framework > Affects Versions: 1.6.0 > Reporter: Prashanth Venkatesan > Assignee: Prashanth Venkatesan > Priority: Major > > As network traffic data scopes for the big data use case, would like NiFi to > have processors to support parsing of those protocols. 
> Netflow is a protocol introduced by Cisco that provides the ability to > collect IP network traffic as it enters or exits an interface and is > described in detail here: > [https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html] > > Currently, I have created the following processor: > *ParseNetflowv5*: Parses the ingress netflowv5 bytes and ingests them as either > NiFi flowfile attributes or as JSON content. It also sends a > one-time template. > > Further ahead, we can add many processors specific to network protocols in > this nar bundle. > I will create a pull request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)