[ https://issues.apache.org/jira/browse/NIFI-3238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889381#comment-15889381 ]
ASF GitHub Bot commented on NIFI-3238: -------------------------------------- Github user trixpan commented on a diff in the pull request: https://github.com/apache/nifi/pull/1418#discussion_r103599675 --- Diff: nifi-nar-bundles/nifi-beats-bundle/nifi-beats-processors/src/main/java/org/apache/nifi/processors/beats/ListenBeats.java --- @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.beats; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor; +import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher; +import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher; +import org.apache.nifi.processor.util.listen.event.EventFactory; +import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; +import org.apache.nifi.processor.util.listen.response.ChannelResponder; +import org.apache.nifi.processor.util.listen.response.ChannelResponse; +import org.apache.nifi.processors.beats.event.BeatsEvent; +import org.apache.nifi.processors.beats.event.BeatsEventFactory; +import org.apache.nifi.processors.beats.frame.BeatsEncoder; +import org.apache.nifi.processors.beats.handler.BeatsSocketChannelHandlerFactory; +import org.apache.nifi.processors.beats.response.BeatsChannelResponse; +import org.apache.nifi.processors.beats.response.BeatsResponse; +import org.apache.nifi.ssl.SSLContextService; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"listen", "beats", "tcp", "logs"}) +@CapabilityDescription("Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) being sent to a given port over TCP, writing its contents in" + + "JSON format to the content of the message to a FlowFile." + + "This processor replaces the now deprecated ListenLumberjack") +@WritesAttributes({ + @WritesAttribute(attribute = "beats.sender", description = "The sending host of the messages."), + @WritesAttribute(attribute = "beats.port", description = "The sending port the messages were received over."), + @WritesAttribute(attribute = "beats.sequencenumber", description = "The sequence number of the message. Only included if <Batch Size> is 1."), + @WritesAttribute(attribute = "mime.type", description = "The mime.type of the content which is application/json") +}) +@SeeAlso(classNames = {"org.apache.nifi.processors.standard.ParseSyslog"}) +public class ListenBeats extends AbstractListenEventBatchingProcessor<BeatsEvent> { + + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .description("The Controller Service to use in order to obtain an SSL Context. If this property is set, " + + "messages will be received over a secure connection.") + // Nearly all Lumberjack v1 implementations require TLS to work. v2 implementations (i.e. beats) have TLS as optional + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + + @Override + protected List<PropertyDescriptor> getAdditionalProperties() { + return Arrays.asList( + MAX_CONNECTIONS, + SSL_CONTEXT_SERVICE + ); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final List<ValidationResult> results = new ArrayList<>(); + + final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + + if (sslContextService != null && sslContextService.isTrustStoreConfigured() == false) { + results.add(new ValidationResult.Builder() + .explanation("The context service must have a truststore configured for the beats forwarder client to work correctly") + .valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build()); + } + + return results; + } + + private volatile BeatsEncoder beatsEncoder; + + + @Override + @OnScheduled + public void onScheduled(ProcessContext context) throws IOException { + super.onScheduled(context); + // wanted to ensure charset was already populated here + beatsEncoder = new BeatsEncoder(); + } + + @Override + protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<BeatsEvent> events) throws IOException { + final EventFactory<BeatsEvent> eventFactory = new BeatsEventFactory(); + final ChannelHandlerFactory<BeatsEvent, AsyncChannelDispatcher> handlerFactory = new BeatsSocketChannelHandlerFactory<>(); + + final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger(); + final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue()); + + // initialize the buffer pool based on max number of connections and the buffer size + final BlockingQueue<ByteBuffer> bufferPool = createBufferPool(maxConnections, bufferSize); + + // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher + SSLContext sslContext = null; + final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + if (sslContextService != null) { + sslContext = sslContextService.createSSLContext(SSLContextService.ClientAuth.REQUIRED); + } + + // if we decide to support SSL then get the context and pass it in here + return new SocketChannelDispatcher<>(eventFactory, handlerFactory, bufferPool, events, + getLogger(), maxConnections, sslContext, charSet); + } + + + @Override + protected String getBatchKey(BeatsEvent event) { + return event.getSender(); + } + + protected void respond(final BeatsEvent event, final BeatsResponse beatsResponse) { + final ChannelResponse response = new BeatsChannelResponse(beatsEncoder, beatsResponse); + + final ChannelResponder responder = event.getResponder(); + responder.addResponse(response); + try { + responder.respond(); + } catch (IOException e) { + getLogger().error("Error sending response for transaction {} due to {}", + new Object[]{event.getSeqNumber(), e.getMessage()}, e); + } + } + + protected void postProcess(final ProcessContext context, final ProcessSession session, final List<BeatsEvent> events) { + // first commit the session so we guarantee we have all the events successfully + // written to FlowFiles and transferred to the success relationship + session.commit(); + // respond to each event to acknowledge successful receipt + for (final BeatsEvent event : events) { + respond(event, BeatsResponse.ok(event.getSeqNumber())); + } + } + + @Override + protected String getTransitUri(FlowFileEventBatch batch) { + final String sender = batch.getEvents().get(0).getSender(); + final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender; + final String transitUri = new StringBuilder().append("beats").append("://").append(senderHost).append(":") + .append(port).toString(); + return transitUri; + } + + @Override + protected Map<String, String> getAttributes(FlowFileEventBatch batch) { + final List<BeatsEvent> events = batch.getEvents(); + // the sender and command will be the same for all events based on the batch key + final String sender = events.get(0).getSender(); + final int numAttributes = events.size() == 1 ? 5 : 4; + final Map<String, String> attributes = new HashMap<>(numAttributes); + attributes.put(beatsAttributes.SENDER.key(), sender); + attributes.put(beatsAttributes.PORT.key(), String.valueOf(port)); + attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json"); + // if there was only one event then we can pass on the transaction + // NOTE: we could pass on all the transaction ids joined together --- End diff -- fair enough. > ListenLumberjack should support the *beat protocol > -------------------------------------------------- > > Key: NIFI-3238 > URL: https://issues.apache.org/jira/browse/NIFI-3238 > Project: Apache NiFi > Issue Type: Improvement > Reporter: Andre F de Miranda > Assignee: Andre F de Miranda > > ListenLumberjack currently only supports v1 of the Lumberjack protocol. This > version has been deprecated in favor of v2, which is used on *beat (e.g. > filebeat, packetbeat, etc) edge components of the ELK stack. > We should consider deprecating ListenLumberjack or to extend it to handle > both v1 and v2 of the protocol. -- This message was sent by Atlassian JIRA (v6.3.15#6346)