This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch support/nifi-1.x in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push: new 4d00f8e156 NIFI-12228: This closes #7882. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should. 4d00f8e156 is described below commit 4d00f8e1563ffec40d8d601ac2b678ad0414bb5b Author: Mark Payne <marka...@hotmail.com> AuthorDate: Fri Oct 13 11:03:11 2023 -0400 NIFI-12228: This closes #7882. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should. Signed-off-by: Joseph Witt <joew...@apache.org> --- .../nifi/groups/SingleConcurrencyFlowFileGate.java | 17 ++++++++++++++ .../org/apache/nifi/groups/StandardDataValve.java | 27 ++++++++++++++-------- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java index 2282636b57..c006cdb659 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java @@ -36,6 +36,16 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate { return false; } + // We need to try to open flow into the Port's group. To do this, we need to get the data valve for the parent group, + // as it is responsible for data flowing into and out of its children. 
+ final ProcessGroup dataValveGroup = port.getProcessGroup().getParent(); + final DataValve dataValve = dataValveGroup.getDataValve(); + final boolean openFlowIntoGroup = dataValve.tryOpenFlowIntoGroup(port.getProcessGroup()); + if (!openFlowIntoGroup) { + claimed.set(false); + return false; + } + // The claim is now held by this thread. Check if the ProcessGroup is empty. final boolean empty = !port.getProcessGroup().isDataQueued(); if (empty) { @@ -43,6 +53,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate { return true; } + // We have already opened flow into group, so now we must close it, since we are not allowing flow in + dataValve.closeFlowIntoGroup(port.getProcessGroup()); + // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false, // indicating that the caller did not obtain the claim. claimed.set(false); @@ -52,5 +65,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate { @Override public void releaseClaim(final Port port) { claimed.set(false); + + final ProcessGroup dataValveGroup = port.getProcessGroup().getParent(); + final DataValve dataValve = dataValveGroup.getDataValve(); + dataValve.closeFlowIntoGroup(port.getProcessGroup()); } } diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java index df5658638f..5c1c1ee5ec 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java @@ -84,10 +84,15 @@ public class StandardDataValve implements DataValve { if 
(destinationGroup.isDataQueued()) { // If the destination group already has data queued up, and the valve is not already open, do not allow data to // flow into the group. If we did, we would end up mixing together two different batches of data. - logger.debug("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup); + logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup); return "Process Group already has data queued and valve is not already allowing data into group"; } + if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) { + logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup); + return "Data Valve is already allowing data to flow out of group"; + } + for (final Port port : destinationGroup.getInputPorts()) { for (final Connection connection : port.getIncomingConnections()) { final Connectable sourceConnectable = connection.getSource(); @@ -102,7 +107,7 @@ public class StandardDataValve implements DataValve { final boolean flowingOutOfSourceGroup = groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier()); if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) { - logger.debug("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out", + logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out", destinationGroup, port, sourceConnectable); return "Source connected to Input Port is an Output Port with Batch Output and is currently allowing data to flow out"; } @@ -119,13 +124,15 @@ public 
class StandardDataValve implements DataValve { return; } - for (final Port port : destinationGroup.getInputPorts()) { - for (final Connection connection : port.getIncomingConnections()) { - if (!connection.getFlowFileQueue().isEmpty()) { - logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve", - destinationGroup, connection); + if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) { + for (final Port port : destinationGroup.getInputPorts()) { + for (final Connection connection : port.getIncomingConnections()) { + if (!connection.getFlowFileQueue().isEmpty()) { + logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve", + destinationGroup, connection); - return; + return; + } } } } @@ -175,14 +182,14 @@ public class StandardDataValve implements DataValve { } if (!connection.getFlowFileQueue().isEmpty()) { - logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is " + logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is " + "configured with a FlowFileConcurrency of Batch Per Node.", sourceGroup, port, connection); return "Output Connection already has data queued"; } final boolean dataFlowingIntoDestination = groupsWithDataFlowingIn.contains(destinationProcessGroup.getIdentifier()); if (dataFlowingIntoDestination) { - logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is " + logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is " + "currently allowing data to flow in", sourceGroup, port, connection); return "Destination Process Group is allowing data to flow in"; }