This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 5223a06050 NIFI-14690: Reverted to the previous logic of creating a
new LifecycleState object in stateless engine when calling ProcessorNode stop.
We changed to using the shared instance to ensure that thread counts are
correct when a Processor is terminated. But for stopping we need a separate
instance to ensure that we properly shutdown all instances. In testing, I also
noticed that we were calling StatelessFlowTask.shutdown() synchronously for
each instance when we should be a [...]
5223a06050 is described below
commit 5223a0605027a9c18d04849ec454b44d4614eec8
Author: Mark Payne <[email protected]>
AuthorDate: Tue Jun 24 14:38:05 2025 -0400
NIFI-14690: Reverted to the previous logic of creating a new LifecycleState
object in stateless engine when calling ProcessorNode stop. We changed to using
the shared instance to ensure that thread counts are correct when a Processor
is terminated. But for stopping we need a separate instance to ensure that we
properly shutdown all instances. In testing, I also noticed that we were
calling StatelessFlowTask.shutdown() synchronously for each instance when we
should be able to shut them [...]
Signed-off-by: Pierre Villard <[email protected]>
This closes #10045.
---
.../main/java/org/apache/nifi/processors/standard/DebugFlow.java | 2 +-
.../apache/nifi/controller/repository/StandardProcessSession.java | 2 --
.../java/org/apache/nifi/controller/tasks/StatelessFlowTask.java | 6 +++---
.../java/org/apache/nifi/groups/StandardStatelessGroupNode.java | 3 +--
.../nifi/controller/scheduling/StatelessProcessScheduler.java | 6 +++++-
.../apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java | 1 -
6 files changed, 10 insertions(+), 10 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
index a50b1a8974..ff21e34476 100644
---
a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
+++
b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
@@ -550,7 +550,7 @@ public class DebugFlow extends AbstractProcessor {
try {
if (sleepMillis > 0) {
sleep(sleepMillis,
context.getProperty(IGNORE_INTERRUPTS).asBoolean());
- getLogger().info("DebugFlow finishes sleeping at
completion of its onTrigger() method");
+ getLogger().info("DebugFlow finished sleeping at
completion of its onTrigger() method");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index cf045fb445..bee4144cde 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -521,7 +521,6 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
rollback();
} catch (final Throwable t2) {
t.addSuppressed(t2);
- LOG.error("Failed to roll back session {} for {}", this,
connectableDescription, t2);
}
throw t;
@@ -1192,7 +1191,6 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
@Override
public void rollback(final boolean penalize) {
rollback(penalize, false);
- verifyTaskActive();
}
protected synchronized void rollback(final boolean penalize, final boolean
rollbackCheckpoint) {
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
index 5bdc42c52d..0b251ccde4 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/StatelessFlowTask.java
@@ -359,16 +359,16 @@ public class StatelessFlowTask {
boolean stopped = false;
if (cause instanceof TerminatedTaskException) {
- final String input = inputFlowFiles.isEmpty() ? "no input
FlowFile" : inputFlowFiles.toString();
+ final String input = inputFlowFiles.isEmpty() ? "no input
FlowFile" : "input FlowFiles " + inputFlowFiles;
// A TerminatedTaskException will happen for 2 reasons: the group
was stopped, or it timed out.
// If it was stopped, just log at an INFO level, as this is
expected. If it timed out, log an error.
final ScheduledState desiredState =
this.statelessGroupNode.getDesiredState();
if (desiredState == ScheduledState.STOPPED) {
- logger.info("Stateless Flow canceled while running with input
{}", input);
+ logger.info("Stateless Flow canceled while running with {}",
input);
stopped = true;
} else {
- logger.error("Stateless Flow timed out while running with
input {}", input);
+ logger.error("Stateless Flow timed out while running with {}",
input);
}
} else {
final String input = inputFlowFiles.isEmpty() ? "with no input
FlowFile" : " for input " + inputFlowFiles;
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
index 96ff6c9610..967883e68f 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardStatelessGroupNode.java
@@ -316,11 +316,10 @@ public class StandardStatelessGroupNode implements
StatelessGroupNode {
private void shutdownFlows() {
final List<StatelessFlowTask> tasks = initializedFlowTasks;
if (tasks != null) {
- tasks.forEach(StatelessFlowTask::shutdown);
+ tasks.stream().parallel().forEach(StatelessFlowTask::shutdown);
}
}
-
private StandardStatelessFlow createStatelessFlow(final ProcessGroup
group, final VersionedExternalFlow versionedExternalFlow) {
final Set<String> failurePortNames = group.getOutputPorts().stream()
.filter(port -> port.getPortFunction() == PortFunction.FAILURE)
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
index 555c1373ea..2f6c38ac8f 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/controller/scheduling/StatelessProcessScheduler.java
@@ -28,6 +28,7 @@ import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.SchedulingAgentCallback;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
@@ -162,7 +163,10 @@ public class StatelessProcessScheduler implements
ProcessScheduler {
public CompletableFuture<Void> stopProcessor(final ProcessorNode procNode,
final ProcessorStopLifecycleMethods lifecycleMethods) {
logger.info("Stopping {}", procNode);
final ProcessContext processContext =
processContextFactory.createProcessContext(procNode);
- final LifecycleState lifecycleState =
lifecycleStateManager.getOrRegisterLifecycleState(procNode.getIdentifier(),
false, false);
+
+ final LifecycleState lifecycleState = new
LifecycleState(procNode.getIdentifier());
+ final boolean scheduled = procNode.getScheduledState() ==
ScheduledState.RUNNING || procNode.getActiveThreadCount() > 0;
+ lifecycleState.setScheduled(scheduled);
return procNode.stop(this, this.componentLifeCycleThreadPool,
processContext, schedulingAgent, lifecycleState, lifecycleMethods);
}
diff --git
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
index a6912bff15..40345920a2 100644
---
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
+++
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/flow/StandardStatelessFlowCurrent.java
@@ -303,7 +303,6 @@ public class StandardStatelessFlowCurrent implements
StatelessFlowCurrent {
}
}
-
private void trigger(final Connectable connectable, final
ProcessSessionFactory sessionFactory, final TrackedStats trackedStats) {
final ProcessContext processContext =
processContextFactory.createProcessContext(connectable);