[ https://issues.apache.org/jira/browse/NIFI-1420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15167241#comment-15167241 ]

ASF GitHub Bot commented on NIFI-1420:
--------------------------------------

Github user olegz commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/233#discussion_r54099600
  
    --- Diff: nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/ListenSplunkForwarder.java ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.splunk;
    +
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.components.AllowableValue;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.listen.AbstractListenEventProcessor;
    +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.DatagramChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.event.EventFactory;
    +import org.apache.nifi.processor.util.listen.event.StandardEvent;
    +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.handler.socket.SocketChannelHandlerFactory;
    +import org.apache.nifi.processor.util.listen.response.ChannelResponder;
    +import org.apache.nifi.ssl.SSLContextService;
    +
    +import javax.net.ssl.SSLContext;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.nio.channels.SelectableChannel;
    +import java.nio.channels.SocketChannel;
    +import java.nio.charset.Charset;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +
    +@SupportsBatching
    +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
    +@Tags({"listen", "splunk", "tcp", "udp", "logs"})
    +@CapabilityDescription("Listens for data from a Splunk forwarder.")
    +@WritesAttributes({
    +        @WritesAttribute(attribute="splunk.sender", description="The 
sending host of the messages."),
    +        @WritesAttribute(attribute="splunk.port", description="The sending 
port the messages were received over."),
    +        @WritesAttribute(attribute="mime.type", description="The mime.type 
of the messages which is text/plain.")
    +})
    +public class ListenSplunkForwarder extends AbstractListenEventProcessor<ListenSplunkForwarder.SplunkEvent> {
    +
    +    public static final AllowableValue TCP_VALUE = new AllowableValue("TCP", "TCP");
    +    public static final AllowableValue UDP_VALUE = new AllowableValue("UDP", "UDP");
    +
    +    public static final PropertyDescriptor PROTOCOL = new PropertyDescriptor
    +            .Builder().name("Protocol")
    +            .description("The protocol for communication.")
    +            .required(true)
    +            .allowableValues(TCP_VALUE, UDP_VALUE)
    +            .defaultValue(TCP_VALUE.getValue())
    +            .build();
    +
    +    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
    +            .name("SSL Context Service")
    +            .description("The Controller Service to use in order to obtain 
an SSL Context. If this property is set, " +
    +                    "messages will be received over a secure connection.")
    +            .required(false)
    +            .identifiesControllerService(SSLContextService.class)
    +            .build();
    +
    +    // it is only the array reference that is volatile - not the contents.
    +    private volatile byte[] messageDemarcatorBytes;
    +
    +    @Override
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Arrays.asList(
    +                PROTOCOL,
    +                MAX_CONNECTIONS,
    +                MAX_BATCH_SIZE,
    +                MESSAGE_DELIMITER,
    +                SSL_CONTEXT_SERVICE
    +        );
    +    }
    +
    +    @Override
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) throws IOException {
    +        super.onScheduled(context);
    +        final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
    +        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
    +    }
    +
    +    @Override
    +    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<SplunkEvent> events)
    +            throws IOException {
    +
    +        final String protocol = context.getProperty(PROTOCOL).getValue();
    +        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
    +        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
    +        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
    +
    +        // initialize the buffer pool based on max number of connections and the buffer size
    +        final LinkedBlockingQueue<ByteBuffer> bufferPool = new LinkedBlockingQueue<>(maxConnections);
    +        for (int i = 0; i < maxConnections; i++) {
    +            bufferPool.offer(ByteBuffer.allocate(bufferSize));
    +        }
    +
    +        final EventFactory<SplunkEvent> eventFactory = new SplunkEventFactory();
    +
    +        if (UDP_VALUE.getValue().equals(protocol)) {
    +            return new DatagramChannelDispatcher(eventFactory, bufferPool, events, getLogger());
    +        } else {
    +            // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
    +            SSLContext sslContext = null;
    +            final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
    +            if (sslContextService != null) {
    +                sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED);
    +            }
    +
    +            final ChannelHandlerFactory<SplunkEvent<SocketChannel>, AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
    +            return new SocketChannelDispatcher(eventFactory, handlerFactory, bufferPool, events, getLogger(), maxConnections, sslContext, charSet);
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    +        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
    --- End diff ---
    
    Considering that this uses a batching approach similar to the one in 
    PutKafka, I'm curious whether you tested it with a run duration > 0. See 
    the comments in https://issues.apache.org/jira/browse/NIFI-1534, as it may 
    suffer from the same issue.
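
    For reference, the batching loop in question typically has the shape 
    sketched below. This is a sketch only: the diff is truncated at this 
    point, so the loop body is an assumption based on other 
    AbstractListenEventProcessor subclasses, not the actual code in this PR 
    (the events queue and SplunkEvent type follow the diff above).

        // Sketch only: drain up to MAX_BATCH_SIZE queued events per onTrigger call.
        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
        for (int i = 0; i < maxBatchSize; i++) {
            final SplunkEvent event = events.poll(); // queue filled by the channel dispatcher
            if (event == null) {
                break; // nothing buffered right now; end this batch
            }
            FlowFile flowFile = session.create();
            final byte[] data = event.getData(); // Event.getData() from the listen framework
            flowFile = session.write(flowFile, out -> out.write(data));
            session.transfer(flowFile, REL_SUCCESS);
        }

    With @SupportsBatching and a run duration > 0, the framework can defer 
    the session commit across onTrigger invocations, which is presumably 
    where the behavior discussed in NIFI-1534 comes in.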


> Splunk Processors
> -----------------
>
>                 Key: NIFI-1420
>                 URL: https://issues.apache.org/jira/browse/NIFI-1420
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 0.6.0
>
>
> To continue improving NiFi's ability to collect logs, a good integration 
> point would be a processor that can listen for data from a Splunk forwarder 
> (https://docs.splunk.com/Splexicon:Universalforwarder). Being able to push 
> log messages to Splunk would also be useful.
> Splunk provides an SDK that may be helpful:
> https://github.com/splunk/splunk-sdk-java 
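
For the "push to Splunk" side mentioned above, a rough sketch of submitting a 
single event through the Splunk Java SDK is below. The host, port, 
credentials, and index name are placeholder assumptions, not values from this 
issue; the calls follow the splunk-sdk-java examples.

    import com.splunk.Index;
    import com.splunk.Service;
    import com.splunk.ServiceArgs;

    public class SplunkSubmitSketch {
        public static void main(String[] args) {
            // Placeholder connection settings; a real processor would expose
            // these as property descriptors.
            final ServiceArgs loginArgs = new ServiceArgs();
            loginArgs.setHost("localhost"); // hypothetical Splunk host
            loginArgs.setPort(8089);        // default Splunk management port
            loginArgs.setUsername("admin");
            loginArgs.setPassword("changeme");

            final Service service = Service.connect(loginArgs);

            // Submit one event to an index via Splunk's REST API.
            final Index index = service.getIndexes().get("main");
            index.submit("example log message pushed from NiFi");
        }
    }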


