This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 0cd841e4ae NIFI-13340 Fixed a bug in which an Output Port can leave a Process Group's DataValve open for output, but then the last FlowFile is terminated instead of going to an Output Port, ultimately resulting in the DataValve remaining open indefinitely. Now, this will be detected and the valve will be closed.
0cd841e4ae is described below

commit 0cd841e4ae96e366604122ca5a2651d733c609bc
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Mon Jun 10 15:57:08 2024 -0500

    NIFI-13340 Fixed a bug in which an Output Port can leave a Process Group's DataValve open for output, but then the last FlowFile is terminated instead of going to an Output Port, ultimately resulting in the DataValve remaining open indefinitely. Now, this will be detected and the valve will be closed.
    
    This closes #8951
    
    Signed-off-by: David Handermann <exceptionfact...@apache.org>
    
    (cherry picked from commit 039cd2f18a7063b5b5a50c4e16371443e52baabc)
---
 .../org/apache/nifi/groups/StandardDataValve.java  | 64 ++++++++++++++++++----
 1 file changed, 52 insertions(+), 12 deletions(-)
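
To illustrate the scenario described in the commit message, below is a minimal standalone sketch of the
flag-based fix. The Group interface and DataValveSketch class are illustrative stand-ins, not the actual
NiFi ProcessGroup or StandardDataValve APIs; only the leftOpenDueToDataQueued bookkeeping mirrors the
change shown in the diff.

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch only: a stripped-down valve that tracks which groups
    // currently allow data to flow out, plus the new "left open" flag.
    public class DataValveSketch {

        // Stand-in for the parts of ProcessGroup this sketch needs.
        public interface Group {
            String getIdentifier();
            boolean isDataQueued();
        }

        private final Set<String> groupsWithDataFlowingOut = new HashSet<>();
        private boolean leftOpenDueToDataQueued = false;

        // Mirrors opening flow out of a group: any new open resets the flag, so an
        // Input Port cannot close the valve while an Output Port is still working.
        public void openFlowOutOfGroup(final Group sourceGroup) {
            groupsWithDataFlowingOut.add(sourceGroup.getIdentifier());
            leftOpenDueToDataQueued = false;
        }

        // Mirrors closing flow out of a group: if data is still queued, the valve
        // stays open, but we now remember that it was left open for that reason.
        public void tryCloseFlowOutOfGroup(final Group sourceGroup) {
            if (sourceGroup.isDataQueued()) {
                leftOpenDueToDataQueued = true;
                return;
            }
            groupsWithDataFlowingOut.remove(sourceGroup.getIdentifier());
        }

        // Mirrors the new guard when opening flow into a group: if the valve was
        // left open only because data was queued, and that data is now gone (for
        // example, the last FlowFile was auto-terminated), close the valve so
        // input can be allowed again.
        public boolean tryOpenFlowIntoGroup(final Group destinationGroup) {
            final boolean openForOutput = groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier());
            if (openForOutput && leftOpenDueToDataQueued && !destinationGroup.isDataQueued()) {
                groupsWithDataFlowingOut.remove(destinationGroup.getIdentifier());
            }
            return !groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier());
        }
    }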

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
index 5c1c1ee5ec..59cf4a7d55 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
@@ -46,8 +46,11 @@ public class StandardDataValve implements DataValve {
     private final ProcessGroup processGroup;
     private final StateManager stateManager;
 
-    private Set<String> groupsWithDataFlowingIn = new HashSet<>();
-    private Set<String> groupsWithDataFlowingOut = new HashSet<>();
+    private final Set<String> groupsWithDataFlowingIn = new HashSet<>();
+    private final Set<String> groupsWithDataFlowingOut = new HashSet<>();
+
+    private boolean leftOpenDueToDataQueued = false;
+
 
     public StandardDataValve(final ProcessGroup processGroup, final StateManager stateManager) {
         this.processGroup = processGroup;
@@ -67,30 +70,46 @@ public class StandardDataValve implements DataValve {
             return true;
         }
 
-        final String reasonForNotAllowing = getReasonFlowIntoGroupNotAllowed(destinationGroup);
+        final FlowInForbiddenReason reasonForNotAllowing = getReasonFlowIntoGroupNotAllowed(destinationGroup);
+
+        // If we are forbidding data to flow into the group due to the fact that data is currently allowed to flow out of the group,
+        // and the valve was left open due to data being queued, let's verify that there is actually data queued up to flow out at the moment.
+        // If there is not, remove the group from those that are currently allowing data to flow out. This can happen in the following situation:
+        // - A FlowFile comes into the group
+        // - The FlowFile is split into two FlowFiles
+        // - One of the FlowFiles is routed to the Output Port, while the other is routed elsewhere
+        // - The Output Port is triggered. It opens the valve to allow data to flow out of the group.
+        // - The Output Port goes to close flow out of the group. However, the group is not empty, so the valve is not closed.
+        // - The other FlowFile is never routed to an output port. Instead, it is auto-terminated by some processor.
+        // Now, the valve has been left open.
+        // In this case, though, when the Output Port failed to close the valve, this.leftOpenDueToDataQueued was set to true. If that is the case,
+        // we can go ahead and close the valve now, if there's no more data queued.
+        if (reasonForNotAllowing == FlowInForbiddenReason.OPEN_FOR_OUTPUT && leftOpenDueToDataQueued && !destinationGroup.isDataQueued()) {
+            groupsWithDataFlowingOut.remove(destinationGroup.getIdentifier());
+        }
+
         if (reasonForNotAllowing != null) {
             // Since there is a reason not to allow it, return false. The reason has already been logged at a DEBUG level.
             return false;
         }
 
-
         logger.debug("Opening valve to allow data to flow into {}", 
destinationGroup);
         groupsWithDataFlowingIn.add(destinationGroup.getIdentifier());
         storeState();
         return true;
     }
 
-    private String getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGroup) {
+    private FlowInForbiddenReason getReasonFlowIntoGroupNotAllowed(final ProcessGroup destinationGroup) {
         if (destinationGroup.isDataQueued()) {
             // If the destination group already has data queued up, and the valve is not already open, do not allow data to
             // flow into the group. If we did, we would end up mixing together two different batches of data.
             logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
-            return "Process Group already has data queued and valve is not already allowing data into group";
+            return FlowInForbiddenReason.DATA_QUEUED;
         }
 
         if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
             logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
-            return "Data Valve is already allowing data to flow out of group";
+            return FlowInForbiddenReason.OPEN_FOR_OUTPUT;
         }
 
         for (final Port port : destinationGroup.getInputPorts()) {
@@ -109,7 +128,7 @@ public class StandardDataValve implements DataValve {
                 if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) {
                     logger.trace("Will not allow data to flow into {} because 
port {} has an incoming connection from {} and that Process Group is currently 
allowing data to flow out",
                         destinationGroup, port, sourceConnectable);
-                    return "Source connected to Input Port is an Output Port 
with Batch Output and is currently allowing data to flow out";
+                    return FlowInForbiddenReason.SOURCE_FLOWING_OUT;
                 }
             }
         }
@@ -161,6 +180,10 @@ public class StandardDataValve implements DataValve {
         logger.debug("Opening valve to allow data to flow out of {}", 
sourceGroup);
         groupsWithDataFlowingOut.add(sourceGroup.getIdentifier());
         storeState();
+
+        // Note that the valve has not been left open due to data being queued. This prevents an Input Port from closing the valve
+        // when the data is no longer queued, but while the Output Port is still processing the data.
+        leftOpenDueToDataQueued = false;
         return true;
     }
 
@@ -209,6 +232,9 @@ public class StandardDataValve implements DataValve {
         final boolean dataQueued = sourceGroup.isDataQueued();
         if (dataQueued) {
             logger.debug("Triggered to close flow of data out of group {} but 
group is not empty so will not close valve", sourceGroup);
+
+            // Denote that the valve was left open due to data being queued. 
This way, we can close the valve when the data is no longer queued.
+            leftOpenDueToDataQueued = true;
             return;
         }
 
@@ -231,10 +257,8 @@ public class StandardDataValve implements DataValve {
         final Map<String, List<ProcessGroup>> reasonOutputNotAllowed = new HashMap<>();
         for (final ProcessGroup group : processGroup.getProcessGroups()) {
             if (group.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
-                String inputReason = getReasonFlowIntoGroupNotAllowed(group);
-                if (inputReason == null) {
-                    inputReason = "Input is Allowed";
-                }
+                final FlowInForbiddenReason forbiddenReason = getReasonFlowIntoGroupNotAllowed(group);
+                final String inputReason = forbiddenReason == null ? "Input is Allowed" : forbiddenReason.getExplanation();
 
                 final List<ProcessGroup> inputGroupsAffected = reasonInputNotAllowed.computeIfAbsent(inputReason, k -> new ArrayList<>());
                 inputGroupsAffected.add(group);
@@ -333,4 +357,20 @@ public class StandardDataValve implements DataValve {
         return "StandardDataValve[group=" + processGroup + "]";
     }
 
+    public enum FlowInForbiddenReason {
+        DATA_QUEUED("Process Group already has data queued and valve is not 
already open to allow data to flow in"),
+
+        OPEN_FOR_OUTPUT("Data Valve is already open to allow data to flow out 
of group"),
+
+        SOURCE_FLOWING_OUT("Port has an incoming connection from a Process 
Group that is currently allowing data to flow out");
+
+        private final String explanation;
+        FlowInForbiddenReason(final String explanation) {
+            this.explanation = explanation;
+        }
+
+        public String getExplanation() {
+            return explanation;
+        }
+    }
 }
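
For reference, a minimal sketch of how a caller might turn the new enum values into the human-readable
summaries used by getDataFlowState above. The FlowInReasonSummary class is hypothetical and not part of
this commit; it only reuses the public FlowInForbiddenReason.getExplanation() accessor and the same
computeIfAbsent grouping pattern shown in the diff.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.nifi.groups.StandardDataValve.FlowInForbiddenReason;

    // Hypothetical helper: groups component names by the explanation of why data
    // is not allowed to flow in. A null reason means input is allowed.
    public class FlowInReasonSummary {

        public static Map<String, List<String>> summarize(final Map<String, FlowInForbiddenReason> reasonsByGroup) {
            final Map<String, List<String>> groupsByReason = new HashMap<>();
            for (final Map.Entry<String, FlowInForbiddenReason> entry : reasonsByGroup.entrySet()) {
                final FlowInForbiddenReason reason = entry.getValue();
                final String explanation = reason == null ? "Input is Allowed" : reason.getExplanation();
                groupsByReason.computeIfAbsent(explanation, k -> new ArrayList<>()).add(entry.getKey());
            }
            return groupsByReason;
        }
    }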
