This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new d86d10fe92 NIFI-14679: Do not throw TerminatedTaskException from
ProcessSession.adjustCounter if immediate = true
d86d10fe92 is described below
commit d86d10fe92c09677e9477cc8be802995f0d27b4f
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 20 14:33:47 2025 -0400
NIFI-14679: Do not throw TerminatedTaskException from
ProcessSession.adjustCounter if immediate = true
---
.../nifi/controller/repository/StandardProcessSession.java | 13 +++++++++----
.../apache/nifi/stateless/basics/ProcessorLifecycleIT.java | 8 ++------
2 files changed, 11 insertions(+), 10 deletions(-)
diff --git
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3a9632b2d3..cf045fb445 100644
---
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1838,7 +1838,12 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
@Override
public void adjustCounter(final String name, final long delta, final
boolean immediate) {
- verifyTaskActive();
+ // If we are adjusting the counter immediately, allow it even if the task is terminated. The contract states:
+ // "the counter will be updated immediately, without regard to whether the session is committed or rolled back"
+ // so we need to ensure that we allow adjusting the counter even after the task is terminated.
+ if (!immediate) {
+ verifyTaskActive();
+ }
final Map<String, Long> counters;
if (immediate) {
@@ -1863,11 +1868,11 @@ public class StandardProcessSession implements
ProcessSession, ProvenanceEventEn
private void adjustCounter(final String name, final long delta, final
Map<String, Long> map) {
Long curVal = map.get(name);
if (curVal == null) {
- curVal = Long.valueOf(0L);
+ curVal = 0L;
}
- final long newValue = curVal.longValue() + delta;
- map.put(name, Long.valueOf(newValue));
+ final long newValue = curVal + delta;
+ map.put(name, newValue);
}
@Override
diff --git
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
index fe62588992..4d4cebb0c8 100644
---
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
+++
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
@@ -143,12 +143,8 @@ public class ProcessorLifecycleIT extends
StatelessSystemIT {
assertTrue(optionalTriggerResult.isEmpty());
trigger.cancel();
- // Give it 2 seconds to finish its processing and make sure that we
see no counters incremented.
- // We expect no counters to be incremented because, even though the we
should see an attempt to increment
- // the counter, the ProcessSession should have been terminated,
disallowing the call to ProcessSession.adjustCounter()
- Thread.sleep(2000L);
- final Map<String, Long> counters =
dataflow.getCounters(Pattern.compile(".+"));
- assertTrue(counters.isEmpty(), "Expected no counters to be incremented
but found: " + counters);
+ // We expect the Failure counter to get incremented because the processor is being terminated
+ assertCounters(dataflow, generate.getIdentifier(),
RUNNING_AND_FAILURE);
}
@Test