This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d86d10fe92 NIFI-14679: Do not throw TerminatedTaskException from 
ProcessSession.adjustCounter if immediate = true
d86d10fe92 is described below

commit d86d10fe92c09677e9477cc8be802995f0d27b4f
Author: Mark Payne <[email protected]>
AuthorDate: Fri Jun 20 14:33:47 2025 -0400

    NIFI-14679: Do not throw TerminatedTaskException from 
ProcessSession.adjustCounter if immediate = true
---
 .../nifi/controller/repository/StandardProcessSession.java  | 13 +++++++++----
 .../apache/nifi/stateless/basics/ProcessorLifecycleIT.java  |  8 ++------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 3a9632b2d3..cf045fb445 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1838,7 +1838,12 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
 
     @Override
     public void adjustCounter(final String name, final long delta, final 
boolean immediate) {
-        verifyTaskActive();
+        // If we are adjusting the counter immediately, allow it even if the 
task is terminated. The contract states:
+        // "the counter will be updated immediately, without regard to whether 
the session is committed or rolled back"
+        // so we need to ensure that we allow adjusting the counter even after 
the task is terminated.
+        if (!immediate) {
+            verifyTaskActive();
+        }
 
         final Map<String, Long> counters;
         if (immediate) {
@@ -1863,11 +1868,11 @@ public class StandardProcessSession implements 
ProcessSession, ProvenanceEventEn
     private void adjustCounter(final String name, final long delta, final 
Map<String, Long> map) {
         Long curVal = map.get(name);
         if (curVal == null) {
-            curVal = Long.valueOf(0L);
+            curVal = 0L;
         }
 
-        final long newValue = curVal.longValue() + delta;
-        map.put(name, Long.valueOf(newValue));
+        final long newValue = curVal + delta;
+        map.put(name, newValue);
     }
 
     @Override
diff --git 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
index fe62588992..4d4cebb0c8 100644
--- 
a/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
+++ 
b/nifi-system-tests/nifi-stateless-system-test-suite/src/test/java/org/apache/nifi/stateless/basics/ProcessorLifecycleIT.java
@@ -143,12 +143,8 @@ public class ProcessorLifecycleIT extends 
StatelessSystemIT {
         assertTrue(optionalTriggerResult.isEmpty());
         trigger.cancel();
 
-        // Give it 2 seconds to finish its processing and make sure that we 
see no counters incremented.
-        // We expect no counters to be incremented because, even though the we 
should see an attempt to increment
-        // the counter, the ProcessSession should have been terminated, 
disallowing the call to ProcessSession.adjustCounter()
-        Thread.sleep(2000L);
-        final Map<String, Long> counters = 
dataflow.getCounters(Pattern.compile(".+"));
-        assertTrue(counters.isEmpty(), "Expected no counters to be incremented 
but found: " + counters);
+        // We expect the Failure counter to get incremented because the 
processor is being terminated
+        assertCounters(dataflow, generate.getIdentifier(), 
RUNNING_AND_FAILURE);
     }
 
     @Test

Reply via email to