This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git

The following commit(s) were added to refs/heads/main by this push:
     new 1d9ccb3857 NIFI-12161: This closes #7829. Ensuring framework threads use lightweight threads instead of a capped thread pool. This prevents framework threads from livelocking in the event enough framework threads are holding threads while those needing to run cannot get them.
1d9ccb3857 is described below

commit 1d9ccb3857901faaf531645ae1b09547a0944050
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Mon Oct 2 11:12:10 2023 -0400

    NIFI-12161: This closes #7829. Ensuring framework threads use lightweight threads instead of a capped thread pool. This prevents framework threads from livelocking in the event enough framework threads are holding threads while those needing to run cannot get them.

    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../scheduling/StandardProcessScheduler.java | 75 ++++++++++++++++------
 .../apache/nifi/tests/system/NiFiClientUtil.java | 56 +++++++++++++---
 .../org/apache/nifi/tests/system/NiFiSystemIT.java | 22 +++++--
 .../SpawnedStandaloneNiFiInstanceFactory.java | 7 ++
 .../system/pg/SingleFlowFileConcurrencyIT.java | 16 ++++-
 5 files changed, 142 insertions(+), 34 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index ef68c3d67b..f0175d1b7c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -73,6 +73,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import static java.util.Objects.requireNonNull; @@ -90,11 +91,12 @@ public final class StandardProcessScheduler implements ProcessScheduler { private final StateManagerProvider stateManagerProvider; private final long processorStartTimeoutMillis; private final LifecycleStateManager lifecycleStateManager; + private final AtomicLong frameworkTaskThreadIndex = new AtomicLong(1L); - private final ScheduledExecutorService frameworkTaskExecutor; private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>(); // thread pool for starting/stopping components + private volatile boolean shutdown = false; private final ScheduledExecutorService componentLifeCycleThreadPool; private final ScheduledExecutorService componentMonitoringThreadPool = new FlowEngine(2, "Monitor Processor Lifecycle", true); @@ -111,8 +113,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { final String timeoutString = nifiProperties.getProperty(NiFiProperties.PROCESSOR_SCHEDULING_TIMEOUT); processorStartTimeoutMillis = timeoutString == null ? 
60000 : FormatUtils.getTimeDuration(timeoutString.trim(), TimeUnit.MILLISECONDS); - - frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread"); } public ControllerServiceProvider getControllerServiceProvider() { @@ -123,20 +123,36 @@ public final class StandardProcessScheduler implements ProcessScheduler { return stateManagerProvider.getStateManager(componentId); } - public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { - frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - try { - command.run(); - } catch (final Throwable t) { - LOG.error("Failed to run Framework Task {} due to {}", taskName, t.toString()); - if (LOG.isDebugEnabled()) { - LOG.error("", t); - } - } + public void scheduleFrameworkTask(final Runnable task, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) { + Thread.ofVirtual() + .name(taskName) + .start(() -> invokeRepeatedly(task, taskName, initialDelay, delay, timeUnit)); + } + + private void invokeRepeatedly(final Runnable task, final String taskName, final long initialDelay, final long delayBetweenInvocations, final TimeUnit timeUnit) { + if (initialDelay > 0) { + try { + timeUnit.sleep(initialDelay); + } catch (final InterruptedException ie) { + Thread.currentThread().interrupt(); + return; + } + } + + while (!this.shutdown) { + try { + task.run(); + } catch (final Exception e) { + LOG.error("Failed to run Framework Task {}", taskName, e); + } + + try { + timeUnit.sleep(delayBetweenInvocations); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return; } - }, initialDelay, delay, timeUnit); + } } /** @@ -145,7 +161,29 @@ public final class StandardProcessScheduler implements ProcessScheduler { * @param task the task to perform */ public Future<?> submitFrameworkTask(final Runnable task) { - return 
frameworkTaskExecutor.submit(task); + final CompletableFuture<?> future = new CompletableFuture<>(); + + Thread.ofVirtual() + .name("Framework Task Thread-" + frameworkTaskThreadIndex.getAndIncrement()) + .start(wrapTask(task, future)); + + return future; + } + + private Runnable wrapTask(final Runnable task, final CompletableFuture<?> future) { + return () -> { + try { + task.run(); + future.complete(null); + } catch (final Exception e) { + LOG.error("Encountered unexpected Exception when performing background Framework Task", e); + future.completeExceptionally(e); + } catch (final Throwable t) { + LOG.error("Encountered unexpected Exception when performing background Framework Task", t); + future.completeExceptionally(t); + throw t; + } + }; } @Override @@ -172,6 +210,8 @@ public final class StandardProcessScheduler implements ProcessScheduler { @Override public void shutdown() { + shutdown = true; + for (final SchedulingAgent schedulingAgent : strategyAgentMap.values()) { try { schedulingAgent.shutdown(); @@ -181,7 +221,6 @@ public final class StandardProcessScheduler implements ProcessScheduler { } } - frameworkTaskExecutor.shutdown(); componentLifeCycleThreadPool.shutdown(); } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java index 397f98c595..b55bd80967 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java @@ -126,6 +126,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; public class NiFiClientUtil { @@ -696,7 +697,10 @@ public class NiFiClientUtil { } public void waitForProcessorState(final String 
processorId, final String expectedState) throws NiFiClientException, IOException, InterruptedException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + logger.info("Waiting for Processor {} to reach state {}", processorId, expectedState); + + while (System.currentTimeMillis() < maxTimestamp) { final ProcessorEntity entity = getProcessorClient().getProcessor(processorId); final String state = entity.getComponent().getState(); @@ -714,6 +718,7 @@ public class NiFiClientUtil { final ProcessorStatusSnapshotDTO snapshotDto = entity.getStatus().getAggregateSnapshot(); if (snapshotDto.getActiveThreadCount() == 0 && snapshotDto.getTerminatedThreadCount() == 0) { + logger.info("Processor {} has reached desired state of {}", processorId, expectedState); return; } @@ -722,7 +727,10 @@ public class NiFiClientUtil { } public ReportingTaskEntity waitForReportingTaskState(final String reportingTaskId, final String expectedState) throws NiFiClientException, IOException, InterruptedException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + logger.info("Waiting for Reporting Task {} to reach desired state of {}", reportingTaskId, expectedState); + + while (System.currentTimeMillis() < maxTimestamp) { final ReportingTaskEntity entity = nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId); final String state = entity.getComponent().getState(); @@ -735,15 +743,19 @@ public class NiFiClientUtil { } if ("RUNNING".equals(expectedState)) { + logger.info("Reporting task {} is now running", reportingTaskId); return entity; } if (entity.getStatus().getActiveThreadCount() == 0) { + logger.info("Reporting task {} is now stopped", reportingTaskId); return entity; } Thread.sleep(10L); } + + throw new IOException("Timed out waiting for Reporting Task " + reportingTaskId + " to reach state of " + expectedState); } public void waitForReportingTaskValid(final String 
reportingTaskId) throws NiFiClientException, IOException { @@ -868,9 +880,13 @@ public class NiFiClientUtil { } private void waitForNoRunningComponents(final String groupId) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2); + logger.info("Waiting for no more running components for group {}", groupId); + + while (System.currentTimeMillis() < maxTimestamp) { final boolean anyRunning = isAnyComponentRunning(groupId); if (!anyRunning) { + logger.info("All Process Groups have finished"); return; } @@ -906,6 +922,8 @@ public class NiFiClientUtil { } private void waitForProcessorsStopped(final String groupId) throws IOException, NiFiClientException { + logger.info("Waiting for processors in group {} to stop", groupId); + final ProcessGroupFlowEntity rootGroup = nifiClient.getFlowClient().getProcessGroup(groupId); final FlowDTO rootFlowDTO = rootGroup.getProcessGroupFlow().getFlow(); for (final ProcessorEntity processor : rootFlowDTO.getProcessors()) { @@ -920,6 +938,8 @@ public class NiFiClientUtil { for (final ProcessGroupEntity group : rootFlowDTO.getProcessGroups()) { waitForProcessorsStopped(group.getComponent()); } + + logger.info("All processors in group {} have stopped", groupId); } private void waitForProcessorsStopped(final ProcessGroupDTO group) throws IOException, NiFiClientException { @@ -956,6 +976,8 @@ public class NiFiClientUtil { } public ActivateControllerServicesEntity disableControllerServices(final String groupId, final boolean recurse) throws NiFiClientException, IOException { + logger.info("Starting disableControllerServices for group {}, recurse={}", groupId, recurse); + final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity(); activateControllerServicesEntity.setId(groupId); activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_DISABLED); @@ -973,6 +995,7 @@ public class 
NiFiClientUtil { } } + logger.info("Finished disableControllerServices for group {}", groupId); return activateControllerServices; } @@ -998,7 +1021,10 @@ public class NiFiClientUtil { } public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + logger.info("Waiting for Controller Service {} to have a Run Status of {}", id, requestedRunStatus); + + while (System.currentTimeMillis() < maxTimestamp) { final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id); final String serviceState = serviceEntity.getComponent().getState(); if (requestedRunStatus.equals(serviceState)) { @@ -1029,7 +1055,9 @@ public class NiFiClientUtil { } public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection<String> serviceIdsOfInterest) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + while (System.currentTimeMillis() < maxTimestamp) { final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest); if (nonDisabledServices.isEmpty()) { logger.info("Process Group [{}] Controller Services have desired state [{}]", groupId, desiredState); @@ -1049,7 +1077,9 @@ public class NiFiClientUtil { } public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + while (System.currentTimeMillis() < maxTimestamp) { final ControllerServiceEntity controllerServiceEntity = 
nifiClient.getControllerServicesClient().getControllerService(controllerServiceId); final String currentValidationStatus = controllerServiceEntity.getComponent().getValidationStatus(); if (validationStatus.equals(currentValidationStatus)) { @@ -1070,7 +1100,9 @@ public class NiFiClientUtil { } public void waitForReportingTaskValidationStatus(final String reportingTaskId, final String validationStatus) throws NiFiClientException, IOException { - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + while (System.currentTimeMillis() < maxTimestamp) { final ReportingTaskEntity reportingTaskEntity = nifiClient.getReportingTasksClient().getReportingTask(reportingTaskId); final String currentValidationStatus = reportingTaskEntity.getStatus().getValidationStatus(); if (validationStatus.equalsIgnoreCase(currentValidationStatus)) { @@ -1306,11 +1338,17 @@ public class NiFiClientUtil { public DropRequestEntity emptyQueue(final String connectionId) throws NiFiClientException, IOException { final ConnectionClient connectionClient = getConnectionClient(); - DropRequestEntity requestEntity; - while (true) { + final long maxTimestamp = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(2L); + + DropRequestEntity requestEntity = null; + while (System.currentTimeMillis() < maxTimestamp) { requestEntity = connectionClient.emptyQueue(connectionId); try { while (requestEntity.getDropRequest().getPercentCompleted() < 100) { + if (System.currentTimeMillis() > maxTimestamp) { + throw new IOException("Timed out waiting for queue " + connectionId + " to empty"); + } + try { Thread.sleep(10L); } catch (final InterruptedException ie) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java index b927fae922..5168a152c5 100644 --- 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java @@ -99,7 +99,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { private static final NiFiInstanceCache instanceCache = new NiFiInstanceCache(); static { - Runtime.getRuntime().addShutdownHook(new Thread(() -> instanceCache.shutdown())); + Runtime.getRuntime().addShutdownHook(new Thread(instanceCache::shutdown)); } private TestInfo testInfo; @@ -107,6 +107,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { @BeforeEach public void setup(final TestInfo testInfo) throws IOException { this.testInfo = testInfo; + final String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test Class>"); final String friendlyTestName = testClassName + ":" + testInfo.getDisplayName(); logger.info("Beginning Test {}", friendlyTestName); @@ -133,21 +134,24 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { return true; } - protected TestInfo getTestInfo() { - return testInfo; - } @AfterAll public static void cleanup() { + logger.info("Beginning cleanup"); + final NiFiInstance nifi = nifiRef.get(); nifiRef.set(null); if (nifi != null) { instanceCache.stopOrRecycle(nifi); } + + logger.info("Finished cleanup"); } @AfterEach public void teardown() throws Exception { + logger.info("Beginning teardown"); + try { Exception destroyFlowFailure = null; @@ -182,6 +186,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { if (nifiClient != null) { nifiClient.close(); } + + logger.info("Finished teardown"); } } @@ -230,6 +236,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { } protected void destroyFlow() throws NiFiClientException, IOException, InterruptedException { + logger.info("Starting destroyFlow"); + 
getClientUtil().stopProcessGroupComponents("root"); getClientUtil().disableControllerServices("root", true); getClientUtil().stopReportingTasks(); @@ -238,6 +246,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { getClientUtil().deleteAll("root"); getClientUtil().deleteControllerLevelServices(); getClientUtil().deleteReportingTasks(); + + logger.info("Finished destroyFlow"); } protected void waitForAllNodesConnected() { @@ -273,7 +283,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { } if (System.currentTimeMillis() > maxTime) { - throw new RuntimeException("Waited up to 60 seconds for both nodes to connect but only " + connectedNodeCount + " nodes connected"); + throw new RuntimeException("Waited up to 60 seconds for all nodes to connect but only " + connectedNodeCount + " nodes connected"); } try { @@ -569,7 +579,7 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider { } protected void waitForCoordinatorElected() throws InterruptedException { - waitFor(() -> isCoordinatorElected()); + waitFor(this::isCoordinatorElected); } protected boolean isCoordinatorElected() throws NiFiClientException, IOException { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java index 11b1172771..bc1047f927 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/SpawnedStandaloneNiFiInstanceFactory.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -234,6 +235,8 @@ 
public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory } private void waitForStartup() throws IOException { + final long timeoutMillis = System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(5L); + try (final NiFiClient client = createClient()) { while (true) { try { @@ -241,6 +244,10 @@ public class SpawnedStandaloneNiFiInstanceFactory implements NiFiInstanceFactory logger.info("NiFi Startup Completed [{}]", instanceDirectory.getName()); return; } catch (final Exception e) { + if (System.currentTimeMillis() > timeoutMillis) { + throw new IOException("After waiting 5 minutes, NiFi instance still has not started"); + } + try { Thread.sleep(1000L); } catch (InterruptedException ex) { diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java index 7c96e82edf..dc02c2cc7b 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java @@ -27,6 +27,8 @@ import org.apache.nifi.web.api.entity.PortEntity; import org.apache.nifi.web.api.entity.ProcessGroupEntity; import org.apache.nifi.web.api.entity.ProcessorEntity; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; @@ -37,10 +39,12 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { - + private static final Logger logger = LoggerFactory.getLogger(SingleFlowFileConcurrencyIT.class); @Test public void testSingleConcurrency() throws NiFiClientException, IOException, 
InterruptedException { + logger.info("Beginning test testSingleConcurrency"); + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); @@ -94,11 +98,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { // Ensure that 3 FlowFiles are queued up for Terminate waitForQueueCount(outputToTerminate.getId(), 3); + + logger.info("Finished test testSingleConcurrency"); } @Test public void testSingleConcurrencyAndBatchOutput() throws NiFiClientException, IOException, InterruptedException { + logger.info("Beginning test testSingleConcurrencyAndBatchOutput"); + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); @@ -163,11 +171,15 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { final Map<String, String> secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); assertEquals("1", secondOutAttributes.get("batch.output.Out")); assertEquals("1", secondOutAttributes.get("batch.output.Out2")); + + logger.info("Finished test testSingleConcurrencyAndBatchOutput"); } @Test public void testBatchOutputHasCorrectNumbersOnRestart() throws NiFiClientException, IOException, InterruptedException { + logger.info("Beginning test testBatchOutputHasCorrectNumbersOnRestart"); + final ProcessGroupEntity processGroupEntity = getClientUtil().createProcessGroup("My Group", "root"); final PortEntity inputPort = getClientUtil().createInputPort("In", processGroupEntity.getId()); final PortEntity outputPort = getClientUtil().createOutputPort("Out", processGroupEntity.getId()); @@ 
-238,6 +250,8 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT { final Map<String, String> secondOutAttributes = secondOutFlowFile.getFlowFile().getAttributes(); assertEquals("1", secondOutAttributes.get("batch.output.Out")); assertEquals("1", secondOutAttributes.get("batch.output.Out2")); + + logger.info("Finished test testBatchOutputHasCorrectNumbersOnRestart"); } }