This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.16
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 47127330669015ef7d861b0f178128f5ae0768f7
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Tue May 24 13:25:37 2022 -0400

    NIFI-10049: When unscheduling reporting task, increment its concurrent task count until we've finished all shutdown logic and then decrement it, in much the same way that we do for processors
    
    This closes #6076
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
---
 .../nifi/controller/scheduling/StandardProcessScheduler.java  | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index a7dc11885d..546cd16fc5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -276,6 +276,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         }
 
         taskNode.verifyCanStop();
+
+        // Increment the Active Thread Count in order to ensure that we don't consider the Reporting Task completely stopped until we've run
+        // all lifecycle methods, such as @OnStopped
+        lifecycleState.incrementActiveThreadCount(null);
+
         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
         final ReportingTask reportingTask = taskNode.getReportingTask();
         taskNode.setScheduledState(ScheduledState.STOPPED);
@@ -302,9 +307,13 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
                     agent.unschedule(taskNode, lifecycleState);
 
-                    if (lifecycleState.getActiveThreadCount() == 0 && lifecycleState.mustCallOnStoppedMethods()) {
+                    // If active thread count == 1, that indicates that all execution threads have completed. We use 1 here instead of 0 because
+                    // when the Reporting Task is unscheduled, we immediately increment the thread count to 1 as an indicator that we've not completely finished.
+                    if (lifecycleState.getActiveThreadCount() == 1 && lifecycleState.mustCallOnStoppedMethods()) {
                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
                     }
+
+                    lifecycleState.decrementActiveThreadCount();
                 }
             }
         };

Reply via email to