This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
     new 0eabbcdf19 NIFI-12228: This closes #7881. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should. Code cleanup, fixing logback to avoid INFO-level stack trace from xodus
0eabbcdf19 is described below

commit 0eabbcdf19dfa9de7085b4b173ac3d3260f9d3a4
Author: Mark Payne <marka...@hotmail.com>
AuthorDate: Fri Oct 13 11:03:11 2023 -0400

    NIFI-12228: This closes #7881. Fixed issue with FlowFile Concurrency that can occasionally bring in more data than it should. Code cleanup, fixing logback to avoid INFO-level stack trace from xodus

    Signed-off-by: Joseph Witt <joew...@apache.org>
---
 .../org/apache/nifi/connectable/LocalPort.java     | 12 +++-------
 .../nifi/groups/SingleConcurrencyFlowFileGate.java | 17 ++++++++++++++
 .../org/apache/nifi/groups/StandardDataValve.java  | 27 ++++++++++++++--------
 .../resources/conf/clustered/node1/logback.xml     |  3 +++
 .../resources/conf/clustered/node2/logback.xml     |  3 +++
 .../src/test/resources/conf/default/logback.xml    |  4 +++-
 .../src/test/resources/conf/pythonic/logback.xml   |  3 +++
 7 files changed, 49 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java
index 86366f6eb8..53ffc01937 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/connectable/LocalPort.java
@@ -186,15 +186,9 @@ public class LocalPort extends AbstractPort {
             final FlowFileConcurrency flowFileConcurrency = getProcessGroup().getFlowFileConcurrency();
 
             switch (flowFileConcurrency) {
-                case UNBOUNDED:
-                    transferUnboundedConcurrency(context, session);
-                    break;
-                case SINGLE_FLOWFILE_PER_NODE:
-                    transferSingleFlowFile(session);
-                    break;
-                case SINGLE_BATCH_PER_NODE:
-                    transferInputBatch(session);
-                    break;
+                case UNBOUNDED -> transferUnboundedConcurrency(context, session);
+                case SINGLE_FLOWFILE_PER_NODE -> transferSingleFlowFile(session);
+                case SINGLE_BATCH_PER_NODE -> transferInputBatch(session);
             }
         } finally {
             flowFileGate.releaseClaim(this);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
index 2282636b57..c006cdb659 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/SingleConcurrencyFlowFileGate.java
@@ -36,6 +36,16 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return false;
         }
 
+        // We need to try to open flow into the Port's group. To do this, we need to get the data valve for the parent group,
+        // as it is responsible for data flowing into and out of its children.
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        final boolean openFlowIntoGroup = dataValve.tryOpenFlowIntoGroup(port.getProcessGroup());
+        if (!openFlowIntoGroup) {
+            claimed.set(false);
+            return false;
+        }
+
         // The claim is now held by this thread. Check if the ProcessGroup is empty.
         final boolean empty = !port.getProcessGroup().isDataQueued();
         if (empty) {
@@ -43,6 +53,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
             return true;
         }
 
+        // We have already opened flow into the group, so now we must close it, since we are not allowing flow in.
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
+
         // Process Group was not empty, so we cannot allow any more FlowFiles through. Reset claimed to false and return false,
         // indicating that the caller did not obtain the claim.
         claimed.set(false);
@@ -52,5 +65,9 @@ public class SingleConcurrencyFlowFileGate implements FlowFileGate {
     @Override
     public void releaseClaim(final Port port) {
         claimed.set(false);
+
+        final ProcessGroup dataValveGroup = port.getProcessGroup().getParent();
+        final DataValve dataValve = dataValveGroup.getDataValve();
+        dataValve.closeFlowIntoGroup(port.getProcessGroup());
     }
 }
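Note on the hunks above: the fixed gate acquires resources in a strict order (the claim, then the parent group's data valve, then the emptiness check) and rolls back everything it acquired whenever a later step fails. Below is a minimal, self-contained sketch of that pattern; the three interfaces are simplified stand-ins for NiFi's Port, ProcessGroup, and DataValve, and only the method names that appear in the diff are taken from the real code.

import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-ins for NiFi's ProcessGroup, Port, and DataValve.
interface GroupSketch {
    GroupSketch getParent();
    DataValveSketch getDataValve();
    boolean isDataQueued();
}

interface PortSketch {
    GroupSketch getProcessGroup();
}

interface DataValveSketch {
    boolean tryOpenFlowIntoGroup(GroupSketch group);
    void closeFlowIntoGroup(GroupSketch group);
}

// Illustrative sketch only: mirrors the claim -> valve -> emptiness-check ordering
// shown in the diff, not the actual SingleConcurrencyFlowFileGate implementation.
class SingleConcurrencyGateSketch {
    private final AtomicBoolean claimed = new AtomicBoolean(false);

    public boolean tryClaim(final PortSketch port) {
        // Step 1: take the claim atomically; if another thread holds it, fail fast.
        if (!claimed.compareAndSet(false, true)) {
            return false;
        }

        // Step 2: open flow into the port's group via the parent group's data valve,
        // which governs data flowing into and out of its children.
        final DataValveSketch valve = port.getProcessGroup().getParent().getDataValve();
        if (!valve.tryOpenFlowIntoGroup(port.getProcessGroup())) {
            claimed.set(false); // roll back the claim
            return false;
        }

        // Step 3: admit data only if the group currently holds none.
        if (!port.getProcessGroup().isDataQueued()) {
            return true;
        }

        // Roll back both acquisitions: close the valve, then release the claim.
        valve.closeFlowIntoGroup(port.getProcessGroup());
        claimed.set(false);
        return false;
    }

    public void releaseClaim(final PortSketch port) {
        // Release the claim and close the valve on every release, matching the diff.
        claimed.set(false);
        port.getProcessGroup().getParent().getDataValve().closeFlowIntoGroup(port.getProcessGroup());
    }
}

The ordering matters: the valve is opened only while the claim is held, so two threads can never both open flow into the same group, and the rollback in tryClaim keeps the valve from being left open when the group turns out to be non-empty, which appears to be the over-admission the commit message describes.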
- logger.debug("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup); + logger.trace("Will not allow data to flow into {} because valve is not already open and the Process Group has data queued", destinationGroup); return "Process Group already has data queued and valve is not already allowing data into group"; } + if (destinationGroup.getFlowFileOutboundPolicy() == FlowFileOutboundPolicy.BATCH_OUTPUT && groupsWithDataFlowingOut.contains(destinationGroup.getIdentifier())) { + logger.trace("Will not allow data to flow into {} because Outbound Policy is Batch Output and valve is already open to allow data to flow out of group", destinationGroup); + return "Data Valve is already allowing data to flow out of group"; + } + for (final Port port : destinationGroup.getInputPorts()) { for (final Connection connection : port.getIncomingConnections()) { final Connectable sourceConnectable = connection.getSource(); @@ -102,7 +107,7 @@ public class StandardDataValve implements DataValve { final boolean flowingOutOfSourceGroup = groupsWithDataFlowingOut.contains(sourceGroup.getIdentifier()); if (Boolean.TRUE.equals(flowingOutOfSourceGroup)) { - logger.debug("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out", + logger.trace("Will not allow data to flow into {} because port {} has an incoming connection from {} and that Process Group is currently allowing data to flow out", destinationGroup, port, sourceConnectable); return "Source connected to Input Port is an Output Port with Batch Output and is currently allowing data to flow out"; } @@ -119,13 +124,15 @@ public class StandardDataValve implements DataValve { return; } - for (final Port port : destinationGroup.getInputPorts()) { - for (final Connection connection : port.getIncomingConnections()) { - if (!connection.getFlowFileQueue().isEmpty()) { - logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve", - destinationGroup, connection); + if (destinationGroup.getFlowFileConcurrency() == FlowFileConcurrency.SINGLE_BATCH_PER_NODE) { + for (final Port port : destinationGroup.getInputPorts()) { + for (final Connection connection : port.getIncomingConnections()) { + if (!connection.getFlowFileQueue().isEmpty()) { + logger.debug("Triggered to close flow of data into group {} but Input Port has incoming Connection {}, which is not empty, so will not close valve", + destinationGroup, connection); - return; + return; + } } } } @@ -175,14 +182,14 @@ public class StandardDataValve implements DataValve { } if (!connection.getFlowFileQueue().isEmpty()) { - logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is " + logger.trace("Not allowing data to flow out of {} because {} has a destination of {}, which has data queued and its Process Group is " + "configured with a FlowFileConcurrency of Batch Per Node.", sourceGroup, port, connection); return "Output Connection already has data queued"; } final boolean dataFlowingIntoDestination = groupsWithDataFlowingIn.contains(destinationProcessGroup.getIdentifier()); if (dataFlowingIntoDestination) { - logger.debug("Not allowing data to flow out of {} because {} has a destination of {}, and its Process Group is " + logger.trace("Not allowing data to flow out of {} because {} has a 
destination of {}, and its Process Group is " + "currently allowing data to flow in", sourceGroup, port, connection); return "Destination Process Group is allowing data to flow in"; } diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml index b5927f48cf..e29ca67b3a 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/logback.xml @@ -148,6 +148,9 @@ <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default --> <logger name="org.apache.atlas" level="WARN"/> + <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel --> + <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" /> + <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE --> <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false"> <appender-ref ref="APP_FILE"/> diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml index b5927f48cf..e29ca67b3a 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/logback.xml @@ -148,6 +148,9 @@ <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default --> <logger name="org.apache.atlas" level="WARN"/> + <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel --> + <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" /> + <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE --> <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false"> <appender-ref ref="APP_FILE"/> diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml index 4c9d7fa1a6..aa8e72d452 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/logback.xml @@ -102,7 +102,6 @@ <logger name="org.apache.nifi.processors.standard.LogMessage" level="INFO"/> <logger name="org.apache.nifi.controller.repository.StandardProcessSession" level="WARN" /> - <logger name="org.apache.zookeeper.ClientCnxn" level="ERROR" /> <logger name="org.apache.zookeeper.server.NIOServerCnxn" level="ERROR" /> <logger name="org.apache.zookeeper.server.NIOServerCnxnFactory" level="ERROR" /> @@ -149,6 +148,9 @@ <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default --> <logger name="org.apache.atlas" level="WARN"/> + <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel --> + <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" /> + <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE --> <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false"> <appender-ref 
ref="APP_FILE"/> diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml index 1a62d56362..806958ddbf 100644 --- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml +++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/pythonic/logback.xml @@ -149,6 +149,9 @@ <!-- Suppress non-error messages from Apache Atlas which was emitting large amounts of INFO logs by default --> <logger name="org.apache.atlas" level="WARN"/> + <!-- Suppress non-error messages from JetBrains Xodus FileDataWriter related to FileChannel --> + <logger name="jetbrains.exodus.io.FileDataWriter" level="WARN" /> + <!-- These log messages would normally go to the USER_FILE log, but they belong in the APP_FILE --> <logger name="org.apache.nifi.web.security.requests" level="INFO" additivity="false"> <appender-ref ref="APP_FILE"/>