NIFI-41: Don't allow destination of connection to change if current destination
is holding its FlowFiles


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8d1536ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8d1536ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8d1536ed

Branch: refs/heads/develop
Commit: 8d1536ed24ea0b149f9fa14ead073b8decdead7d
Parents: 66f3b7e
Author: Mark Payne <marka...@hotmail.com>
Authored: Fri Jun 5 08:49:01 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Fri Jun 5 08:49:01 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/controller/FlowFileQueue.java |  9 +++++++++
 .../apache/nifi/controller/StandardFlowFileQueue.java  | 13 +++++++++----
 .../apache/nifi/connectable/StandardConnection.java    |  4 ++++
 3 files changed, 22 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d1536ed/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java 
b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
index 92a4ee0..e1baeb7 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java
@@ -100,6 +100,15 @@ public interface FlowFileQueue {
 
     QueueSize getActiveQueueSize();
 
+    /**
+     * Returns a QueueSize that represents all FlowFiles that are 
'unacknowledged'. A FlowFile
+     * is considered to be unacknowledged if it has been pulled from the queue 
by some component
+     * but the session that pulled the FlowFile has not yet been committed or 
rolled back.
+     *
+     * @return a QueueSize that represents all FlowFiles that are 
'unacknowledged'.
+     */
+    QueueSize getUnacknowledgedQueueSize();
+
     void acknowledge(FlowFileRecord flowFile);
 
     void acknowledge(Collection<FlowFileRecord> flowFiles);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d1536ed/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 075f2cf..8f6c8ed 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -229,7 +229,7 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         }
 
         return new QueueSize(activeQueue.size() + swappedRecordCount + 
unacknowledged.getObjectCount() + preFetchCount,
-                activeQueueContentSize + swappedContentSize + 
unacknowledged.getByteCount() + preFetchSize);
+            activeQueueContentSize + swappedContentSize + 
unacknowledged.getByteCount() + preFetchSize);
     }
 
     @Override
@@ -526,9 +526,9 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
                 final QueueSize unacknowledged = unacknowledgedSizeRef.get();
 
                 logger.debug("Total Queue Size: ActiveQueue={}/{} MB, Swap 
Queue={}/{} MB, Unacknowledged={}/{} MB",
-                        activeQueue.size(), activeQueueContentSize / 
byteToMbDivisor,
-                        swappedRecordCount, swappedContentSize / 
byteToMbDivisor,
-                        unacknowledged.getObjectCount(), 
unacknowledged.getByteCount() / byteToMbDivisor);
+                    activeQueue.size(), activeQueueContentSize / 
byteToMbDivisor,
+                    swappedRecordCount, swappedContentSize / byteToMbDivisor,
+                    unacknowledged.getObjectCount(), 
unacknowledged.getByteCount() / byteToMbDivisor);
             }
 
             return swapQueue.size();
@@ -961,6 +961,11 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
         writeLock.unlock("external unlock");
     }
 
+    @Override
+    public QueueSize getUnacknowledgedQueueSize() {
+        return unacknowledgedSizeRef.get();
+    }
+
     private void updateUnacknowledgedSize(final int addToCount, final long 
addToSize) {
         boolean updated = false;
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8d1536ed/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 86c9320..ad556e2 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -181,6 +181,10 @@ public final class StandardConnection implements 
Connection {
             throw new IllegalStateException("Cannot change destination of 
Connection because the current destination is running");
         }
 
+        if (getFlowFileQueue().getUnacknowledgedQueueSize().getObjectCount() > 
0) {
+            throw new IllegalStateException("Cannot change destination of 
Connection because FlowFiles from this Connection are currently held by " + 
previousDestination);
+        }
+
         try {
             previousDestination.removeConnection(this);
             this.destination.set(newDestination);

Reply via email to