joewitt commented on code in PR #7830: URL: https://github.com/apache/nifi/pull/7830#discussion_r1343459564
########## nifi-nar-bundles/nifi-opentelemetry-bundle/nifi-opentelemetry-processors/src/main/java/org/apache/nifi/processors/opentelemetry/ListenOTLP.java: ########## @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.opentelemetry; + +import com.google.protobuf.Message; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.configuration.DefaultSchedule; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.event.transport.EventServer; +import org.apache.nifi.event.transport.EventServerFactory; +import org.apache.nifi.event.transport.netty.NettyEventServerFactory; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processors.opentelemetry.protocol.TelemetryAttributeName; +import org.apache.nifi.processors.opentelemetry.io.RequestCallback; +import org.apache.nifi.processors.opentelemetry.io.RequestCallbackProvider; +import org.apache.nifi.processors.opentelemetry.server.HttpServerFactory; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.ClientAuth; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.net.InetAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@DefaultSchedule(period = "25 ms") +@Tags({"OpenTelemetry", "OTel", "OTLP", "telemetry", "metrics", "traces", "logs"}) +@CapabilityDescription( + "Collect OpenTelemetry messages over HTTP or gRPC. " + + "Supports standard Export Service Request messages for logs, metrics, and traces. " + + "Implements OpenTelemetry OTLP Specification 1.0.0 with OTLP/gRPC and OTLP/HTTP. " + + "Provides protocol detection using the HTTP Content-Type header." +) +@WritesAttributes({ + @WritesAttribute(attribute = TelemetryAttributeName.MIME_TYPE, description = "Content-Type set to application/json"), + @WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_TYPE, description = "OpenTelemetry Resource Type: LOGS, METRICS, or TRACES"), + @WritesAttribute(attribute = TelemetryAttributeName.RESOURCE_COUNT, description = "Count of resource elements included in messages"), +}) +public class ListenOTLP extends AbstractProcessor { + + static final PropertyDescriptor ADDRESS = new PropertyDescriptor.Builder() + .name("Address") + .displayName("Address") + .description("Internet Protocol Address on which to listen for OTLP Export Service Requests. The default value enables listening on all addresses.") + .required(true) + .defaultValue("0.0.0.0") + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor PORT = new PropertyDescriptor.Builder() + .name("Port") + .displayName("Port") + .description("TCP port number on which to listen for OTLP Export Service Requests over HTTP and gRPC") + .required(true) + .defaultValue("4317") + .addValidator(StandardValidators.PORT_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("SSL Context Service") + .displayName("SSL Context Service") + .description("SSL Context Service enables TLS communication for HTTPS") + .required(true) + .identifiesControllerService(SSLContextService.class) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor CLIENT_AUTHENTICATION = new PropertyDescriptor.Builder() + .name("Client Authentication") + .displayName("Client Authentication") + .description("Client authentication policy for TLS communication with HTTPS") + .required(true) + .allowableValues(ClientAuth.values()) + .defaultValue(ClientAuth.WANT.name()) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor WORKER_THREADS = new PropertyDescriptor.Builder() + .name("Worker Threads") + .displayName("Worker Threads") + .description("Number of threads responsible for decoding and queuing incoming OTLP Export Service Requests") + .required(true) + .defaultValue("2") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor QUEUE_CAPACITY = new PropertyDescriptor.Builder() + .name("Queue Capacity") + .displayName("Queue Capacity") + .description("Maximum number of OTLP request resource elements that can be received and queued") + .required(true) + .defaultValue("10000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .displayName("Batch Size") + .description("Maximum number of OTLP request resource elements included in each FlowFile produced") + .required(true) + .defaultValue("1000") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .build(); + + static final Relationship SUCCESS = new Relationship.Builder() + .name("success") + .description("Export Service Requests containing OTLP Telemetry") + .build(); + + private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(SUCCESS); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of( + ADDRESS, + PORT, + SSL_CONTEXT_SERVICE, + CLIENT_AUTHENTICATION, + WORKER_THREADS, + QUEUE_CAPACITY, + BATCH_SIZE + ); + + private static final String TRANSIT_URI_FORMAT = "https://%s:%d"; + + private Iterator<RequestCallback> requestCallbackProvider; + + private EventServer server; + + @Override + public final Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + @OnScheduled + public void onScheduled(final ProcessContext context) throws UnknownHostException { + final EventServerFactory eventServerFactory = createEventServerFactory(context); + server = eventServerFactory.getEventServer(); + } + + @OnStopped + public void onStopped() { + if (server == null) { + getLogger().info("Server not running"); Review Comment: This line seems a little strange. It seems the only time the server could be null is if this processor was stopped so if that happens this method is already done/called. And even if this could happen the output doesn't seem useful to the user as there is no meaning they can really derive or action they can take. Was this from earlier testing and left in or am I not considering some case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org