Repository: incubator-nifi Updated Branches: refs/heads/NIFI-731 [created] bb5128be2
NIFI-731: Merged develop to branch Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fb0a1a76 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fb0a1a76 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fb0a1a76 Branch: refs/heads/NIFI-731 Commit: fb0a1a76f431148fa9664e565c94cf334498dbf8 Parents: e35f348 Author: Mark Payne <marka...@hotmail.com> Authored: Sun Jun 28 13:36:34 2015 -0400 Committer: Mark Payne <marka...@hotmail.com> Committed: Sun Jun 28 13:36:34 2015 -0400 ---------------------------------------------------------------------- .../repository/claim/ContentClaimManager.java | 15 +- .../repository/FileSystemRepository.java | 230 ++++++++++--------- .../repository/VolatileContentRepository.java | 2 +- .../WriteAheadFlowFileRepository.java | 22 +- .../claim/StandardContentClaimManager.java | 36 ++- .../controller/TestFileSystemSwapManager.java | 4 +- 6 files changed, 172 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java index bffcec3..bf6acc0 100644 --- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java +++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java @@ -110,29 +110,30 @@ public interface ContentClaimManager { /** * Drains up to {@code maxElements} Content Claims from the internal queue - * of destructable content claims to the given {@code destination} so that - * they can be destroyed. + * of destructable content claims that belong to the given Content Repository container + * to the given {@code destination} so that they can be destroyed. * + * @param container the container to drain * @param destination to drain to * @param maxElements max items to drain */ - void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements); + void drainDestructableClaims(String container, Collection<ContentClaim> destination, int maxElements); /** - * Drains up to {@code maxElements} Content Claims from the internal queue - * of destructable content claims to the given {@code destination} so that - * they can be destroyed. If no ContentClaim is ready to be destroyed at + * Drains up to {@code maxElements} Content Claims that belong to the given Content Repository + * container from the internal queue of destructable content claims to the given {@code destination} so that they can be destroyed. If no ContentClaim is ready to be destroyed at * this time, will wait up to the specified amount of time before returning. * If, after the specified amount of time, there is still no ContentClaim * ready to be destroyed, the method will return without having added * anything to the given {@code destination}. * + * @param container the container to drain * @param destination to drain to * @param maxElements max items to drain * @param timeout maximum time to wait * @param unit unit of time to wait */ - void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit); + void drainDestructableClaims(String container, Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit); /** * Clears the manager's memory of any and all ContentClaims that it knows http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 1171636..4e4609a 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -46,8 +46,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -61,18 +59,17 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.controller.repository.claim.ContentClaim; import org.apache.nifi.controller.repository.claim.ContentClaimManager; import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream; import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.util.file.FileUtils; import org.apache.nifi.stream.io.StreamUtils; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StopWatch; - -import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.util.file.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,16 +88,15 @@ public class FileSystemRepository implements ContentRepository { private final List<String> containerNames; private final AtomicLong index; - private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true); - private final ConcurrentMap<String, BlockingQueue<ContentClaim>> reclaimable = new ConcurrentHashMap<>(); private final Map<String, ContainerState> containerStateMap = new HashMap<>(); private final boolean archiveData; private final long maxArchiveMillis; private final Map<String, Long> minUsableContainerBytesForArchive = new HashMap<>(); private final boolean alwaysSync; - private final ScheduledExecutorService containerCleanupExecutor; + private ScheduledExecutorService executor; // effectively final + private ScheduledExecutorService containerCleanupExecutor; // effectively final private ContentClaimManager contentClaimManager; // effectively final // Map of contianer to archived files that should be deleted next. @@ -112,17 +108,16 @@ public class FileSystemRepository implements ContentRepository { public FileSystemRepository() throws IOException { final NiFiProperties properties = NiFiProperties.getInstance(); // determine the file repository paths and ensure they exist - final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths(); - for (Path path : fileRespositoryPaths.values()) { + final Map<String, Path> fileRepositoryPaths = properties.getContentRepositoryPaths(); + for (final Path path : fileRepositoryPaths.values()) { Files.createDirectories(path); } - this.containers = new HashMap<>(fileRespositoryPaths); + this.containers = new HashMap<>(fileRepositoryPaths); this.containerNames = new ArrayList<>(containers.keySet()); index = new AtomicLong(0L); for (final String containerName : containerNames) { - reclaimable.put(containerName, new LinkedBlockingQueue<ContentClaim>(10000)); archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000)); } @@ -191,8 +186,6 @@ public class FileSystemRepository implements ContentRepository { this.alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.content.repository.always.sync")); LOG.info("Initializing FileSystemRepository with 'Always Sync' set to {}", alwaysSync); initializeRepository(); - - containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true); } @Override @@ -201,11 +194,11 @@ public class FileSystemRepository implements ContentRepository { final NiFiProperties properties = NiFiProperties.getInstance(); - final Map<String, Path> fileRespositoryPaths = properties.getContentRepositoryPaths(); + final Map<String, Path> fileRepositoryPaths = properties.getContentRepositoryPaths(); - executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1, TimeUnit.SECONDS); - for (int i = 0; i < fileRespositoryPaths.size(); i++) { - executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS); + executor = new FlowEngine(fileRepositoryPaths.size(), "FileSystemRepository Workers", true); + for (final String containerName : fileRepositoryPaths.keySet()) { + executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(containerName), 1, 1, TimeUnit.SECONDS); } final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); @@ -219,18 +212,31 @@ public class FileSystemRepository implements ContentRepository { throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY); } } - for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) { - final String containerName = containerEntry.getKey(); - final Path containerPath = containerEntry.getValue(); - final Runnable cleanup = new DestroyExpiredArchiveClaims(containerName, containerPath); - containerCleanupExecutor.scheduleWithFixedDelay(cleanup, cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS); + + // if we are not archiving, and nothing is currently archived, we don't need to start threads to cleanup the + // archive in the repository. Otherwise, we need to continually delete "expired" archived content claims. + if (oldestArchiveDate.get() > 0L || archiveData) { + containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true); + for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) { + final String containerName = containerEntry.getKey(); + final Path containerPath = containerEntry.getValue(); + final Runnable cleanup = new DestroyExpiredArchiveClaims(containerName, containerPath); + containerCleanupExecutor.scheduleWithFixedDelay(cleanup, cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS); + } + } else { + containerCleanupExecutor = null; } } @Override public void shutdown() { - executor.shutdown(); - containerCleanupExecutor.shutdown(); + if (executor != null) { + executor.shutdown(); + } + + if (containerCleanupExecutor != null) { + containerCleanupExecutor.shutdown(); + } } private static double getRatio(final String value) { @@ -435,6 +441,10 @@ public class FileSystemRepository implements ContentRepository { } private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException { + return getPath(claim, verifyExists, true); + } + + private Path getPath(final ContentClaim claim, final boolean verifyExists, final boolean checkArchive) throws ContentNotFoundException { final Path containerPath = containers.get(claim.getContainer()); if (containerPath == null) { if (verifyExists) { @@ -448,7 +458,7 @@ public class FileSystemRepository implements ContentRepository { Path resolvedPath = containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId())); // If the data does not exist, create a Path that points to where the data would exist in the archive directory. - if (!Files.exists(resolvedPath)) { + if (checkArchive && !Files.exists(resolvedPath)) { resolvedPath = getArchivePath(claim); } @@ -519,6 +529,10 @@ public class FileSystemRepository implements ContentRepository { @Override public boolean remove(final ContentClaim claim) { + return remove(claim, true); + } + + private boolean remove(final ContentClaim claim, final boolean checkArchive) { if (claim == null) { return false; } @@ -553,7 +567,7 @@ public class FileSystemRepository implements ContentRepository { fos.getFD().sync(); } } catch (final IOException ioe) { - remove(newClaim); + remove(newClaim, false); throw ioe; } return newClaim; @@ -583,7 +597,7 @@ public class FileSystemRepository implements ContentRepository { } } // don't add demarcator after the last claim - if (demarcator != null && demarcator.length > 0 && (++objectIndex < claims.size())) { + if (demarcator != null && demarcator.length > 0 && ++objectIndex < claims.size()) { final ByteBuffer buffer = ByteBuffer.wrap(demarcator); while (buffer.hasRemaining()) { position += dest.write(buffer, position); @@ -791,46 +805,46 @@ public class FileSystemRepository implements ContentRepository { contentClaimManager.purge(); } - private class BinDestructableClaims implements Runnable { - - @Override - public void run() { - try { - // Get all of the Destructable Claims and bin them based on their Container. We do this - // because the Container generally maps to a physical partition on the disk, so we want a few - // different threads hitting the different partitions but don't want multiple threads hitting - // the same partition. - final List<ContentClaim> toDestroy = new ArrayList<>(); - while (true) { - toDestroy.clear(); - contentClaimManager.drainDestructableClaims(toDestroy, 10000); - if (toDestroy.isEmpty()) { - return; - } - - for (final ContentClaim claim : toDestroy) { - final String container = claim.getContainer(); - final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container); - - try { - while (true) { - if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) { - break; - } else { - LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. " - + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim); - } - } - } catch (final InterruptedException ie) { - LOG.warn("Failed to clean up {} because thread was interrupted", claim); - } - } - } - } catch (final Throwable t) { - LOG.error("Failed to cleanup content claims due to {}", t); - } - } - } + // private class BinDestructableClaims implements Runnable { + // + // @Override + // public void run() { + // try { + // // Get all of the Destructable Claims and bin them based on their Container. We do this + // // because the Container generally maps to a physical partition on the disk, so we want a few + // // different threads hitting the different partitions but don't want multiple threads hitting + // // the same partition. + // final List<ContentClaim> toDestroy = new ArrayList<>(); + // while (true) { + // toDestroy.clear(); + // contentClaimManager.drainDestructableClaims(toDestroy, 10000); + // if (toDestroy.isEmpty()) { + // return; + // } + // + // for (final ContentClaim claim : toDestroy) { + // final String container = claim.getContainer(); + // final BlockingQueue<ContentClaim> claimQueue = reclaimable.get(container); + // + // try { + // while (true) { + // if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) { + // break; + // } else { + // LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. " + // + "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim); + // } + // } + // } catch (final InterruptedException ie) { + // LOG.warn("Failed to clean up {} because thread was interrupted", claim); + // } + // } + // } + // } catch (final Throwable t) { + // LOG.error("Failed to cleanup content claims due to {}", t); + // } + // } + // } public static Path getArchivePath(final Path contentClaimPath) { final Path sectionPath = contentClaimPath.getParent(); @@ -1098,7 +1112,13 @@ public class FileSystemRepository implements ContentRepository { return oldestContainerArchive; } + private class ArchiveOrDestroyDestructableClaims implements Runnable { + private final String containerName; + + public ArchiveOrDestroyDestructableClaims(final String containerName) { + this.containerName = containerName; + } @Override public void run() { @@ -1107,47 +1127,43 @@ public class FileSystemRepository implements ContentRepository { while (true) { // look through each of the binned queues of Content Claims int successCount = 0; - final List<ContentClaim> toRemove = new ArrayList<>(); - for (final Map.Entry<String, BlockingQueue<ContentClaim>> entry : reclaimable.entrySet()) { - // drain the queue of all ContentClaims that can be destroyed for the given container. - final String container = entry.getKey(); - final ContainerState containerState = containerStateMap.get(container); - - toRemove.clear(); - entry.getValue().drainTo(toRemove); - if (toRemove.isEmpty()) { - continue; - } + final List<ContentClaim> toRemove = new ArrayList<>(10); + final ContainerState containerState = containerStateMap.get(containerName); - // destroy each claim for this container - final long start = System.nanoTime(); - for (final ContentClaim claim : toRemove) { - if (archiveData) { - try { - archive(claim); - containerState.incrementArchiveCount(); - successCount++; - } catch (final Exception e) { - LOG.warn("Failed to archive {} due to {}", claim, e.toString()); - if (LOG.isDebugEnabled()) { - LOG.warn("", e); - } - } - } else { - if (remove(claim)) { - successCount++; + toRemove.clear(); + contentClaimManager.drainDestructableClaims(containerName, toRemove, 10); + if (toRemove.isEmpty()) { + return; + } + + // destroy each claim for this container + final long start = System.nanoTime(); + for (final ContentClaim claim : toRemove) { + if (archiveData) { + try { + archive(claim); + containerState.incrementArchiveCount(); + successCount++; + } catch (final Exception e) { + LOG.warn("Failed to archive {} due to {}", claim, e.toString()); + if (LOG.isDebugEnabled()) { + LOG.warn("", e); } } + } else { + if (remove(claim, false)) { + successCount++; + } } + } - final long nanos = System.nanoTime() - start; - final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); + final long nanos = System.nanoTime() - start; + final long millis = TimeUnit.NANOSECONDS.toMillis(nanos); - if (successCount == 0) { - LOG.debug("No ContentClaims archived/removed for Container {}", container); - } else { - LOG.info("Successfully {} {} Content Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, container, millis); - } + if (successCount == 0) { + LOG.debug("No ContentClaims archived/removed for Container {}", containerName); + } else { + LOG.info("Successfully {} {} Content Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, containerName, millis); } // if we didn't destroy anything, we're done. @@ -1210,7 +1226,7 @@ public class FileSystemRepository implements ContentRepository { @Override public void run() { try { - if (oldestArchiveDate.get() > (System.currentTimeMillis() - maxArchiveMillis)) { + if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) { final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName); if (minRequiredSpace == null) { return; @@ -1245,7 +1261,7 @@ public class FileSystemRepository implements ContentRepository { if (oldestContainerArchive < 0L) { boolean updated; do { - long oldest = oldestArchiveDate.get(); + final long oldest = oldestArchiveDate.get(); if (oldestContainerArchive < oldest) { updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive); @@ -1298,7 +1314,7 @@ public class FileSystemRepository implements ContentRepository { final long free = getContainerUsableSpace(containerName); used = capacity - free; bytesUsed = used; - } catch (IOException e) { + } catch (final IOException e) { return false; } } @@ -1317,7 +1333,7 @@ public class FileSystemRepository implements ContentRepository { try { LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName); condition.await(); - } catch (InterruptedException e) { + } catch (final InterruptedException e) { } } } finally { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java index 3bfdd8a..f9f5b68 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java @@ -627,7 +627,7 @@ public class VolatileContentRepository implements ContentRepository { final List<ContentClaim> destructable = new ArrayList<>(1000); while (true) { destructable.clear(); - claimManager.drainDestructableClaims(destructable, 1000, 5, TimeUnit.SECONDS); + claimManager.drainDestructableClaims(CONTAINER_NAME, destructable, 1000, 5, TimeUnit.SECONDS); if (destructable.isEmpty()) { return; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java index f2df821..cb14fea 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java @@ -163,9 +163,11 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis return Files.getFileStore(flowFileRepositoryPath).getUsableSpace(); } + private final AtomicLong updateCount = new AtomicLong(0L); @Override public void updateRepository(final Collection<RepositoryRecord> records) throws IOException { - updateRepository(records, alwaysSync); + final long updates = updateCount.incrementAndGet(); + updateRepository(records, alwaysSync || updates % 100 == 0); } private void updateRepository(final Collection<RepositoryRecord> records, final boolean sync) throws IOException { @@ -212,7 +214,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis BlockingQueue<ContentClaim> claimQueue = claimsAwaitingDestruction.get(partitionKey); if (claimQueue == null) { claimQueue = new LinkedBlockingQueue<>(); - BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); + final BlockingQueue<ContentClaim> existingClaimQueue = claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue); if (existingClaimQueue != null) { claimQueue = existingClaimQueue; } @@ -339,7 +341,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis final long start = System.nanoTime(); final int numRecordsCheckpointed = checkpoint(); final long end = System.nanoTime(); - final long millis = TimeUnit.MILLISECONDS.convert((end - start), TimeUnit.NANOSECONDS); + final long millis = TimeUnit.MILLISECONDS.convert(end - start, TimeUnit.NANOSECONDS); logger.info("Successfully checkpointed FlowFile Repository with {} records in {} milliseconds", new Object[]{numRecordsCheckpointed, millis}); } catch (final IOException e) { @@ -518,7 +520,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis } final StandardFlowFileRecord.Builder ffBuilder = new StandardFlowFileRecord.Builder(); - RepositoryRecord record = currentRecordStates.get(recordId); + final RepositoryRecord record = currentRecordStates.get(recordId); ffBuilder.id(recordId); if (record != null) { ffBuilder.fromFlowFile(record.getCurrent()); @@ -785,16 +787,16 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis throw new EOFException(); } if (firstValue == 0xff && secondValue == 0xff) { - int ch1 = in.read(); - int ch2 = in.read(); - int ch3 = in.read(); - int ch4 = in.read(); + final int ch1 = in.read(); + final int ch2 = in.read(); + final int ch3 = in.read(); + final int ch4 = in.read(); if ((ch1 | ch2 | ch3 | ch4) < 0) { throw new EOFException(); } - return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4)); + return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4; } else { - return ((firstValue << 8) + (secondValue)); + return (firstValue << 8) + secondValue; } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java index b68f95e..95f4c7f 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java @@ -32,7 +32,7 @@ public class StandardContentClaimManager implements ContentClaimManager { private static final ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = new ConcurrentHashMap<>(); private static final Logger logger = LoggerFactory.getLogger(StandardContentClaimManager.class); - private static final BlockingQueue<ContentClaim> destructableClaims = new LinkedBlockingQueue<>(50000); + private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> destructableClaims = new ConcurrentHashMap<>(); @Override public ContentClaim newContentClaim(final String container, final String section, final String id, final boolean lossTolerant) { @@ -50,8 +50,8 @@ public class StandardContentClaimManager implements ContentClaimManager { } counter = new AtomicInteger(0); - AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter); - return (existingCounter == null) ? counter : existingCounter; + final AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, counter); + return existingCounter == null ? counter : existingCounter; } @Override @@ -60,7 +60,7 @@ public class StandardContentClaimManager implements ContentClaimManager { return 0; } final AtomicInteger counter = claimantCounts.get(claim); - return (counter == null) ? 0 : counter.get(); + return counter == null ? 0 : counter.get(); } @Override @@ -113,25 +113,41 @@ public class StandardContentClaimManager implements ContentClaimManager { logger.debug("Marking claim {} as destructable", claim); try { - while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) { + final BlockingQueue<ContentClaim> destructableQueue = getDestructableClaimQueue(claim.getContainer()); + while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) { } } catch (final InterruptedException ie) { } } + private BlockingQueue<ContentClaim> getDestructableClaimQueue(final String container) { + BlockingQueue<ContentClaim> claimQueue = destructableClaims.get(container); + if (claimQueue == null) { + claimQueue = new LinkedBlockingQueue<>(10000); + final BlockingQueue<ContentClaim> existing = destructableClaims.putIfAbsent(container, claimQueue); + if (existing != null) { + claimQueue = existing; + } + } + + return claimQueue; + } + @Override - public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements) { - final int drainedCount = destructableClaims.drainTo(destination, maxElements); + public void drainDestructableClaims(final String container, final Collection<ContentClaim> destination, final int maxElements) { + final BlockingQueue<ContentClaim> destructableQueue = getDestructableClaimQueue(container); + final int drainedCount = destructableQueue.drainTo(destination, maxElements); logger.debug("Drained {} destructable claims to {}", drainedCount, destination); } @Override - public void drainDestructableClaims(final Collection<ContentClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) { + public void drainDestructableClaims(final String container, final Collection<ContentClaim> destination, final int maxElements, final long timeout, final TimeUnit unit) { try { - final ContentClaim firstClaim = destructableClaims.poll(timeout, unit); + final BlockingQueue<ContentClaim> destructableQueue = getDestructableClaimQueue(container); + final ContentClaim firstClaim = destructableQueue.poll(timeout, unit); if (firstClaim != null) { destination.add(firstClaim); - destructableClaims.drainTo(destination, maxElements - 1); + destructableQueue.drainTo(destination, maxElements - 1); } } catch (final InterruptedException e) { } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java index 364dcad..d5b1d52 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java @@ -90,11 +90,11 @@ public class TestFileSystemSwapManager { } @Override - public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements) { + public void drainDestructableClaims(String container, Collection<ContentClaim> destination, int maxElements) { } @Override - public void drainDestructableClaims(Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit) { + public void drainDestructableClaims(String container, Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit unit) { } @Override