This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new c3a2e0c930 NIFI-14907 Updated MockProcessSession to support testing commit failures (#10245)
c3a2e0c930 is described below
commit c3a2e0c93004552ba5cf57864fd034b51df40932
Author: Alaksiej Ščarbaty <[email protected]>
AuthorDate: Tue Sep 2 15:20:16 2025 +0200
NIFI-14907 Updated MockProcessSession to support testing commit failures (#10245)
Signed-off-by: David Handermann <[email protected]>
---
.../org/apache/nifi/util/MockProcessSession.java | 94 +++++++++++++++++++++-
.../org/apache/nifi/util/MockSessionFactory.java | 7 +-
.../nifi/util/StandardProcessorTestRunner.java | 6 +-
3 files changed, 102 insertions(+), 5 deletions(-)
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
index 6822ddfad4..0b3499f2a2 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessSession.java
@@ -88,6 +88,7 @@ public class MockProcessSession implements ProcessSession {
private final StateManager stateManager;
private final boolean allowSynchronousCommits;
private final boolean allowRecursiveReads;
+ private final boolean failCommit;
private boolean committed = false;
private boolean rolledBack = false;
@@ -109,11 +110,16 @@ public class MockProcessSession implements ProcessSession {
public MockProcessSession(final SharedSessionState sharedState, final
Processor processor, final boolean enforceStreamsClosed, final StateManager
stateManager,
final boolean allowSynchronousCommits) {
- this(sharedState, processor, enforceStreamsClosed, stateManager,
allowSynchronousCommits, false);
+ this(sharedState, processor, enforceStreamsClosed, stateManager,
allowSynchronousCommits, false, false);
}
public MockProcessSession(final SharedSessionState sharedState, final
Processor processor, final boolean enforceStreamsClosed, final StateManager
stateManager,
final boolean allowSynchronousCommits, final
boolean allowRecursiveReads) {
+ this(sharedState, processor, enforceStreamsClosed, stateManager,
allowSynchronousCommits, allowRecursiveReads, false);
+ }
+
+ private MockProcessSession(final SharedSessionState sharedState, final
Processor processor, final boolean enforceStreamsClosed, final StateManager
stateManager,
+ final boolean allowSynchronousCommits, final
boolean allowRecursiveReads, final boolean failCommit) {
this.processor = processor;
this.enforceStreamsClosed = enforceStreamsClosed;
this.sharedState = sharedState;
@@ -122,6 +128,11 @@ public class MockProcessSession implements ProcessSession {
this.stateManager = stateManager;
this.allowSynchronousCommits = allowSynchronousCommits;
this.allowRecursiveReads = allowRecursiveReads;
+ this.failCommit = failCommit;
+ }
+
+ public static Builder builder(final SharedSessionState sharedState, final
Processor processor) {
+ return new Builder(sharedState, processor);
}
@Override
@@ -285,7 +296,12 @@ public class MockProcessSession implements ProcessSession {
"enabled by calling TestRunner.");
}
- commitInternal();
+ try {
+ commitInternal();
+ } catch (final Throwable t) {
+ rollback();
+ throw t;
+ }
}
private void commitInternal() {
@@ -293,6 +309,10 @@ public class MockProcessSession implements ProcessSession {
throw new FlowFileHandlingException("Cannot commit session because
the following FlowFiles have not been removed or transferred: " +
beingProcessed);
}
+ if (failCommit) {
+ throw new FlowFileHandlingException("Cannot commit session because
the session was requested to fail by a test");
+ }
+
closeStreams(openInputStreams, enforceStreamsClosed);
closeStreams(openOutputStreams, enforceStreamsClosed);
@@ -313,7 +333,12 @@ public class MockProcessSession implements ProcessSession {
@Override
public void commitAsync() {
- commitInternal();
+ try {
+ commitInternal();
+ } catch (final Throwable t) {
+ rollback();
+ throw t;
+ }
}
@Override
@@ -1456,4 +1481,67 @@ public class MockProcessSession implements ProcessSession {
final String providedUuid =
curFlowFile.getAttribute(CoreAttributes.UUID.key());
return curUuid.equals(providedUuid);
}
+
+ public static final class Builder {
+
+ private final SharedSessionState sharedState;
+ private final Processor processor;
+ private StateManager stateManager;
+
+ private boolean enforceStreamsClosed = true;
+ private boolean allowRecursiveReads;
+ private boolean allowSynchronousCommits;
+ private boolean failCommit;
+
+ private Builder(final SharedSessionState sharedState, final Processor
processor) {
+ this.sharedState = sharedState;
+ this.processor = processor;
+ this.stateManager = new MockStateManager(processor);
+ }
+
+ public Builder stateManager(final StateManager stateManager) {
+ this.stateManager = stateManager;
+ return this;
+ }
+
+ public Builder enforceStreamsClosed(final boolean
enforceStreamsClosed) {
+ this.enforceStreamsClosed = enforceStreamsClosed;
+ return this;
+ }
+
+ public Builder allowSynchronousCommits() {
+ return allowSynchronousCommits(true);
+ }
+
+ public Builder allowSynchronousCommits(final boolean
allowSynchronousCommits) {
+ this.allowSynchronousCommits = allowSynchronousCommits;
+ return this;
+ }
+
+ public Builder allowRecursiveReads(final boolean allowRecursiveReads) {
+ this.allowRecursiveReads = allowRecursiveReads;
+ return this;
+ }
+
+ public Builder failCommit() {
+ return failCommit(true);
+ }
+
+ public Builder failCommit(final boolean failCommit) {
+ this.failCommit = failCommit;
+ return this;
+ }
+
+ public MockProcessSession build() {
+ return new MockProcessSession(
+ sharedState,
+ processor,
+ enforceStreamsClosed,
+ stateManager,
+ allowSynchronousCommits,
+ allowRecursiveReads,
+ failCommit
+ );
+ }
+ }
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
index 4fc53ec711..ea03b2f00e 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockSessionFactory.java
@@ -47,7 +47,12 @@ public class MockSessionFactory implements ProcessSessionFactory {
@Override
public ProcessSession createSession() {
- final MockProcessSession session = new MockProcessSession(sharedState,
processor, enforceReadStreamsClosed, stateManager,
allowSynchronousSessionCommits, allowRecursiveReads);
+ final MockProcessSession session =
MockProcessSession.builder(sharedState, processor)
+ .enforceStreamsClosed(enforceReadStreamsClosed)
+ .stateManager(stateManager)
+ .allowSynchronousCommits(allowSynchronousSessionCommits)
+ .allowRecursiveReads(allowRecursiveReads)
+ .build();
createdSessions.add(session);
return session;
}
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
index aa479a2c63..a8b69a2277 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
@@ -519,7 +519,11 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public MockFlowFile enqueue(final InputStream data, final Map<String,
String> attributes) {
- final MockProcessSession session = new MockProcessSession(new
SharedSessionState(processor, idGenerator), processor,
enforceReadStreamsClosed, processorStateManager);
+ final SharedSessionState sessionState = new
SharedSessionState(processor, idGenerator);
+ final MockProcessSession session =
MockProcessSession.builder(sessionState, processor)
+ .enforceStreamsClosed(enforceReadStreamsClosed)
+ .stateManager(processorStateManager)
+ .build();
MockFlowFile flowFile = session.create();
flowFile = session.importFrom(data, flowFile);
flowFile = session.putAllAttributes(flowFile, attributes);