Repository: nifi
Updated Branches:
  refs/heads/master 81357d445 -> 721964b7d


NIFI-3036: When we replay a FlowFile, ensure that we are using the 'golden 
copy' of the associated Resource Claim, if the claim is still writable. Ensure 
that StandardResourceClaimManager retains the 'golden copy' of a Resource Claim 
until it is no longer writable and has a claim count of 0

This closes #1223

Signed-off-by: jpercivall <jperciv...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/721964b7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/721964b7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/721964b7

Branch: refs/heads/master
Commit: 721964b7d87c696f293f7c769b274fc6364ff034
Parents: 81357d4
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Nov 15 09:15:37 2016 -0500
Committer: jpercivall <jperciv...@apache.org>
Committed: Tue Nov 15 16:51:34 2016 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/controller/FlowController.java | 17 ++++++++++++++---
 .../repository/FileSystemRepository.java           | 10 +++++++++-
 .../claim/StandardResourceClaimManager.java        | 11 ++++++++++-
 3 files changed, 33 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/721964b7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 6927944..faa4230 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -3795,9 +3795,20 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
             throw new IllegalStateException("Cannot replay data from 
Provenance Event because the Source FlowFile Queue with ID " + 
event.getSourceQueueIdentifier() + " no longer exists");
         }
 
-        // Create the ContentClaim
-        final ResourceClaim resourceClaim = 
resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
-            event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false, false);
+        // Create the ContentClaim. To do so, we first need the appropriate 
Resource Claim. Because we don't know whether or
+        // not the Resource Claim is still active, we first call 
ResourceClaimManager.getResourceClaim. If this returns
+        // null, then we know that the Resource Claim is no longer active and 
can just create a new one that is not writable.
+        // It's critical though that we first call getResourceClaim because 
otherwise, if the Resource Claim is active and we
+        // create a new one that is not writable, we could end up archiving or 
destroying the Resource Claim while it's still
+        // being written to by the Content Repository. This is important only 
because we are creating a FlowFile with this Resource
+        // Claim. If, for instance, we are simply creating the claim to 
request its content, as in #getContentAvailability, etc.
+        // then this is not necessary.
+        ResourceClaim resourceClaim = 
resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(),
+            event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier());
+        if (resourceClaim == null) {
+            resourceClaim = 
resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(),
+                event.getPreviousContentClaimSection(), 
event.getPreviousContentClaimIdentifier(), false, false);
+        }
 
         // Increment Claimant Count, since we will now be referencing the 
Content Claim
         resourceClaimManager.incrementClaimantCount(resourceClaim);

http://git-wip-us.apache.org/repos/asf/nifi/blob/721964b7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index e45852a..5eb9c5a 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -943,7 +943,15 @@ public class FileSystemRepository implements 
ContentRepository {
                 final long resourceClaimLength = scc.getOffset() + 
scc.getLength();
                 if (recycle && resourceClaimLength < 
MAX_APPENDABLE_CLAIM_LENGTH) {
                     final ClaimLengthPair pair = new 
ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
-                    final boolean enqueued = writableClaimQueue.offer(pair);
+
+                    // We are checking that writableClaimStreams contains the 
resource claim as a key, as a sanity check.
+                    // It should always be there. However, we have encountered 
a bug before where we archived content before
+                    // we should have. As a result, the Resource Claim and the 
associated OutputStream were removed from the
+                    // writableClaimStreams map, and this caused a 
NullPointerException. Worse, the call here to
+                    // writableClaimQueue.offer() means that the ResourceClaim 
was then reused, which resulted in an endless
+                    // loop of NullPointerException's being thrown. As a 
result, we simply ensure that the Resource Claim does
+                    // in fact have an OutputStream associated with it before 
adding it back to the writableClaimQueue.
+                    final boolean enqueued = 
writableClaimStreams.get(scc.getResourceClaim()) != null && 
writableClaimQueue.offer(pair);
 
                     if (enqueued) {
                         LOG.debug("Claim length less than max; Adding {} back 
to Writable Claim Queue", this);

http://git-wip-us.apache.org/repos/asf/nifi/blob/721964b7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index be0e8b8..7d554b1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -97,7 +97,10 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
                 logger.debug("Decrementing claimant count for {} to {}", 
claim, newClaimantCount);
             }
 
-            if (newClaimantCount == 0) {
+            // If the claim is no longer referenced, we want to remove it. We 
consider the claim to be "no longer referenced"
+            // if the count is 0 and it is no longer writable (if it's 
writable, it may still be writable by the Content Repository,
+            // even though no existing FlowFile is referencing the claim).
+            if (newClaimantCount == 0 && !claim.isWritable()) {
                 removeClaimantCount(claim);
             }
             return newClaimantCount;
@@ -188,6 +191,12 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
         }
 
         ((StandardResourceClaim) claim).freeze();
+
+        synchronized (claim) {
+            if (getClaimantCount(claim) == 0) {
+                claimantCounts.remove(claim);
+            }
+        }
     }
 
 

Reply via email to