This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 5223a06050 NIFI-14690: Reverted to the previous logic of creating a 
new LifecycleState object in stateless engine when calling ProcessorNode stop. 
We changed to using the shared instance to ensure that thread counts are 
correct when a Processor is terminated. But for stopping we need a separate 
instance to ensure that we properly shut down all instances. In testing, I also 
noticed that we were calling StatelessFlowTask.shutdown() synchronously for 
each instance when we should be able to shut them [...]
5223a06050 is described below

commit 5223a0605027a9c18d04849ec454b44d4614eec8
Author: Mark Payne <[email protected]>
AuthorDate: Tue Jun 24 14:38:05 2025 -0400

    NIFI-14690: Reverted to the previous logic of creating a new LifecycleState 
object in stateless engine when calling ProcessorNode stop. We changed to using 
the shared instance to ensure that thread counts are correct when a Processor 
is terminated. But for stopping we need a separate instance to ensure that we 
properly shut down all instances. In testing, I also noticed that we were 
calling StatelessFlowTask.shutdown() synchronously for each instance when we 
should be able to shut them [...]
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #10045.
---
 .../main/java/org/apache/nifi/processors/standard/DebugFlow.java    | 2 +-
 .../apache/nifi/controller/repository/StandardProcessSession.java   | 2 --
 .../java/org/apache/nifi/controller/tasks/StatelessFlowTask.java    | 6 +++---
 .../java/org/apache/nifi/groups/StandardStatelessGroupNode.java     | 3 +--
 .../nifi/controller/scheduling/StatelessProcessScheduler.java       | 6 +++++-
 .../apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java    | 1 -
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
index a50b1a8974..ff21e34476 100644
--- 
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
+++ 
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
@@ -550,7 +550,7 @@ public class DebugFlow extends AbstractProcessor {
             try {
                 if (sleepMillis > 0) {
                     sleep(sleepMillis, 
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
-                    getLogger().info("DebugFlow finishes sleeping at 
completion of its onTrigger() method");
+                    getLogger().info("DebugFlow finished sleeping at 
completion of its onTrigger() method");
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index cf045fb445..bee4144cde 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -521,7 +521,6 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
                 rollback();
             } catch (final Throwable t2) {
                 t.addSuppressed(t2);
-                LOG.error("Failed to roll back session {} for {}", this, 
connectableDescription, t2);
             }
 
             throw t;
@@ -1192,7 +1191,6 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
     @Override
     public void rollback(final boolean penalize) {
         rollback(penalize, false);
-        verifyTaskActive();
     }
 
     protected synchronized void rollback(final boolean penalize, final boolean 
rollbackCheckpoint) {
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
index 5bdc42c52d..0b251ccde4 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
@@ -359,16 +359,16 @@ public class StatelessFlowTask {
 
         boolean stopped = false;
         if (cause instanceof TerminatedTaskException) {
-            final String input = inputFlowFiles.isEmpty() ? "no input 
FlowFile" : inputFlowFiles.toString();
+            final String input = inputFlowFiles.isEmpty() ? "no input 
FlowFile" : "input FlowFiles " + inputFlowFiles;
 
             // A TerminatedTaskException will happen for 2 reasons: the group 
was stopped, or it timed out.
             // If it was stopped, just log at an INFO level, as this is 
expected. If it timed out, log an error.
             final ScheduledState desiredState = 
this.statelessGroupNode.getDesiredState();
             if (desiredState == ScheduledState.STOPPED) {
-                logger.info("Stateless Flow canceled while running with input 
{}", input);
+                logger.info("Stateless Flow canceled while running with {}", 
input);
                 stopped = true;
             } else {
-                logger.error("Stateless Flow timed out while running with 
input {}", input);
+                logger.error("Stateless Flow timed out while running with {}", 
input);
             }
         } else {
             final String input = inputFlowFiles.isEmpty() ? "with no input 
FlowFile" : " for input " + inputFlowFiles;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
index 96ff6c9610..967883e68f 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
@@ -316,11 +316,10 @@ public class StandardStatelessGroupNode implements 
StatelessGroupNode {
     private void shutdownFlows() {
         final List<StatelessFlowTask> tasks = initializedFlowTasks;
         if (tasks != null) {
-            tasks.forEach(StatelessFlowTask::shutdown);
+            tasks.stream().parallel().forEach(StatelessFlowTask::shutdown);
         }
     }
 
-
     private StandardStatelessFlow createStatelessFlow(final ProcessGroup 
group, final VersionedExternalFlow versionedExternalFlow) {
         final Set<String> failurePortNames = group.getOutputPorts().stream()
             .filter(port -> port.getPortFunction() == PortFunction.FAILURE)
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 555c1373ea..2f6c38ac8f 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -28,6 +28,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.SchedulingAgentCallback;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -162,7 +163,10 @@ public class StatelessProcessScheduler implements 
ProcessScheduler {
     public CompletableFuture<Void> stopProcessor(final ProcessorNode procNode, 
final ProcessorStopLifecycleMethods lifecycleMethods) {
         logger.info("Stopping {}", procNode);
         final ProcessContext processContext = 
processContextFactory.createProcessContext(procNode);
-        final LifecycleState lifecycleState = 
lifecycleStateManager.getOrRegisterLifecycleState(procNode.getIdentifier(), 
false, false);
+
+        final LifecycleState lifecycleState = new 
LifecycleState(procNode.getIdentifier());
+        final boolean scheduled = procNode.getScheduledState() == 
ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
+        lifecycleState.setScheduled(scheduled);
         return procNode.stop(this, this.componentLifeCycleThreadPool, 
processContext, schedulingAgent, lifecycleState, lifecycleMethods);
     }
 
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index a6912bff15..40345920a2 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -303,7 +303,6 @@ public class StandardStatelessFlowCurrent implements 
StatelessFlowCurrent {
         }
     }
 
-
     private void trigger(final Connectable connectable, final 
ProcessSessionFactory sessionFactory, final TrackedStats trackedStats) {
         final ProcessContext processContext = 
processContextFactory.createProcessContext(connectable);
 

Reply via email to