This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new e663621 NIFI-7922: Added support for GET in ListenHTTP for health check e663621 is described below commit e66362194db07798eadb68724072f5e299c65902 Author: Peter Gyori <peter.gyori....@gmail.com> AuthorDate: Wed Oct 14 16:15:04 2020 +0200 NIFI-7922: Added support for GET in ListenHTTP for health check NIFI-7922: Fixes based on review comments ListenHTTP: changed if(!sslRequired) to if(sslRequired) so that the positive case comes first. HealthCheckServlet: response body for GET contains "OK". ContentAcknowledgmentServlet: super.doDelete() is called when a DELETE should be rejected because of port mismatch. NIFI-7922: Refactored, based on review comments NIFI-7922: Fixed a checkstyle violation (organized the imports) This closes #4603. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi/processors/standard/ListenHTTP.java | 181 ++++++++++++++------- .../servlets/ContentAcknowledgmentServlet.java | 8 + .../standard/servlets/HealthCheckServlet.java | 35 ++++ .../standard/servlets/ListenHTTPServlet.java | 18 +- 4 files changed, 177 insertions(+), 65 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index cfc1a2a..7fba1d8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -16,22 +16,6 @@ */ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; -import javax.servlet.Servlet; -import javax.servlet.http.HttpServletResponse; -import javax.ws.rs.Path; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -41,6 +25,8 @@ import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange; import org.apache.nifi.annotation.notification.PrimaryNodeState; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.AbstractSessionFactoryProcessor; @@ -53,13 +39,13 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.servlets.ContentAcknowledgmentServlet; +import org.apache.nifi.processors.standard.servlets.HealthCheckServlet; import org.apache.nifi.processors.standard.servlets.ListenHTTPServlet; import org.apache.nifi.scheduling.ExecutionNode; import org.apache.nifi.ssl.RestrictedSSLContextService; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.stream.io.LeakyBucketStreamThrottler; import org.apache.nifi.stream.io.StreamThrottler; -import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.SecureRequestCustomizer; @@ -70,11 +56,32 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; +import javax.servlet.Servlet; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Path; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Pattern; + @InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"ingest", "http", "https", "rest", "listen"}) @CapabilityDescription("Starts an HTTP Server and listens on a given base path to transform incoming requests into FlowFiles. " + "The default URI of the Service will be http://{hostname}:{port}/contentListener. Only HEAD and POST requests are " - + "supported. GET, PUT, and DELETE will result in an error and the HTTP response status code 405.") + + "supported. GET, PUT, and DELETE will result in an error and the HTTP response status code 405. " + + "GET is supported on <service_URI>/healthcheck. If the service is available, it returns \"200 OK\" with the content \"OK\". " + + "The health check functionality can be configured to be accessible via a different port. " + + "For details see the documentation of the \"Listening Port for health check requests\" property.") public class ListenHTTP extends AbstractSessionFactoryProcessor { private Set<Relationship> relationships; @@ -104,6 +111,21 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); + public static final PropertyDescriptor HEALTH_CHECK_PORT = new PropertyDescriptor.Builder() + .name("health-check-port") + .displayName("Listening Port for Health Check Requests") + .description("The port to listen on for incoming health check requests. " + + "If set, it must be different from the Listening Port. " + + "Configure this port if the processor is set to use two-way SSL and a load balancer that does not support client authentication for " + + "health check requests is used. " + + "Only /<base_path>/healthcheck service is available via this port and only GET and HEAD requests are supported. " + + "If the processor is set not to use SSL, SSL will not be used on this port, either. " + + "If the processor is set to use one-way SSL, one-way SSL will be used on this port. " + + "If the processor is set to use two-way SSL, one-way SSL will be used on this port (client authentication not required).") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.PORT_VALIDATOR) + .build(); public static final PropertyDescriptor AUTHORIZED_DN_PATTERN = new PropertyDescriptor.Builder() .name("Authorized DN Pattern") .description("A Regular Expression to apply against the Distinguished Name of incoming connections. If the Pattern does not match the DN, the connection will be refused.") @@ -174,6 +196,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { public static final String CONTEXT_ATTRIBUTE_RETURN_CODE = "returnCode"; public static final String CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE = "multipartRequestMaxSize"; public static final String CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE = "multipartReadBufferSize"; + public static final String CONTEXT_ATTRIBUTE_PORT = "port"; private volatile Server server = null; private final ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap = new ConcurrentHashMap<>(); @@ -181,6 +204,30 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { private final AtomicReference<StreamThrottler> throttlerRef = new AtomicReference<>(); @Override + protected Collection<ValidationResult> customValidate(ValidationContext context) { + List<ValidationResult> results = new ArrayList<>(1); + + validatePortsAreNotEqual(context, results); + + return results; + } + + private void validatePortsAreNotEqual(ValidationContext context, Collection<ValidationResult> validationResults) { + Integer healthCheckPort = context.getProperty(HEALTH_CHECK_PORT).evaluateAttributeExpressions().asInteger(); + if (healthCheckPort != null) { + Integer port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); + if (port.equals(healthCheckPort)) { + String explanation = String.format("'%s' and '%s' cannot have the same value.", PORT.getDisplayName(), HEALTH_CHECK_PORT.getDisplayName()); + validationResults.add(createValidationResult(HEALTH_CHECK_PORT.getDisplayName(), explanation)); + } + } + } + + private ValidationResult createValidationResult(String subject, String explanation) { + return new ValidationResult.Builder().subject(subject).valid(false).explanation(explanation).build(); + } + + @Override protected void init(final ProcessorInitializationContext context) { final Set<Relationship> relationships = new HashSet<>(); relationships.add(RELATIONSHIP_SUCCESS); @@ -189,6 +236,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final List<PropertyDescriptor> descriptors = new ArrayList<>(); descriptors.add(BASE_PATH); descriptors.add(PORT); + descriptors.add(HEALTH_CHECK_PORT); descriptors.add(MAX_DATA_RATE); descriptors.add(SSL_CONTEXT_SERVICE); descriptors.add(AUTHORIZED_DN_PATTERN); @@ -254,32 +302,9 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); throttlerRef.set(streamThrottler); + final boolean sslRequired = ((sslContextService != null) && (sslContextService.getKeyStoreFile() != null)); final boolean needClientAuth = sslContextService != null && sslContextService.getTrustStoreFile() != null; - final SslContextFactory contextFactory = new SslContextFactory.Server(); - contextFactory.setNeedClientAuth(needClientAuth); - - if (needClientAuth) { - contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile()); - contextFactory.setTrustStoreType(sslContextService.getTrustStoreType()); - contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword()); - } - - final String keystorePath = sslContextService == null ? null : sslContextService.getKeyStoreFile(); - if (keystorePath != null) { - final String keystorePassword = sslContextService.getKeyStorePassword(); - final String keyStoreType = sslContextService.getKeyStoreType(); - - contextFactory.setKeyStorePath(keystorePath); - contextFactory.setKeyManagerPassword(keystorePassword); - contextFactory.setKeyStorePassword(keystorePassword); - contextFactory.setKeyStoreType(keyStoreType); - } - - if (sslContextService != null) { - contextFactory.setProtocol(sslContextService.getSslAlgorithm()); - } - // thread pool for the jetty instance final QueuedThreadPool threadPool = new QueuedThreadPool(); threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier())); @@ -290,29 +315,17 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { // get the configured port final int port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger(); - final ServerConnector connector; - final HttpConfiguration httpConfiguration = new HttpConfiguration(); - if (keystorePath == null) { - // create the connector - connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration)); - } else { - // configure the ssl connector - httpConfiguration.setSecureScheme("https"); - httpConfiguration.setSecurePort(port); - httpConfiguration.addCustomizer(new SecureRequestCustomizer()); + final ServerConnector connector = createServerConnector(server, port, sslContextService, sslRequired, needClientAuth); + server.addConnector(connector); - // build the connector - - connector = new ServerConnector(server, new SslConnectionFactory(contextFactory, "http/1.1"), new HttpConnectionFactory(httpConfiguration)); + // Add a separate connector for the health check port (if specified) + final Integer healthCheckPort = context.getProperty(HEALTH_CHECK_PORT).evaluateAttributeExpressions().asInteger(); + if (healthCheckPort != null) { + final ServerConnector healthCheckConnector = createServerConnector(server, healthCheckPort, sslContextService, sslRequired, false); + server.addConnector(healthCheckConnector); } - // configure the port - connector.setPort(port); - - // add the connector to the server - server.setConnectors(new Connector[] {connector}); - - final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, (keystorePath != null)); + final ServletContextHandler contextHandler = new ServletContextHandler(server, "/", true, sslRequired); for (final Class<? extends Servlet> cls : getServerClasses()) { final Path path = cls.getAnnotation(Path.class); // Note: servlets must have a path annotation - this will NPE otherwise @@ -336,6 +349,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_RETURN_CODE, returnCode); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE, requestMaxSize); contextHandler.setAttribute(CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE, readBufferSize); + contextHandler.setAttribute(CONTEXT_ATTRIBUTE_PORT, port); if (context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).isSet()) { contextHandler.setAttribute(CONTEXT_ATTRIBUTE_HEADER_PATTERN, Pattern.compile(context.getProperty(HEADERS_AS_ATTRIBUTES_REGEX).getValue())); @@ -351,6 +365,48 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { initialized.set(true); } + private ServerConnector createServerConnector(Server server, int port, SSLContextService sslContextService, boolean sslRequired, boolean needClientAuth) { + final ServerConnector connector; + final HttpConfiguration httpConfiguration = new HttpConfiguration(); + if (sslRequired) { + httpConfiguration.setSecureScheme("https"); + httpConfiguration.setSecurePort(port); + httpConfiguration.addCustomizer(new SecureRequestCustomizer()); + + final SslContextFactory contextFactory = createSslContextFactory(sslContextService, needClientAuth); + + connector = new ServerConnector(server, new SslConnectionFactory(contextFactory, "http/1.1"), new HttpConnectionFactory(httpConfiguration)); + } else { + connector = new ServerConnector(server, new HttpConnectionFactory(httpConfiguration)); + } + + connector.setPort(port); + return connector; + } + + private SslContextFactory createSslContextFactory(SSLContextService sslContextService, boolean needClientAuth) { + final SslContextFactory contextFactory = new SslContextFactory.Server(); + + final String keystorePassword = sslContextService.getKeyStorePassword(); + final String keyStoreType = sslContextService.getKeyStoreType(); + final String keyStorePath = sslContextService.getKeyStoreFile(); + + contextFactory.setKeyStorePath(keyStorePath); + contextFactory.setKeyStorePassword(keystorePassword); + contextFactory.setKeyManagerPassword(keystorePassword); + contextFactory.setKeyStoreType(keyStoreType); + contextFactory.setProtocol(sslContextService.getSslAlgorithm()); + + contextFactory.setNeedClientAuth(needClientAuth); + if (needClientAuth) { + contextFactory.setTrustStorePath(sslContextService.getTrustStoreFile()); + contextFactory.setTrustStorePassword(sslContextService.getTrustStorePassword()); + contextFactory.setTrustStoreType(sslContextService.getTrustStoreType()); + } + + return contextFactory; + } + @OnScheduled public void clearInit(){ initialized.set(false); @@ -362,6 +418,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { // any servlets other than ListenHTTPServlet must have a Path annotation start with / s.add(ListenHTTPServlet.class); s.add(ContentAcknowledgmentServlet.class); + s.add(HealthCheckServlet.class); return s; } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java index 0faeec6..a0ee8a2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ContentAcknowledgmentServlet.java @@ -48,6 +48,7 @@ public class ContentAcknowledgmentServlet extends HttpServlet { private Pattern authorizedPattern; private ComponentLog logger; private ConcurrentMap<String, FlowFileEntryTimeWrapper> flowFileMap; + private int port; @SuppressWarnings("unchecked") @Override @@ -57,10 +58,17 @@ public class ContentAcknowledgmentServlet extends HttpServlet { this.logger = (ComponentLog) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_LOGGER); this.authorizedPattern = (Pattern) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_AUTHORITY_PATTERN); this.flowFileMap = (ConcurrentMap<String, FlowFileEntryTimeWrapper>) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_FLOWFILE_MAP); + this.port = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PORT); } @Override protected void doDelete(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + + if (request.getLocalPort() != port) { + super.doDelete(request, response); + return; + } + final X509Certificate[] certs = (X509Certificate[]) request.getAttribute("javax.servlet.request.X509Certificate"); String foundSubject = DEFAULT_FOUND_SUBJECT; if (certs != null && certs.length > 0) { diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/HealthCheckServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/HealthCheckServlet.java new file mode 100644 index 0000000..4a7fc55 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/HealthCheckServlet.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard.servlets; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Path; +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +@Path("/healthcheck") +public class HealthCheckServlet extends HttpServlet { + + @Override + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + response.setStatus(HttpServletResponse.SC_OK); + response.getOutputStream().write("OK".getBytes(StandardCharsets.UTF_8)); + } +} diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java index 7c07512..9c65e66 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/servlets/ListenHTTPServlet.java @@ -104,6 +104,7 @@ public class ListenHTTPServlet extends HttpServlet { private int returnCode; private long multipartRequestMaxSize; private int multipartReadBufferSize; + private int port; @SuppressWarnings("unchecked") @Override @@ -120,17 +121,28 @@ public class ListenHTTPServlet extends HttpServlet { this.returnCode = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_RETURN_CODE); this.multipartRequestMaxSize = (long) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_REQUEST_MAX_SIZE); this.multipartReadBufferSize = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_MULTIPART_READ_BUFFER_SIZE); + this.port = (int) context.getAttribute(ListenHTTP.CONTEXT_ATTRIBUTE_PORT); } @Override protected void doHead(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { - response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE); - response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE); - response.addHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); + if (request.getLocalPort() == port) { + response.addHeader(ACCEPT_ENCODING_NAME, ACCEPT_ENCODING_VALUE); + response.addHeader(ACCEPT_HEADER_NAME, ACCEPT_HEADER_VALUE); + response.addHeader(PROTOCOL_VERSION_HEADER, PROTOCOL_VERSION); + } else { + super.doHead(request, response); + } } @Override protected void doPost(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException { + + if (request.getLocalPort() != port) { + super.doPost(request, response); + return; + } + final ProcessContext context = processContext; ProcessSessionFactory sessionFactory;