This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 4d00f8e156 NIFI-12228: This closes #7882. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should.
4d00f8e156 is described below

commit 4d00f8e1563ffec40d8d601ac2b678ad0414bb5b
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Oct 13 11:03:11 2023 -0400

    NIFI-12228: This closes #7882. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should.
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../nifi/groups/SingleConcurrencyFlowFileGate.java | 17 ++++++++++++++
 .../org/apache/nifi/groups/StandardDataValve.java  | 27 ++++++++++++++--------
 2 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
index 2282636b57..c006cdb659 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
@@ -36,6 +36,16 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return false;
         }
 
+        // We need to try to open flow into the Port's group. To do this, we need to get the data valve for the parent group,
+        // as it is responsible for data flowing into and out of its children.
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        final boolean openFlowIntoGroup = dataValve.tryOpenFlowIntoGroup(port.getProcessGroup());
+        if (!openFlowIntoGroup) {
+            claimed.set(false);
+            return false;
+        }
+
         // The claim is now held by this thread. Check if the ProcessGroup is empty.
         final boolean empty = !port.getProcessGroup().isDataQueued();
         if (empty) {
@@ -43,6 +53,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return true;
         }
 
+        // We have already opened flow into group, so now we must close it, since we are not allowing flow in
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
+
         // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false,
         // indicating that the caller did not obtain the claim.
         claimed.set(false);
@@ -52,5 +65,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
     @Override
     public void releaseClaim(final Port port) {
         claimed.set(false);
+
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
     }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
index df5658638f..5c1c1ee5ec 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
@@ -84,10 +84,15 @@ public class StandardDataValve implements DataValve {
         if (destinationGroup.isDataQueued()) {
             // If the destination group already has data queued up, and the valve is not already open, do not allow data to
             // flow into the group. If we did, we would end up mixing together two different batches of data.
-            logger.debug("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
+            logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
             return "Process Group already has data queued and valve is not already allowing data into group";
         }
 
+        if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
+            logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
+            return "Data Valve is already allowing data to flow out of group";
+        }
+
         for (final Port port : destinationGroup.getInputPorts()) {
             for (final Connection connection : port.getIncomingConnections()) {
                 final Connectable sourceConnectable = connection.getSource();
@@ -102,7 +107,7 @@ public class StandardDataValve implements DataValve {
 
                 final boolean flowingOutOfSourceGroup = groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier());
                 if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) {
-                    logger.debug("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
+                    logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out",
                         destinationGroup, port, sourceConnectable);
                     return "Source connected to Input Port is an Output Port 
with Batch Output and is currently allowing data to flow out";
                 }
@@ -119,13 +124,15 @@ public class StandardDataValve implements DataValve {
             return;
         }
 
-        for (final Port port : destinationGroup.getInputPorts()) {
-            for (final Connection connection : port.getIncomingConnections()) {
-                if (!connection.getFlowFileQueue().isEmpty()) {
-                    logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
-                        destinationGroup, connection);
+        if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
+            for (final Port port : destinationGroup.getInputPorts()) {
+                for (final Connection connection : port.getIncomingConnections()) {
+                    if (!connection.getFlowFileQueue().isEmpty()) {
+                        logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
+                            destinationGroup, connection);
 
-                    return;
+                        return;
+                    }
                 }
             }
         }
@@ -175,14 +182,14 @@ public class StandardDataValve implements DataValve {
                 }
 
                 if (!connection.getFlowFileQueue().isEmpty()) {
-                    logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is "
+                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is "
                         + "configured with a FlowFileConcurrency of Batch Per Node.", sourceGroup, port, connection);
                     return "Output Connection already has data queued";
                 }
 
                 final boolean dataFlowingIntoDestination = groupsWithDataFlowingIn.contains(destinationProcessGroup.getIdentifier());
                 if (dataFlowingIntoDestination) {
-                    logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is "
+                    logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is "
                         + "currently allowing data to flow in", sourceGroup, port, connection);
                     return "Destination Process Group is allowing data to flow 
in";
                 }
