This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.16 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 47127330669015ef7d861b0f178128f5ae0768f7
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Tue May 24 13:25:37 2022 -0400

    NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors

    This closes #6076

    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../nifi/controller/scheduling/StandardProcessScheduler.java | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index a7dc11885d..546cd16fc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }

         taskNode.verifyCanStop();
+
+        // Increment the Active Thread Count in order to ensure that we don't consider the Reporting Task completely stopped until we've run
+        // all lifecycle methods, such as @OnStopped
+        lifecycleState.incrementActiveThreadCount(null);
+
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
         taskNode.setScheduledState(ScheduledState.STOPPED);
@@ -302,9 +307,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {

                     agent.unschedule(taskNode, lifecycleState);

-                    if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) {
+                    // If active thread count == 1, that indicates that all execution threads have completed. We use 1 here instead of 0 because
+                    // when the Reporting Task is unscheduled, we immediately increment the thread count to 1 as an indicator that we've not completely finished.
+                    if (lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) {
                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
                     }
+
+                    lifecycleState.decrementActiveThreadCount();
                 }
             }
         };