This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0eabbcdf19 NIFI-12228: This closes #7881. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should. Code cleanup, fixing logback to avoid INFO-level stack trace from xodus
0eabbcdf19 is described below

commit 0eabbcdf19dfa9de7085b4b173ac3d3260f9d3a4
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Oct 13 11:03:11 2023 -0400

    NIFI-12228: This closes #7881. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should.
    Code cleanup, fixing logback to avoid INFO-level stack trace from xodus
    
    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../org/apache/nifi/connectable/LocalPort.java     | 12 +++-------
 .../nifi/groups/SingleConcurrencyFlowFileGate.java | 17 ++++++++++++++
 .../org/apache/nifi/groups/StandardDataValve.java  | 27 ++++++++++++++--------
 .../resources/conf/clustered/node1/logback.xml     |  3 +++
 .../resources/conf/clustered/node2/logback.xml     |  3 +++
 .../src/test/resources/conf/default/logback.xml    |  4 +++-
 .../src/test/resources/conf/pythonic/logback.xml   |  3 +++
 7 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java
index 86366f6eb8..53ffc01937 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -186,15 +186,9 @@ public class LocalPort extends AbstractPort {
 
             final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency();
             switch (flowFileConcurrency) {
-                case UNBOUNDED:
-                    transferUnboundedConcurrency(context, session);
-                    break;
-                case SINGLE_FLOWFILE_PER_NODE:
-                    transferSingleFlowFile(session);
-                    break;
-                case SINGLE_BATCH_PER_NODE:
-                    transferInputBatch(session);
-                    break;
+                case UNBOUNDED -> transferUnboundedConcurrency(context, session);
+                case SINGLE_FLOWFILE_PER_NODE -> transferSingleFlowFile(session);
+                case SINGLE_BATCH_PER_NODE -> transferInputBatch(session);
             }
         } finally {
             flowFileGate.releaseClaim(this);
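
Note on the hunk above: it replaces the fall-through switch/break form with Java 14+ arrow labels, which cannot fall through and need no break. A minimal, self-contained sketch of the same pattern, assuming a stand-in enum and hypothetical handler methods (not NiFi API):

    // Sketch only: the enum constants mirror org.apache.nifi.groups.FlowFileConcurrency,
    // but the handle* methods are hypothetical placeholders.
    public class ArrowSwitchSketch {
        enum FlowFileConcurrency { UNBOUNDED, SINGLE_FLOWFILE_PER_NODE, SINGLE_BATCH_PER_NODE }

        static void transfer(final FlowFileConcurrency concurrency) {
            switch (concurrency) {
                case UNBOUNDED -> handleUnbounded();
                case SINGLE_FLOWFILE_PER_NODE -> handleSingleFlowFile();
                case SINGLE_BATCH_PER_NODE -> handleBatch();
            }
        }

        static void handleUnbounded() { System.out.println("unbounded transfer"); }
        static void handleSingleFlowFile() { System.out.println("one FlowFile per node"); }
        static void handleBatch() { System.out.println("one batch per node"); }
    }

Behavior is unchanged; the arrow form is simply more compact and removes the risk of a missing break.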
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
index 2282636b57..c006cdb659 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
@@ -36,6 +36,16 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return false;
         }
 
+        // We need to try to open flow into the Port's group. To do this, we need to get the data valve for the parent group,
+        // as it is responsible for data flowing into and out of its children.
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        final boolean openFlowIntoGroup = dataValve.tryOpenFlowIntoGroup(port.getProcessGroup());
+        if (!openFlowIntoGroup) {
+            claimed.set(false);
+            return false;
+        }
+
         // The claim is now held by this thread. Check if the ProcessGroup is empty.
         final boolean empty = !port.getProcessGroup().isDataQueued();
         if (empty) {
@@ -43,6 +53,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return true;
         }
 
+        // We have already opened flow into group, so now we must close it, since we are not allowing flow in
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
+
         // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false,
         // indicating that the caller did not obtain the claim.
         claimed.set(false);
@@ -52,5 +65,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
     @Override
     public void releaseClaim(final Port port) {
         claimed.set(false);
+
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
     }
 }
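
The gate change above follows a claim-then-open discipline: take the claim, try to open the valve, and undo the claim if the valve refuses or the group turns out to be non-empty; releaseClaim closes the valve again. A self-contained sketch of that discipline, using simplified stand-ins rather than NiFi's actual FlowFileGate/DataValve interfaces (which take Port and ProcessGroup arguments):

    import java.util.concurrent.atomic.AtomicBoolean;

    // Sketch of the claim-then-open-with-rollback pattern; not NiFi API.
    class GateSketch {
        interface Valve {
            boolean tryOpenFlowIntoGroup();
            void closeFlowIntoGroup();
        }

        private final AtomicBoolean claimed = new AtomicBoolean(false);

        boolean tryClaim(final Valve valve, final boolean groupEmpty) {
            if (!claimed.compareAndSet(false, true)) {
                return false; // another thread already holds the claim
            }
            if (!valve.tryOpenFlowIntoGroup()) {
                claimed.set(false); // valve refused: roll back the claim
                return false;
            }
            if (groupEmpty) {
                return true; // claim held and valve open: data may enter
            }
            valve.closeFlowIntoGroup(); // we opened the valve above, so close it again
            claimed.set(false);
            return false;
        }

        void releaseClaim(final Valve valve) {
            claimed.set(false);
            valve.closeFlowIntoGroup();
        }
    }

The key point, matching the diff: every path that resets claimed after a successful tryOpenFlowIntoGroup also closes the valve, so the open/close calls stay balanced.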
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
index f7d647b0c5..acc9e0ad43 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardDataValve.java
@@ -84,10 +84,15 @@ public class StandardDataValve implements DataValve {
         if (destinationGroup.isDataQueued()) {
             // If the destination group already has data queued up, and the valve is not already open, do not allow data to
             // flow into the group. If we did, we would end up mixing together two different batches of data.
-            logger.debug("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
+            logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup);
             return "Process Group already has data queued and valve is not already allowing data into group";
         }
 
+        if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) {
+            logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup);
+            return "Data Valve is already allowing data to flow out of group";
+        }
+
         for (final Port port : destinationGroup.getInputPorts()) {
             for (final Connection connection : port.getIncomingConnections()) {
                 final Connectable sourceConnectable = connection.getSource();
@@ -102,7 +107,7 @@ public class StandardDataValve implements DataValve {
 
                 final boolean flowingOutOfSourceGroup = groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier());
                 if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) {
-                    logger.debug("Will not allow data to flow into {} because 
port {} has an incoming connection from {} and that Process Group is currently 
allowing data to flow out",
+                    logger.trace("Will not allow data to flow into {} because 
port {} has an incoming connection from {} and that Process Group is currently 
allowing data to flow out",
                         destinationGroup, port, sourceConnectable);
                     return "Source connected to Input Port is an Output Port 
with Batch Output and is currently allowing data to flow out";
                 }
@@ -119,13 +124,15 @@ public class StandardDataValve implements DataValve {
             return;
         }
 
-        for (final Port port : destinationGroup.getInputPorts()) {
-            for (final Connection connection : port.getIncomingConnections()) {
-                if (!connection.getFlowFileQueue().isEmpty()) {
-                    logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
-                        destinationGroup, connection);
+        if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) {
+            for (final Port port : destinationGroup.getInputPorts()) {
+                for (final Connection connection : port.getIncomingConnections()) {
+                    if (!connection.getFlowFileQueue().isEmpty()) {
+                        logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve",
+                            destinationGroup, connection);
 
-                    return;
+                        return;
+                    }
                 }
             }
         }
@@ -175,14 +182,14 @@ public class StandardDataValve implements DataValve {
                 }
 
                 if (!connection.getFlowFileQueue().isEmpty()) {
-                    logger.debug("Not allowing data to flow out of {} because 
{} has a destination of {}, which has data queued and its Process Group is "
+                    logger.trace("Not allowing data to flow out of {} because 
{} has a destination of {}, which has data queued and its Process Group is "
                         + "configured with a FlowFileConcurrency of Batch Per 
Node.", sourceGroup, port, connection);
                     return "Output Connection already has data queued";
                 }
 
                 final boolean dataFlowingIntoDestination = groupsWithDataFlowingIn.contains(destinationProcessGroup.getIdentifier());
                 if (dataFlowingIntoDestination) {
-                    logger.debug("Not allowing data to flow out of {} because 
{} has a destination of {}, and its Process Group is "
+                    logger.trace("Not allowing data to flow out of {} because 
{} has a destination of {}, and its Process Group is "
                         + "currently allowing data to flow in", sourceGroup, 
port, connection);
                     return "Destination Process Group is allowing data to flow 
in";
                 }
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml
index b5927f48cf..e29ca67b3a 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml
@@ -148,6 +148,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting 
large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter 
related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they 
belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" 
additivity="false">
         <appender-ref ref="APP_FILE"/>
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml
index b5927f48cf..e29ca67b3a 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml
@@ -148,6 +148,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting 
large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter 
related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they 
belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" 
additivity="false">
         <appender-ref ref="APP_FILE"/>
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
index 4c9d7fa1a6..aa8e72d452 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml
@@ -102,7 +102,6 @@
     <logger name="org.apache.nifi.processors.standard.LogMessage" 
level="INFO"/>
     <logger 
name="org.apache.nifi.controller.repository.StandardProcessSession" 
level="WARN" />
 
-
     <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" />
     <logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" />
     <logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" 
level="ERROR" />
@@ -149,6 +148,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting 
large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter 
related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they 
belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" 
additivity="false">
         <appender-ref ref="APP_FILE"/>
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml
index 1a62d56362..806958ddbf 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml
@@ -149,6 +149,9 @@
     <!-- Suppress non-error messages from Apache Atlas which was emitting 
large amounts of INFO logs by default -->
     <logger name="org.apache.atlas" level="WARN"/>
 
+    <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter 
related to FileChannel -->
+    <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" />
+
     <!-- These log messages would normally go to the USER_FILE log, but they 
belong in the APP_FILE -->
     <logger name="org.apache.nifi.web.security.requests" level="INFO" 
additivity="false">
         <appender-ref ref="APP_FILE"/>
