This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 73356ea448dced6a789f2d71c5cb5aceac45520e
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Wed Mar 9 13:44:08 2022 -0500

    NIFI-9783: This closes #5855. When migrating FlowFiles from one ProcessSession to another, if any FlowFile had already been transferred, and the Relationship to which it was transferred was auto-terminated, we were updating the wrong member variable, which threw off our stats for the processor. Fixed that.

    Signed-off-by: Joe Witt <joew...@apache.org>
---
 .../repository/StandardProcessSession.java         | 21 +++++++++++---
 .../repository/StandardProcessSessionIT.java       | 33 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 5bec8a6..bf1a1ab 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -1599,11 +1599,24 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
             }
 
             if (repoRecord.getTransferRelationship() != null) {
-                flowFilesOut--;
-                contentSizeOut -= flowFile.getSize();
+                final Relationship transferRelationship = repoRecord.getTransferRelationship();
+                final Collection<Connection> destinations = context.getConnections(transferRelationship);
+                final int numDestinations = destinations.size();
+                final boolean autoTerminated = numDestinations == 0 && context.getConnectable().isAutoTerminated(transferRelationship);
 
-                newOwner.flowFilesOut++;
-                newOwner.contentSizeOut += flowFile.getSize();
+                if (autoTerminated) {
+                    removedCount--;
+                    removedBytes -= flowFile.getSize();
+
+                    newOwner.removedCount++;
+                    newOwner.removedBytes += flowFile.getSize();
+                } else {
+                    flowFilesOut--;
+                    contentSizeOut -= flowFile.getSize();
+
+                    newOwner.flowFilesOut++;
+                    newOwner.contentSizeOut += flowFile.getSize();
+                }
             }
 
             final List<ProvenanceEventRecord> events = generatedProvenanceEvents.remove(flowFile);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
index dc51885..4e300d1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/StandardProcessSessionIT.java
@@ -2453,6 +2453,39 @@ public class StandardProcessSessionIT {
     }
 
     @Test
+    public void testMigrateAfterTransferToAutoTerminatedRelationship() {
+        final long start = System.currentTimeMillis();
+
+        FlowFile flowFile = session.create();
+        flowFile = session.write(flowFile, out -> out.write("Hello".getBytes(StandardCharsets.UTF_8)));
+
+        final StandardProcessSession newSession = new StandardProcessSession(context, () -> false);
+
+        when(connectable.getConnections(any(Relationship.class))).thenReturn(Collections.emptySet());
+        when(connectable.isAutoTerminated(any(Relationship.class))).thenReturn(true);
+
+        session.transfer(flowFile, new Relationship.Builder().name("success").build());
+        session.migrate(newSession, Collections.singleton(flowFile));
+
+        session.commit();
+
+        RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(start - 1);
+        FlowFileEvent event = report.getReportEntries().values().iterator().next();
+        assertEquals(0, event.getFlowFilesRemoved());
+        assertEquals(0, event.getContentSizeRemoved());
+        assertEquals(0, event.getFlowFilesOut());
+        assertEquals(0, event.getContentSizeOut());
+
+        newSession.commit();
+        report = flowFileEventRepository.reportTransferEvents(start - 1);
+        event = report.getReportEntries().values().iterator().next();
+        assertEquals(1, event.getFlowFilesRemoved());
+        assertEquals(5, event.getContentSizeRemoved());
+        assertEquals(0, event.getFlowFilesOut());
+        assertEquals(0, event.getContentSizeOut());
+    }
+
+    @Test
     public void testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() {
         FlowFile flowFile = session.create();
         for (int i = 0; i < 5; i++) {