NIFI-2920: This is a 0.x version of Mark Payne's patch for NIFI-2925.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a9395bc6 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a9395bc6 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a9395bc6 Branch: refs/heads/support/nifi-0.7.x Commit: a9395bc67c2de4fa6ba82ca0573ad210ccc67434 Parents: 05d5fba Author: Bryan Bende <bbe...@apache.org> Authored: Thu Oct 27 14:31:53 2016 -0400 Committer: Bryan Bende <bbe...@apache.org> Committed: Mon Oct 31 10:02:51 2016 -0400 ---------------------------------------------------------------------- .../repository/claim/ResourceClaimManager.java | 14 ++++- .../nifi/controller/FileSystemSwapManager.java | 9 +++- .../apache/nifi/controller/FlowController.java | 11 ++-- .../repository/FileSystemRepository.java | 5 +- .../repository/VolatileContentRepository.java | 2 +- .../WriteAheadFlowFileRepository.java | 2 +- .../claim/StandardResourceClaimManager.java | 54 +++++++++++++++----- .../controller/TestFileSystemSwapManager.java | 7 ++- .../repository/TestStandardProcessSession.java | 20 ++++---- .../TestVolatileContentRepository.java | 2 +- .../TestWriteAheadFlowFileRepository.java | 4 +- .../claim/TestStandardResourceClaimManager.java | 2 +- 12 files changed, 94 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java index b430df0..68643f9 100644 --- a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java +++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java @@ -32,9 +32,21 @@ public interface ResourceClaimManager { * @param container of claim * @param section of claim * @param lossTolerant of claim + * @param writable whether or not the claim should be made writable * @return new claim */ - ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant); + ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable); + + /** + * Returns the Resource Claim with the given id, container, and section, if one exists, <code>null</code> otherwise + * + * @param id of claim + * @param container of claim + * @param section of claim + * @return the existing resource claim or <code>null</code> if none exists + */ + ResourceClaim getResourceClaim(String container, String section, String id); + /** * @param claim to obtain reference count for http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index 156389b..15a90cb 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -492,7 +492,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager { lossTolerant = false; } - resourceClaim = 
claimManager.newResourceClaim(container, section, claimId, lossTolerant); + resourceClaim = claimManager.getResourceClaim(container, section, claimId); + if (resourceClaim == null) { + logger.error("Swap file indicates that FlowFile was referencing Resource Claim at container={}, section={}, claimId={}, " + + "but this Resource Claim cannot be found! Will create a temporary Resource Claim, but this may affect the framework's " + + "ability to properly clean up this resource", container, section, claimId); + resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, true); + } + final StandardContentClaim claim = new StandardContentClaim(resourceClaim, resourceOffset); claim.setLength(resourceLength); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 3eff44c..0895006 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -3349,7 +3349,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R return null; } - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(container, section, identifier, false, false); return new 
StandardContentClaim(resourceClaim, offset == null ? 0L : offset.longValue()); } @@ -3375,7 +3375,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getPreviousContentClaimContainer(), provEvent.getPreviousContentClaimSection(), - provEvent.getPreviousContentClaimIdentifier(), false); + provEvent.getPreviousContentClaimIdentifier(), false, false); + claim = new StandardContentClaim(resourceClaim, provEvent.getPreviousContentClaimOffset()); offset = provEvent.getPreviousContentClaimOffset() == null ? 0L : provEvent.getPreviousContentClaimOffset(); size = provEvent.getPreviousFileSize(); @@ -3385,7 +3386,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(provEvent.getContentClaimContainer(), provEvent.getContentClaimSection(), - provEvent.getContentClaimIdentifier(), false); + provEvent.getContentClaimIdentifier(), false, false); claim = new StandardContentClaim(resourceClaim, provEvent.getContentClaimOffset()); offset = provEvent.getContentClaimOffset() == null ? 
0L : provEvent.getContentClaimOffset(); @@ -3478,7 +3479,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R } try { - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(contentClaimContainer, contentClaimSection, contentClaimId, false, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, event.getPreviousContentClaimOffset()); if (!contentRepository.isAccessible(contentClaim)) { @@ -3559,7 +3560,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R // Create the ContentClaim final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(event.getPreviousContentClaimContainer(), - event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false); + event.getPreviousContentClaimSection(), event.getPreviousContentClaimIdentifier(), false, false); // Increment Claimant Count, since we will now be referencing the Content Claim resourceClaimManager.incrementClaimantCount(resourceClaim); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 8a3ac6d..d94869f 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -426,7 +426,7 @@ public class FileSystemRepository implements ContentRepository { final String id = idPath.toFile().getName(); final String sectionName = sectionPath.toFile().getName(); - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false, false); if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) { removeIncompleteContent(fileToRemove); } @@ -524,7 +524,7 @@ public class FileSystemRepository implements ContentRepository { final String section = String.valueOf(modulatedSectionIndex); final String claimId = System.currentTimeMillis() + "-" + currentIndex; - resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant); + resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant, true); resourceOffset = 0L; LOG.debug("Creating new Resource Claim {}", resourceClaim); @@ -939,6 +939,7 @@ public class FileSystemRepository implements ContentRepository { LOG.debug("Claim length less than max; Adding {} back to Writable Claim Queue", this); } else { writableClaimStreams.remove(scc.getResourceClaim()); + resourceClaimManager.freeze(scc.getResourceClaim()); bcos.close(); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 08b7e80..1f75320 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -203,7 +203,7 @@ public class VolatileContentRepository implements ContentRepository { private ContentClaim createLossTolerant() { final long id = idGenerator.getAndIncrement(); - final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(CONTAINER_NAME, "section", String.valueOf(id), true, false); final ContentClaim claim = new StandardContentClaim(resourceClaim, 0L); final ContentBlock contentBlock = new ContentBlock(claim, repoSize); claimManager.incrementClaimantCount(resourceClaim, true); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index 27d6c9b..ca52544 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -779,7 +779,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis lossTolerant = false; } - final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant); + final ResourceClaim resourceClaim = claimManager.newResourceClaim(container, section, claimId, lossTolerant, false); final StandardContentClaim contentClaim = new StandardContentClaim(resourceClaim, resourceOffset); contentClaim.setLength(resourceLength); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java index 9cb0fa1..be0e8b8 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java @@ -29,14 +29,25 @@ import org.slf4j.LoggerFactory; public class StandardResourceClaimManager implements ResourceClaimManager { - private static final ConcurrentMap<ResourceClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); + private static final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = new 
ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(StandardResourceClaimManager.class); private static final BlockingQueue<ResourceClaim> destructableClaims = new LinkedBlockingQueue<>(50000); @Override - public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant) { - return new StandardResourceClaim(this, container, section, id, lossTolerant); + public ResourceClaim newResourceClaim(final String container, final String section, final String id, final boolean lossTolerant, final boolean writable) { + final StandardResourceClaim claim = new StandardResourceClaim(this, container, section, id, lossTolerant); + if (!writable) { + claim.freeze(); + } + return claim; + } + + @Override + public ResourceClaim getResourceClaim(final String container, final String section, final String id) { + final ResourceClaim tempClaim = new StandardResourceClaim(this, container, section, id, false); + final ClaimCount count = claimantCounts.get(tempClaim); + return (count == null) ? null : count.getClaim(); } private static AtomicInteger getCounter(final ResourceClaim claim) { @@ -44,14 +55,14 @@ public class StandardResourceClaimManager implements ResourceClaimManager { return null; } - AtomicInteger counter = claimantCounts.get(claim); + ClaimCount counter = claimantCounts.get(claim); if (counter != null) { - return counter; + return counter.getCount(); } - counter = new AtomicInteger(0); - final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter); - return existingCounter == null ? counter : existingCounter; + counter = new ClaimCount(claim, new AtomicInteger(0)); + final ClaimCount existingCounter = claimantCounts.putIfAbsent(claim, counter); + return existingCounter == null ? 
counter.getCount() : existingCounter.getCount(); } @Override @@ -61,8 +72,8 @@ public class StandardResourceClaimManager implements ResourceClaimManager { } synchronized (claim) { - final AtomicInteger counter = claimantCounts.get(claim); - return counter == null ? 0 : counter.get(); + final ClaimCount counter = claimantCounts.get(claim); + return counter == null ? 0 : counter.getCount().get(); } } @@ -73,13 +84,13 @@ public class StandardResourceClaimManager implements ResourceClaimManager { } synchronized (claim) { - final AtomicInteger counter = claimantCounts.get(claim); + final ClaimCount counter = claimantCounts.get(claim); if (counter == null) { logger.warn("Decrementing claimant count for {} but claimant count is not known. Returning -1", claim); return -1; } - final int newClaimantCount = counter.decrementAndGet(); + final int newClaimantCount = counter.getCount().decrementAndGet(); if (newClaimantCount < 0) { logger.error("Decremented claimant count for {} to {}", claim, newClaimantCount); } else { @@ -178,4 +189,23 @@ public class StandardResourceClaimManager implements ResourceClaimManager { ((StandardResourceClaim) claim).freeze(); } + + + private static final class ClaimCount { + private final ResourceClaim claim; + private final AtomicInteger count; + + public ClaimCount(final ResourceClaim claim, final AtomicInteger count) { + this.claim = claim; + this.count = count; + } + + public AtomicInteger getCount() { + return count; + } + + public ResourceClaim getClaim() { + return claim; + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 2ab8e35..24b61b9 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -120,7 +120,12 @@ public class TestFileSystemSwapManager { public class NopResourceClaimManager implements ResourceClaimManager { @Override - public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant) { + public ResourceClaim newResourceClaim(String container, String section, String id, boolean lossTolerant, boolean writable) { + return null; + } + + @Override + public ResourceClaim getResourceClaim(String container, String section, String id) { return null; } http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 2a401c6..71bfd27 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -829,7 +829,7 @@ public 
class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .size(1L) .build(); flowFileQueue.put(flowFileRecord); @@ -977,7 +977,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .size(1L) .build(); flowFileQueue.put(flowFileRecord); @@ -1001,7 +1001,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .build(); flowFileQueue.put(flowFileRecord); @@ -1017,7 +1017,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(1000L) 
.size(1000L) .build(); @@ -1042,7 +1042,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .build(); flowFileQueue.put(flowFileRecord); @@ -1059,7 +1059,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord2 = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(1000L).size(1L).build(); flowFileQueue.put(flowFileRecord2); @@ -1128,7 +1128,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(0L).size(0L).build(); flowFileQueue.put(flowFileRecord); @@ -1166,7 +1166,7 @@ public class TestStandardProcessSession { final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") .entryDate(System.currentTimeMillis()) - .contentClaim(new StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true), 0L)) + .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", true, false), 0L)) .contentClaimOffset(0L).size(0L).build(); flowFileQueue.put(flowFileRecord); @@ -1395,7 +1395,7 @@ public class TestStandardProcessSession { final Set<ContentClaim> claims = new HashSet<>(); for (long i = 0; i < idGenerator.get(); i++) { - final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false); + final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim("container", "section", String.valueOf(i), false, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); if (getClaimantCount(contentClaim) > 0) { claims.add(contentClaim); @@ -1407,7 +1407,7 @@ public class TestStandardProcessSession { @Override public ContentClaim create(boolean lossTolerant) throws IOException { - final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", String.valueOf(idGenerator.getAndIncrement()), false, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); claimantCounts.put(contentClaim, new AtomicInteger(1)); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java index 5733164..0b24c03 100644 --- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java @@ -83,7 +83,7 @@ public class TestVolatileContentRepository { final ContentRepository mockRepo = Mockito.mock(ContentRepository.class); contentRepo.setBackupRepository(mockRepo); - final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true); + final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1000", true, false); final ContentClaim contentClaim = new StandardContentClaim(resourceClaim, 0L); Mockito.when(mockRepo.create(Matchers.anyBoolean())).thenReturn(contentClaim); http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java index ee26d1f..ca79fab 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java @@ -86,10 +86,10 @@ public class TestWriteAheadFlowFileRepository { 
when(connection.getFlowFileQueue()).thenReturn(queue); queueProvider.addConnection(connection); - final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false); + final ResourceClaim resourceClaim1 = claimManager.newResourceClaim("container", "section", "1", false, false); final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 0L); - final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false); + final ResourceClaim resourceClaim2 = claimManager.newResourceClaim("container", "section", "2", false, false); final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 0L); // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then, http://git-wip-us.apache.org/repos/asf/nifi/blob/a9395bc6/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java index d29105a..867810e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java @@ -52,7 +52,7 @@ public class TestStandardResourceClaimManager { } }; - final ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false); + final 
ResourceClaim resourceClaim = manager.newResourceClaim("container", "section", "id", false, false); assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1. assertEquals(1, manager.getClaimantCount(resourceClaim));