NIFI-41: Don't allow destination of connection to change if current destination is holding its FlowFiles
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8d1536ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8d1536ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8d1536ed Branch: refs/heads/develop Commit: 8d1536ed24ea0b149f9fa14ead073b8decdead7d Parents: 66f3b7e Author: Mark Payne <marka...@hotmail.com> Authored: Fri Jun 5 08:49:01 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Fri Jun 5 08:49:01 2015 -0400 ---------------------------------------------------------------------- .../java/org/apache/nifi/controller/FlowFileQueue.java | 9 +++++++++ .../apache/nifi/controller/StandardFlowFileQueue.java | 13 +++++++++---- .../apache/nifi/connectable/StandardConnection.java | 4 ++++ 3 files changed, 22 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d1536ed/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java index 92a4ee0..e1baeb7 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java @@ -100,6 +100,15 @@ public interface FlowFileQueue { QueueSize getActiveQueueSize(); + /** + * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile + * is considered to be unacknowledged if it has been pulled from the queue by some component + * but the session that pulled the FlowFile has not yet been committed or rolled back. + * + * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. 
+ */ + QueueSize getUnacknowledgedQueueSize(); + void acknowledge(FlowFileRecord flowFile); void acknowledge(Collection<FlowFileRecord> flowFiles); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d1536ed/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 075f2cf..8f6c8ed 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -229,7 +229,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue { } return new QueueSize(activeQueue.size() + swappedRecordCount + unacknowledged.getObjectCount() + preFetchCount, - activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); + activeQueueContentSize + swappedContentSize + unacknowledged.getByteCount() + preFetchSize); } @Override @@ -526,9 +526,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue { final QueueSize unacknowledged = unacknowledgedSizeRef.get(); logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap Queue={}/{} MB, Unacknowledged={}/{} MB", - activeQueue.size(), activeQueueContentSize / byteToMbDivisor, - swappedRecordCount, swappedContentSize / byteToMbDivisor, - unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); + activeQueue.size(), 
activeQueueContentSize / byteToMbDivisor, + swappedRecordCount, swappedContentSize / byteToMbDivisor, + unacknowledged.getObjectCount(), unacknowledged.getByteCount() / byteToMbDivisor); } return swapQueue.size(); @@ -961,6 +961,11 @@ public final class StandardFlowFileQueue implements FlowFileQueue { writeLock.unlock("external unlock"); } + @Override + public QueueSize getUnacknowledgedQueueSize() { + return unacknowledgedSizeRef.get(); + } + private void updateUnacknowledgedSize(final int addToCount, final long addToSize) { boolean updated = false; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d1536ed/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 86c9320..ad556e2 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -181,6 +181,10 @@ public final class StandardConnection implements Connection { throw new IllegalStateException("Cannot change destination of Connection because the current destination is running"); } + if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 0) { + throw new IllegalStateException("Cannot change destination of Connection because FlowFiles from this Connection are currently held by " + previousDestination); + } + try { previousDestination.removeConnection(this); 
this.destination.set(newDestination);