This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new c3a2e0c930 NIFI-14907 Updated MockProcessSession to support testing commit failures (#10245)
c3a2e0c930 is described below

commit c3a2e0c93004552ba5cf57864fd034b51df40932
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Tue Sep 2 15:20:16 2025 +0200

    NIFI-14907 Updated MockProcessSession to support testing commit failures (#10245)
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../org/apache/nifi/util/MockProcessSession.java   | 94 +++++++++++++++++++++-
 .../org/apache/nifi/util/MockSessionFactory.java   |  7 +-
 .../nifi/util/StandardProcessorTestRunner.java     |  6 +-
 3 files changed, 102 insertions(+), 5 deletions(-)

diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 6822ddfad4..0b3499f2a2 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -88,6 +88,7 @@ public class MockProcessSession implements ProcessSession {
     private final StateManager stateManager;
     private final boolean allowSynchronousCommits;
     private final boolean allowRecursiveReads;
+    private final boolean failCommit;
 
     private boolean committed = false;
     private boolean rolledBack = false;
@@ -109,11 +110,16 @@ public class MockProcessSession implements ProcessSession {
 
     public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager,
                               final boolean allowSynchronousCommits) {
-        this(sharedState, processor, enforceStreamsClosed, stateManager, allowSynchronousCommits, false);
+        this(sharedState, processor, enforceStreamsClosed, stateManager, allowSynchronousCommits, false, false);
     }
 
     public MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager,
                               final boolean allowSynchronousCommits, final boolean allowRecursiveReads) {
+        this(sharedState, processor, enforceStreamsClosed, stateManager, allowSynchronousCommits, allowRecursiveReads, false);
+    }
+
+    private MockProcessSession(final SharedSessionState sharedState, final Processor processor, final boolean enforceStreamsClosed, final StateManager stateManager,
+                              final boolean allowSynchronousCommits, final boolean allowRecursiveReads, final boolean failCommit) {
         this.processor = processor;
         this.enforceStreamsClosed = enforceStreamsClosed;
         this.sharedState = sharedState;
@@ -122,6 +128,11 @@ public class MockProcessSession implements ProcessSession {
         this.stateManager = stateManager;
         this.allowSynchronousCommits = allowSynchronousCommits;
         this.allowRecursiveReads = allowRecursiveReads;
+        this.failCommit = failCommit;
+    }
+
+    public static Builder builder(final SharedSessionState sharedState, final Processor processor) {
+        return new Builder(sharedState, processor);
     }
 
     @Override
@@ -285,7 +296,12 @@ public class MockProcessSession implements ProcessSession {
                 "enabled by calling TestRunner.");
         }
 
-        commitInternal();
+        try {
+            commitInternal();
+        } catch (final Throwable t) {
+            rollback();
+            throw t;
+        }
     }
 
     private void commitInternal() {
@@ -293,6 +309,10 @@ public class MockProcessSession implements ProcessSession {
             throw new FlowFileHandlingException("Cannot commit session because the following FlowFiles have not been removed or transferred: " + beingProcessed);
         }
 
+        if (failCommit) {
+            throw new FlowFileHandlingException("Cannot commit session because the session was requested to fail by a test");
+        }
+
         closeStreams(openInputStreams, enforceStreamsClosed);
         closeStreams(openOutputStreams, enforceStreamsClosed);
 
@@ -313,7 +333,12 @@ public class MockProcessSession implements ProcessSession {
 
     @Override
     public void commitAsync() {
-        commitInternal();
+        try {
+            commitInternal();
+        } catch (final Throwable t) {
+            rollback();
+            throw t;
+        }
     }
 
     @Override
@@ -1456,4 +1481,67 @@ public class MockProcessSession implements ProcessSession {
         final String providedUuid = curFlowFile.getAttribute(CoreAttributes.UUID.key());
         return curUuid.equals(providedUuid);
     }
+
+    public static final class Builder {
+
+        private final SharedSessionState sharedState;
+        private final Processor processor;
+        private StateManager stateManager;
+
+        private boolean enforceStreamsClosed = true;
+        private boolean allowRecursiveReads;
+        private boolean allowSynchronousCommits;
+        private boolean failCommit;
+
+        private Builder(final SharedSessionState sharedState, final Processor processor) {
+            this.sharedState = sharedState;
+            this.processor = processor;
+            this.stateManager = new MockStateManager(processor);
+        }
+
+        public Builder stateManager(final StateManager stateManager) {
+            this.stateManager = stateManager;
+            return this;
+        }
+
+        public Builder enforceStreamsClosed(final boolean enforceStreamsClosed) {
+            this.enforceStreamsClosed = enforceStreamsClosed;
+            return this;
+        }
+
+        public Builder allowSynchronousCommits() {
+            return allowSynchronousCommits(true);
+        }
+
+        public Builder allowSynchronousCommits(final boolean allowSynchronousCommits) {
+            this.allowSynchronousCommits = allowSynchronousCommits;
+            return this;
+        }
+
+        public Builder allowRecursiveReads(final boolean allowRecursiveReads) {
+            this.allowRecursiveReads = allowRecursiveReads;
+            return this;
+        }
+
+        public Builder failCommit() {
+            return failCommit(true);
+        }
+
+        public Builder failCommit(final boolean failCommit) {
+            this.failCommit = failCommit;
+            return this;
+        }
+
+        public MockProcessSession build() {
+            return new MockProcessSession(
+                    sharedState,
+                    processor,
+                    enforceStreamsClosed,
+                    stateManager,
+                    allowSynchronousCommits,
+                    allowRecursiveReads,
+                    failCommit
+            );
+        }
+    }
 }
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index 4fc53ec711..ea03b2f00e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -47,7 +47,12 @@ public class MockSessionFactory implements ProcessSessionFactory {
 
     @Override
     public ProcessSession createSession() {
-        final MockProcessSession session = new MockProcessSession(sharedState, processor, enforceReadStreamsClosed, stateManager, allowSynchronousSessionCommits, allowRecursiveReads);
+        final MockProcessSession session = MockProcessSession.builder(sharedState, processor)
+                .enforceStreamsClosed(enforceReadStreamsClosed)
+                .stateManager(stateManager)
+                .allowSynchronousCommits(allowSynchronousSessionCommits)
+                .allowRecursiveReads(allowRecursiveReads)
+                .build();
         createdSessions.add(session);
         return session;
     }
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index aa479a2c63..a8b69a2277 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -519,7 +519,11 @@ public class StandardProcessorTestRunner implements TestRunner {
 
     @Override
     public MockFlowFile enqueue(final InputStream data, final Map<String, String> attributes) {
-        final MockProcessSession session = new MockProcessSession(new SharedSessionState(processor, idGenerator), processor, enforceReadStreamsClosed, processorStateManager);
+        final SharedSessionState sessionState = new SharedSessionState(processor, idGenerator);
+        final MockProcessSession session = MockProcessSession.builder(sessionState, processor)
+                .enforceStreamsClosed(enforceReadStreamsClosed)
+                .stateManager(processorStateManager)
+                .build();
         MockFlowFile flowFile = session.create();
         flowFile = session.importFrom(data, flowFile);
         flowFile = session.putAllAttributes(flowFile, attributes);

Reply via email to