[ https://issues.apache.org/jira/browse/NIFI-5327?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599489#comment-16599489 ]

ASF GitHub Bot commented on NIFI-5327:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2820#discussion_r214501922
  
    --- Diff: nifi-nar-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/ParseNetflowv5.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + * http://www.apache.org/licenses/LICENSE-2.0
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.network;
    +
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getHeaderFields;
    +import static org.apache.nifi.processors.network.parser.Netflowv5Parser.getRecordFields;
    +
    +import java.io.BufferedOutputStream;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.OptionalInt;
    +import java.util.Set;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.ReadsAttribute;
    +import org.apache.nifi.annotation.behavior.ReadsAttributes;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processors.network.parser.Netflowv5Parser;
    +import org.apache.nifi.stream.io.StreamUtils;
    +
    +import com.fasterxml.jackson.core.JsonProcessingException;
    +import com.fasterxml.jackson.databind.ObjectMapper;
    +import com.fasterxml.jackson.databind.node.ObjectNode;
    +
    +@EventDriven
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({ "network", "netflow", "attributes", "datagram", "v5", "packet", "byte" })
    +@CapabilityDescription("Parses netflowv5 bytes and adds the fields to the NiFi FlowFile as attributes or JSON content.")
    +@ReadsAttributes({ @ReadsAttribute(attribute = "udp.port", description = "Optionally read if packets are received from UDP datagrams.") })
    +@WritesAttributes({ @WritesAttribute(attribute = "netflowv5.header.*", description = "The key and value generated by the parsing of the header fields."),
    +        @WritesAttribute(attribute = "netflowv5.record.*", description = "The key and value generated by the parsing of the record fields.") })
    +
    +public class ParseNetflowv5 extends AbstractProcessor {
    +    private String destination;
    +    // Add mapper
    +    private static final ObjectMapper mapper = new ObjectMapper();
    +
    +    public static final String DESTINATION_CONTENT = "flowfile-content";
    +    public static final String DESTINATION_ATTRIBUTES = "flowfile-attribute";
    +    public static final PropertyDescriptor FIELDS_DESTINATION = new PropertyDescriptor.Builder().name("FIELDS_DESTINATION").displayName("Parsed fields destination")
    +            .description("Indicates whether the results of the parser are written to the FlowFile content or a FlowFile attribute; if using " + DESTINATION_ATTRIBUTES
    +                    + ", fields will be populated as attributes. If set to " + DESTINATION_CONTENT + ", the netflowv5 fields will be converted into a flat JSON object.")
    +            .required(true).allowableValues(DESTINATION_CONTENT, DESTINATION_ATTRIBUTES).defaultValue(DESTINATION_CONTENT).build();
    +
    +    public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
    +            .description("Any FlowFile that could not be parsed as a netflowv5 message will be transferred to this Relationship without any attributes being added").build();
    +    public static final Relationship REL_ORIGINAL = new Relationship.Builder().name("original").description("The original raw content").build();
    +    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
    +            .description("Any FlowFile that is successfully parsed as netflowv5 data will be transferred to this Relationship.").build();
    +
    +    public static final List<PropertyDescriptor> PROPERTIES = Collections.unmodifiableList(Arrays.asList(FIELDS_DESTINATION));
    +    public static final Set<Relationship> RELATIONSHIPS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_FAILURE, REL_ORIGINAL, REL_SUCCESS)));
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) {
    +        destination = context.getProperty(FIELDS_DESTINATION).getValue();
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if (flowFile == null) {
    +            return;
    +        }
    +
    +        final OptionalInt portNumber = resolvePort(flowFile);
    +        final Netflowv5Parser parser = new Netflowv5Parser(portNumber);
    +
    +        final byte[] buffer = new byte[(int) flowFile.getSize()];
    +        session.read(flowFile, new InputStreamCallback() {
    +
    +            @Override
    +            public void process(final InputStream in) throws IOException {
    +                StreamUtils.fillBuffer(in, buffer);
    +            }
    +        });
    +
    +        final int processedRecord;
    +        try {
    +            processedRecord = parser.parse(buffer);
    +            getLogger().debug("Parsed {} records from the packet", new Object[] { processedRecord });
    +        } catch (Throwable e) {
    +            getLogger().error("Parser returned unexpected Exception {} while processing {}; routing to failure", new Object[] { e, flowFile });
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        try {
    +            final List<FlowFile> multipleRecords = new ArrayList<>();
    +            switch (destination) {
    +            case DESTINATION_ATTRIBUTES:
    +                final Map<String, String> attributes = new HashMap<>();
    +                generateKV(multipleRecords, session, flowFile, attributes, parser, processedRecord);
    +                break;
    +            case DESTINATION_CONTENT:
    +                generateJSON(multipleRecords, session, flowFile, parser, processedRecord, buffer);
    +                break;
    +            }
    +            // Create a provenance event recording the routing to success
    +            multipleRecords.forEach(recordFlowFile -> session.getProvenanceReporter().route(recordFlowFile, REL_SUCCESS));
    +            session.getProvenanceReporter().route(flowFile, REL_ORIGINAL);
    +            // Ready to transfer and commit
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(multipleRecords, REL_SUCCESS);
    +            session.adjustCounter("Records Processed", processedRecord, false);
    +            session.commit();
    +        } catch (Exception e) {
    +            // The flowfile has failed parsing & validation, routing to failure
    +            getLogger().error("Failed to parse {} as a netflowv5 message due to {}; routing to failure", new Object[] { flowFile, e });
    +            // Create a provenance event recording the routing to failure
    +            session.getProvenanceReporter().route(flowFile, REL_FAILURE);
    +            session.transfer(flowFile, REL_FAILURE);
    +            session.commit();
    +            return;
    +        } finally {
    +            session.rollback();
    +        }
    +    }
    +
    +    private void generateJSON(final List<FlowFile> multipleRecords, final ProcessSession session, final FlowFile flowFile, final Netflowv5Parser parser, final int processedRecord, final byte[] buffer)
    +            throws JsonProcessingException {
    +        int numberOfRecords = processedRecord;
    +        FlowFile recordFlowFile = flowFile;
    +        int record = 0;
    +        while (numberOfRecords-- > 0) {
    +            ObjectNode results = mapper.createObjectNode();
    +            // Add Port number and message format
    +            results.set("port", 
mapper.valueToTree(parser.getPortNumber()));
    +            results.set("format", mapper.valueToTree("netflowv5"));
    +
    +            recordFlowFile = session.clone(flowFile);
    +            // Add JSON Objects
    +            generateJSONUtil(results, parser, record++);
    +
    +            recordFlowFile = session.write(recordFlowFile, new OutputStreamCallback() {
    +                @Override
    +                public void process(OutputStream out) throws IOException {
    +                    try (OutputStream outputStream = new BufferedOutputStream(out)) {
    +                        outputStream.write(mapper.writeValueAsBytes(results));
    +                    }
    +                }
    +            });
    +            // Adjust the FlowFile mime.type attribute
    +            recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    +            // Update the provenance for good measure
    +            session.getProvenanceReporter().modifyContent(recordFlowFile, "Replaced content with parsed netflowv5 fields and values");
    --- End diff --
    
    If you go with `create`, you can get rid of this. Something to keep in mind here is that, depending on the volume and velocity of the data, you could easily overpower the older provenance repository. The write-ahead one could probably handle it, but you might cause problems for people who haven't migrated (and I'm not sure whether the write-ahead one is the default now).
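
A minimal sketch of one way to read that suggestion (hypothetical, not the PR author's code; `flowFile`, `results`, `mapper`, and `multipleRecords` are the names from the diff above):

    // Build each per-record child from the parent instead of cloning it.
    // ProcessSession.create(parent) starts from an empty child FlowFile, and
    // as I understand it the framework records the FORK event itself (and a
    // CONTENT_MODIFIED event for the write if none is reported), so the
    // manual modifyContent()/route() reporting above can be dropped.
    FlowFile recordFlowFile = session.create(flowFile);
    recordFlowFile = session.write(recordFlowFile, out -> out.write(mapper.writeValueAsBytes(results)));
    recordFlowFile = session.putAttribute(recordFlowFile, CoreAttributes.MIME_TYPE.key(), "application/json");
    multipleRecords.add(recordFlowFile);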


> NetFlow Processors
> ------------------
>
>                 Key: NIFI-5327
>                 URL: https://issues.apache.org/jira/browse/NIFI-5327
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Core Framework
>    Affects Versions: 1.6.0
>            Reporter: Prashanth Venkatesan
>            Assignee: Prashanth Venkatesan
>            Priority: Major
>
> As network traffic data comes into scope for big data use cases, we would 
> like NiFi to have processors that support parsing these protocols.
> Netflow is a protocol introduced by Cisco that provides the ability to 
> collect IP network traffic as it enters or exits an interface and is 
> described in detail here:
> [https://www.cisco.com/c/en/us/td/docs/net_mgmt/netflow_collection_engine/3-6/user/guide/format.html]
>  
> Currently, I have created the following processor:
> *ParseNetflowv5*: Parses ingress netflowv5 bytes and ingests them as either 
> NiFi flowfile attributes or JSON content. It also sends a one-time template.
>  
> Further ahead, we can add more processors specific to network protocols to 
> this NAR bundle.
> I will create a pull request.
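
For reference, a minimal self-contained sketch of reading the fixed 24-byte netflowv5 header laid out in the Cisco format document linked above (hypothetical class and variable names; this is not the PR's Netflowv5Parser):

    import java.nio.ByteBuffer;

    public class Netflowv5HeaderSketch {
        public static void main(String[] args) {
            // Fabricated 24-byte header: version=5, count=1, all other fields zero.
            ByteBuffer buf = ByteBuffer.allocate(24); // big-endian by default, matching network byte order
            buf.putShort((short) 5).putShort((short) 1);
            buf.rewind();

            int version = Short.toUnsignedInt(buf.getShort());          // always 5 for netflowv5
            int count = Short.toUnsignedInt(buf.getShort());            // number of 48-byte flow records that follow
            long sysUptime = Integer.toUnsignedLong(buf.getInt());      // ms since the export device booted
            long unixSecs = Integer.toUnsignedLong(buf.getInt());       // seconds since the Unix epoch
            long unixNsecs = Integer.toUnsignedLong(buf.getInt());      // residual nanoseconds
            long flowSequence = Integer.toUnsignedLong(buf.getInt());   // sequence counter of total flows seen
            int engineType = Byte.toUnsignedInt(buf.get());             // type of flow-switching engine
            int engineId = Byte.toUnsignedInt(buf.get());               // slot number of the engine
            int samplingInterval = Short.toUnsignedInt(buf.getShort()); // 2-bit sampling mode + 14-bit interval

            System.out.printf("version=%d count=%d seq=%d%n", version, count, flowSequence);
        }
    }

Because the header and each of the `count` flow records are fixed-size (24 and 48 bytes respectively), v5 can be parsed without the templates that later NetFlow versions introduced.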


