[ https://issues.apache.org/jira/browse/NIFI-856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15309210#comment-15309210 ]
ASF GitHub Bot commented on NIFI-856: ------------------------------------- Github user trixpan commented on a diff in the pull request: https://github.com/apache/nifi/pull/290#discussion_r65298978 --- Diff: nifi-nar-bundles/nifi-lumberjack-bundle/nifi-lumberjack-processors/src/main/java/org/apache/nifi/processors/lumberjack/ListenLumberjack.java --- @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.lumberjack; + +import com.google.gson.Gson; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.lumberjack.event.LumberjackEvent; +import org.apache.nifi.processors.lumberjack.event.LumberjackEventFactory; +import org.apache.nifi.processors.lumberjack.frame.LumberjackEncoder; +import org.apache.nifi.processors.lumberjack.handler.LumberjackSocketChannelHandlerFactory; +import org.apache.nifi.processors.lumberjack.response.LumberjackChannelResponse; +import org.apache.nifi.processors.lumberjack.response.LumberjackResponse; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Collection; +import java.util.ArrayList; +import java.util.concurrent.BlockingQueue; + + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"listen", "lumberjack", "tcp", "logs"}) +@CapabilityDescription("Listens for Lumberjack messages being sent to a given port over TCP. Each message will be " + + "acknowledged after successfully writing the message to a FlowFile. Each FlowFile will contain data " + + "portion of one or more Lumberjack frames. In the case where the Lumberjack frames contain syslog messages, the " + + "output of this processor can be sent to a ParseSyslog processor for further processing.") +@WritesAttributes({ + @WritesAttribute(attribute="lumberjack.sender", description="The sending host of the messages."), + @WritesAttribute(attribute="lumberjack.port", description="The sending port the messages were received over."), + @WritesAttribute(attribute="lumberjack.sequencenumber", description="The sequence number of the message. Only included if <Batch Size> is 1."), + @WritesAttribute(attribute="lumberjack.*", description="The keys and respective values as sent by the lumberjack producer. Only included if <Batch Size> is 1."), + @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain") +}) +@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"}) +public class ListenLumberjack extends AbstractListenEventBatchingProcessor<LumberjackEvent> { + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() --- End diff -- @apiri Made it required just to provide overall consistency with clients. Will revisit when v2 support is introduced. > Add Processor for Lumberjack protocol > ------------------------------------- > > Key: NIFI-856 > URL: https://issues.apache.org/jira/browse/NIFI-856 > Project: Apache NiFi > Issue Type: New Feature > Reporter: Mike de Rhino > Labels: features > Attachments: NIFI-856.patch > > > It would be great if NIFI could support the [lumberjack > protocol|https://github.com/elastic/logstash-forwarder/blob/master/PROTOCOL.md] > so to enable the use of logstash forwarder as a source of data. > A lot of non Java shops tend to avoid installing Java at data producing nodes > and instead of Flume they end up using things like kafka, heka, fluentd or > logstash-forwarded as data shipping mechanisms. > Kafka is great but its architecture seem to be better focused on multi-DC > environments instead of multi-branch scenarios (imagine having to manager 80 > Zookeeper quorum, one for each country where you operate?) > [Heka|https://github.com/mozilla-services/heka] is fine, it has decent > backpressure buffering but no concept of acknowledgement on the receiving > side of a TCP stream. If the other end of a TCP stream is capable of > listening but gets stuck with its messages it will keep spitting data through > the pipe, oblivious to the woes at the other end. > Logstash forwarder in the other hand, is a quite simple tool, with a > reasonable implementation of acknowledgments on the receiving side but... it > depends on Logstash(and logstash has its own issues). > It would be great if NIFI could serve as a middle man, receiving lumberjack > messages and offloading some of the hard work Logstash seems to struggle with > (e.g. using NIFI to save to HDFS while a downstream Logstash writes into ES). -- This message was sent by Atlassian JIRA (v6.3.4#6332)