Repository: nifi Updated Branches: refs/heads/master 81357d445 -> 721964b7d
NIFI-3036: When we replay a FlowFile, ensure that we are using the 'golden copy' of the associated Resource Claim, if the claim is still writable. Ensure that StandardResourceClaimManager retains the 'golden copy' of a Resource Claim until it is no longer writable and has a claim count of 0. This closes #1223. Signed-off-by: jpercivall <jperciv...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/721964b7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/721964b7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/721964b7 Branch: refs/heads/master Commit: 721964b7d87c696f293f7c769b274fc6364ff034 Parents: 81357d4 Author: Mark Payne <marka...@hotmail.com> Authored: Tue Nov 15 09:15:37 2016 -0500 Committer: jpercivall <jperciv...@apache.org> Committed: Tue Nov 15 16:51:34 2016 -0500 ---------------------------------------------------------------------- .../org/apache/nifi/controller/FlowController.java | 17 ++++++++++++++--- .../repository/FileSystemRepository.java | 10 +++++++++- .../claim/StandardResourceClaimManager.java | 11 ++++++++++- 3 files changed, 33 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/721964b7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 6927944..faa4230 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3795,9 +3795,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R throw new IllegalStateException("Cannot replay data from Provenance Event because the Source FlowFile Queue with ID " + event.getSourceQueueIdentifier() + " no longer exists"); } - // Create the ContentClaim - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), - event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false); + // Create the ContentClaim. To do so, we first need the appropriate Resource Claim. Because we don't know whether or + // not the Resource Claim is still active, we first call ResourceClaimManager.getResourceClaim. If this returns + // null, then we know that the Resource Claim is no longer active and can just create a new one that is not writable. + // It's critical though that we first call getResourceClaim because otherwise, if the Resource Claim is active and we + // create a new one that is not writable, we could end up archiving or destroying the Resource Claim while it's still + // being written to by the Content Repository. This is important only because we are creating a FlowFile with this Resource + // Claim. If, for instance, we are simply creating the claim to request its content, as in #getContentAvailability, etc. + // then this is not necessary. 
+ ResourceClaim resourceClaim = resourceClaimManager.getResourceClaim(event.getPreviousContentClaimContainer(), + event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier()); + if (resourceClaim == null) { + resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), + event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false); + } // Increment Claimant Count, since we will now be referencing the Content Claim resourceClaimManager.incrementClaimantCount(resourceClaim); http://git-wip-us.apache.org/repos/asf/nifi/blob/721964b7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index e45852a..5eb9c5a 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -943,7 +943,15 @@ public class FileSystemRepository implements ContentRepository { final long resourceClaimLength = scc.getOffset() + scc.getLength(); if (recycle && resourceClaimLength < MAX_APPENDABLE_CLAIM_LENGTH) { final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength); - final boolean enqueued = writableClaimQueue.offer(pair); + + // We are checking that writableClaimStreams contains the resource claim as a key, as a sanity check. 
+ // It should always be there. However, we have encountered a bug before where we archived content before + // we should have. As a result, the Resource Claim and the associated OutputStream were removed from the + // writableClaimStreams map, and this caused a NullPointerException. Worse, the call here to + // writableClaimQueue.offer() means that the ResourceClaim was then reused, which resulted in an endless + // loop of NullPointerException's being thrown. As a result, we simply ensure that the Resource Claim does + // in fact have an OutputStream associated with it before adding it back to the writableClaimQueue. + final boolean enqueued = writableClaimStreams.get(scc.getResourceClaim()) != null && writableClaimQueue.offer(pair); if (enqueued) { LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this); http://git-wip-us.apache.org/repos/asf/nifi/blob/721964b7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index be0e8b8..7d554b1 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -97,7 +97,10 @@ public class StandardResourceClaimManager implements ResourceClaimManager { logger.debug("Decrementing claimant 
count for {} to {}", claim, newClaimantCount); } - if (newClaimantCount == 0) { + // If the claim is no longer referenced, we want to remove it. We consider the claim to be "no longer referenced" + // if the count is 0 and it is no longer writable (if it's writable, it may still be writable by the Content Repository, + // even though no existing FlowFile is referencing the claim). + if (newClaimantCount == 0 && !claim.isWritable()) { removeClaimantCount(claim); } return newClaimantCount; @@ -188,6 +191,12 @@ public class StandardResourceClaimManager implements ResourceClaimManager { } ((StandardResourceClaim) claim).freeze(); + + synchronized (claim) { + if (getClaimantCount(claim) == 0) { + claimantCounts.remove(claim); + } + } }