This is an automated email from the ASF dual-hosted git repository. pvillard pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push: new 4a27b23 NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP 4a27b23 is described below commit 4a27b23b1f5b5abd159ee541b5d912c548dafd58 Author: Peter Gyori <peter.gyori....@gmail.com> AuthorDate: Thu Feb 25 18:31:38 2021 +0100 NIFI-8263: Maximum Thread Pool Size property introduced in ListenHTTP Signed-off-by: Pierre Villard <pierre.villard...@gmail.com> This closes #4847. --- .../nifi/processors/standard/ListenHTTP.java | 22 +++++++- .../nifi/processors/standard/TestListenHTTP.java | 64 ++++++++++++++++++++++ 2 files changed, 85 insertions(+), 1 deletion(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index e918a5e..b76c70f 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@ -226,6 +226,20 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { .defaultValue(ClientAuthentication.AUTO.name()) .dependsOn(SSL_CONTEXT_SERVICE) .build(); + public static final PropertyDescriptor MAX_THREAD_POOL_SIZE = new PropertyDescriptor.Builder() + .name("max-thread-pool-size") + .displayName("Maximum Thread Pool Size") + .description("The maximum number of threads to be used by the embedded Jetty server. " + + "The value can be set between 8 and 1000. " + + "The value of this property affects the performance of the flows and the operating system, therefore " + + "the default value should only be changed in justified cases. " + + "A value that is less than the default value may be suitable " + + "if only a small number of HTTP clients connect to the server. A greater value may be suitable " + + "if a large number of HTTP clients are expected to make requests to the server simultaneously.") + .required(true) + .addValidator(StandardValidators.createLongValidator(8L, 1000L, true)) + .defaultValue("200") + .build(); public static final String CONTEXT_ATTRIBUTE_PROCESSOR = "processor"; public static final String CONTEXT_ATTRIBUTE_LOGGER = "logger"; @@ -289,6 +303,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { descriptors.add(RETURN_CODE); descriptors.add(MULTIPART_REQUEST_MAX_SIZE); descriptors.add(MULTIPART_READ_BUFFER_SIZE); + descriptors.add(MAX_THREAD_POOL_SIZE); this.properties = Collections.unmodifiableList(descriptors); } @@ -321,6 +336,10 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { shutdownHttpServer(toShutdown); } + Server getServer() { + return this.server; + } + private void shutdownHttpServer(Server toShutdown) { try { toShutdown.stop(); @@ -344,6 +363,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final int returnCode = context.getProperty(RETURN_CODE).asInteger(); long requestMaxSize = context.getProperty(MULTIPART_REQUEST_MAX_SIZE).asDataSize(DataUnit.B).longValue(); int readBufferSize = context.getProperty(MULTIPART_READ_BUFFER_SIZE).asDataSize(DataUnit.B).intValue(); + int maxThreadPoolSize = context.getProperty(MAX_THREAD_POOL_SIZE).asInteger(); throttlerRef.set(streamThrottler); final boolean sslRequired = sslContextService != null; @@ -351,7 +371,7 @@ public class ListenHTTP extends AbstractSessionFactoryProcessor { final ClientAuthentication clientAuthentication = getClientAuthentication(sslContextService, clientAuthenticationProperty); // thread pool for the jetty instance - final QueuedThreadPool threadPool = new QueuedThreadPool(); + final QueuedThreadPool threadPool = new QueuedThreadPool(maxThreadPoolSize); threadPool.setName(String.format("%s (%s) Web Server", getClass().getSimpleName(), getIdentifier())); // create the server instance diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java index 09caa5c..22cf181 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenHTTP.java @@ -67,6 +67,8 @@ import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.util.thread.ThreadPool; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -405,6 +407,68 @@ public class TestListenHTTP { assertThrows(SSLHandshakeException.class, sslSocket::startHandshake); } + @Test + public void testMaxThreadPoolSizeTooLow() { + // GIVEN, WHEN + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "7"); + + // THEN + runner.assertNotValid(); + } + + @Test + public void testMaxThreadPoolSizeTooHigh() { + // GIVEN, WHEN + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1001"); + + // THEN + runner.assertNotValid(); + } + + @Test + public void testMaxThreadPoolSizeOkLowerBound() { + // GIVEN, WHEN + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "8"); + + // THEN + runner.assertValid(); + } + + @Test + public void testMaxThreadPoolSizeOkUpperBound() { + // GIVEN, WHEN + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, "1000"); + + // THEN + runner.assertValid(); + } + + @Test + public void testMaxThreadPoolSizeSpecifiedInThePropertyIsSetInTheServerInstance() { + // GIVEN + int maxThreadPoolSize = 201; + runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); + runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH); + runner.setProperty(ListenHTTP.MAX_THREAD_POOL_SIZE, Integer.toString(maxThreadPoolSize)); + + // WHEN + startWebServer(); + + // THEN + Server server = proc.getServer(); + ThreadPool threadPool = server.getThreadPool(); + ThreadPool.SizedThreadPool sizedThreadPool = (ThreadPool.SizedThreadPool) threadPool; + assertEquals(maxThreadPoolSize, sizedThreadPool.getMaxThreads()); + } + private void startSecureServer() { runner.setProperty(ListenHTTP.PORT, Integer.toString(availablePort)); runner.setProperty(ListenHTTP.BASE_PATH, HTTP_BASE_PATH);