Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-731 [created] bb5128be2


NIFI-731: Merged develop to branch


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fb0a1a76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fb0a1a76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fb0a1a76

Branch: refs/heads/NIFI-731
Commit: fb0a1a76f431148fa9664e565c94cf334498dbf8
Parents: e35f348
Author: Mark Payne <marka...@hotmail.com>
Authored: Sun Jun 28 13:36:34 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Sun Jun 28 13:36:34 2015 -0400

----------------------------------------------------------------------
 .../repository/claim/ContentClaimManager.java   |  15 +-
 .../repository/FileSystemRepository.java        | 230 ++++++++++---------
 .../repository/VolatileContentRepository.java   |   2 +-
 .../WriteAheadFlowFileRepository.java           |  22 +-
 .../claim/StandardContentClaimManager.java      |  36 ++-
 .../controller/TestFileSystemSwapManager.java   |   4 +-
 6 files changed, 172 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
 
b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
index bffcec3..bf6acc0 100644
--- 
a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
+++ 
b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimManager.java
@@ -110,29 +110,30 @@ public interface ContentClaimManager {
 
     /**
      * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed.
+     * of destructable content claims that belong to the given Content 
Repository container
+     * to the given {@code destination} so that they can be destroyed.
      *
+     * @param container the container to drain
      * @param destination to drain to
      * @param maxElements max items to drain
      */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int 
maxElements);
+    void drainDestructableClaims(String container, Collection<ContentClaim> 
destination, int maxElements);
 
     /**
-     * Drains up to {@code maxElements} Content Claims from the internal queue
-     * of destructable content claims to the given {@code destination} so that
-     * they can be destroyed. If no ContentClaim is ready to be destroyed at
+     * Drains up to {@code maxElements} Content Claims that belong to the 
given Content Repository
+     * container from the internal queue of destructable content claims to the 
given {@code destination} so that they can be destroyed. If no ContentClaim is 
ready to be destroyed at
      * this time, will wait up to the specified amount of time before 
returning.
      * If, after the specified amount of time, there is still no ContentClaim
      * ready to be destroyed, the method will return without having added
      * anything to the given {@code destination}.
      *
+     * @param container the container to drain
      * @param destination to drain to
      * @param maxElements max items to drain
      * @param timeout maximum time to wait
      * @param unit unit of time to wait
      */
-    void drainDestructableClaims(Collection<ContentClaim> destination, int 
maxElements, long timeout, TimeUnit unit);
+    void drainDestructableClaims(String container, Collection<ContentClaim> 
destination, int maxElements, long timeout, TimeUnit unit);
 
     /**
      * Clears the manager's memory of any and all ContentClaims that it knows

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 1171636..4e4609a 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -46,8 +46,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -61,18 +59,17 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.regex.Pattern;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 import org.apache.nifi.controller.repository.io.SyncOnCloseOutputStream;
 import org.apache.nifi.engine.FlowEngine;
-import org.apache.nifi.util.file.FileUtils;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.StopWatch;
-
-import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.util.file.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,16 +88,15 @@ public class FileSystemRepository implements 
ContentRepository {
     private final List<String> containerNames;
     private final AtomicLong index;
 
-    private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
-    private final ConcurrentMap<String, BlockingQueue<ContentClaim>> 
reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new 
HashMap<>();
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
     private final Map<String, Long> minUsableContainerBytesForArchive = new 
HashMap<>();
     private final boolean alwaysSync;
-    private final ScheduledExecutorService containerCleanupExecutor;
 
+    private ScheduledExecutorService executor; // effectively final
+    private ScheduledExecutorService containerCleanupExecutor; // effectively 
final
     private ContentClaimManager contentClaimManager; // effectively final
 
    // Map of container to archived files that should be deleted next.
@@ -112,17 +108,16 @@ public class FileSystemRepository implements 
ContentRepository {
     public FileSystemRepository() throws IOException {
         final NiFiProperties properties = NiFiProperties.getInstance();
         // determine the file repository paths and ensure they exist
-        final Map<String, Path> fileRespositoryPaths = 
properties.getContentRepositoryPaths();
-        for (Path path : fileRespositoryPaths.values()) {
+        final Map<String, Path> fileRepositoryPaths = 
properties.getContentRepositoryPaths();
+        for (final Path path : fileRepositoryPaths.values()) {
             Files.createDirectories(path);
         }
 
-        this.containers = new HashMap<>(fileRespositoryPaths);
+        this.containers = new HashMap<>(fileRepositoryPaths);
         this.containerNames = new ArrayList<>(containers.keySet());
         index = new AtomicLong(0L);
 
         for (final String containerName : containerNames) {
-            reclaimable.put(containerName, new 
LinkedBlockingQueue<ContentClaim>(10000));
             archivedFiles.put(containerName, new 
LinkedBlockingQueue<ArchiveInfo>(100000));
         }
 
@@ -191,8 +186,6 @@ public class FileSystemRepository implements 
ContentRepository {
         this.alwaysSync = 
Boolean.parseBoolean(properties.getProperty("nifi.content.repository.always.sync"));
         LOG.info("Initializing FileSystemRepository with 'Always Sync' set to 
{}", alwaysSync);
         initializeRepository();
-
-        containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup 
FileSystemRepository Container", true);
     }
 
     @Override
@@ -201,11 +194,11 @@ public class FileSystemRepository implements 
ContentRepository {
 
         final NiFiProperties properties = NiFiProperties.getInstance();
 
-        final Map<String, Path> fileRespositoryPaths = 
properties.getContentRepositoryPaths();
+        final Map<String, Path> fileRepositoryPaths = 
properties.getContentRepositoryPaths();
 
-        executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1, 
TimeUnit.SECONDS);
-        for (int i = 0; i < fileRespositoryPaths.size(); i++) {
-            executor.scheduleWithFixedDelay(new 
ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
+        executor = new FlowEngine(fileRepositoryPaths.size(), 
"FileSystemRepository Workers", true);
+        for (final String containerName : fileRepositoryPaths.keySet()) {
+            executor.scheduleWithFixedDelay(new 
ArchiveOrDestroyDestructableClaims(containerName), 1, 1, TimeUnit.SECONDS);
         }
 
         final String archiveCleanupFrequency = 
properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
@@ -219,18 +212,31 @@ public class FileSystemRepository implements 
ContentRepository {
                 throw new RuntimeException("Invalid value set for property " + 
NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
             }
         }
-        for (final Map.Entry<String, Path> containerEntry : 
containers.entrySet()) {
-            final String containerName = containerEntry.getKey();
-            final Path containerPath = containerEntry.getValue();
-            final Runnable cleanup = new 
DestroyExpiredArchiveClaims(containerName, containerPath);
-            containerCleanupExecutor.scheduleWithFixedDelay(cleanup, 
cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS);
+
+        // if we are not archiving, and nothing is currently archived, we 
don't need to start threads to cleanup the
+        // archive in the repository. Otherwise, we need to continually delete 
"expired" archived content claims.
+        if (oldestArchiveDate.get() > 0L || archiveData) {
+            containerCleanupExecutor = new FlowEngine(containers.size(), 
"Cleanup FileSystemRepository Container", true);
+            for (final Map.Entry<String, Path> containerEntry : 
containers.entrySet()) {
+                final String containerName = containerEntry.getKey();
+                final Path containerPath = containerEntry.getValue();
+                final Runnable cleanup = new 
DestroyExpiredArchiveClaims(containerName, containerPath);
+                containerCleanupExecutor.scheduleWithFixedDelay(cleanup, 
cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS);
+            }
+        } else {
+            containerCleanupExecutor = null;
         }
     }
 
     @Override
     public void shutdown() {
-        executor.shutdown();
-        containerCleanupExecutor.shutdown();
+        if (executor != null) {
+            executor.shutdown();
+        }
+
+        if (containerCleanupExecutor != null) {
+            containerCleanupExecutor.shutdown();
+        }
     }
 
     private static double getRatio(final String value) {
@@ -435,6 +441,10 @@ public class FileSystemRepository implements 
ContentRepository {
     }
 
     private Path getPath(final ContentClaim claim, final boolean verifyExists) 
throws ContentNotFoundException {
+        return getPath(claim, verifyExists, true);
+    }
+
+    private Path getPath(final ContentClaim claim, final boolean verifyExists, 
final boolean checkArchive) throws ContentNotFoundException {
         final Path containerPath = containers.get(claim.getContainer());
         if (containerPath == null) {
             if (verifyExists) {
@@ -448,7 +458,7 @@ public class FileSystemRepository implements 
ContentRepository {
         Path resolvedPath = 
containerPath.resolve(claim.getSection()).resolve(String.valueOf(claim.getId()));
 
         // If the data does not exist, create a Path that points to where the 
data would exist in the archive directory.
-        if (!Files.exists(resolvedPath)) {
+        if (checkArchive && !Files.exists(resolvedPath)) {
             resolvedPath = getArchivePath(claim);
         }
 
@@ -519,6 +529,10 @@ public class FileSystemRepository implements 
ContentRepository {
 
     @Override
     public boolean remove(final ContentClaim claim) {
+        return remove(claim, true);
+    }
+
+    private boolean remove(final ContentClaim claim, final boolean 
checkArchive) {
         if (claim == null) {
             return false;
         }
@@ -553,7 +567,7 @@ public class FileSystemRepository implements 
ContentRepository {
                 fos.getFD().sync();
             }
         } catch (final IOException ioe) {
-            remove(newClaim);
+            remove(newClaim, false);
             throw ioe;
         }
         return newClaim;
@@ -583,7 +597,7 @@ public class FileSystemRepository implements 
ContentRepository {
                     }
                 }
                 // don't add demarcator after the last claim
-                if (demarcator != null && demarcator.length > 0 && 
(++objectIndex < claims.size())) {
+                if (demarcator != null && demarcator.length > 0 && 
++objectIndex < claims.size()) {
                     final ByteBuffer buffer = ByteBuffer.wrap(demarcator);
                     while (buffer.hasRemaining()) {
                         position += dest.write(buffer, position);
@@ -791,46 +805,46 @@ public class FileSystemRepository implements 
ContentRepository {
         contentClaimManager.purge();
     }
 
-    private class BinDestructableClaims implements Runnable {
-
-        @Override
-        public void run() {
-            try {
-                // Get all of the Destructable Claims and bin them based on 
their Container. We do this
-                // because the Container generally maps to a physical 
partition on the disk, so we want a few
-                // different threads hitting the different partitions but 
don't want multiple threads hitting
-                // the same partition.
-                final List<ContentClaim> toDestroy = new ArrayList<>();
-                while (true) {
-                    toDestroy.clear();
-                    contentClaimManager.drainDestructableClaims(toDestroy, 
10000);
-                    if (toDestroy.isEmpty()) {
-                        return;
-                    }
-
-                    for (final ContentClaim claim : toDestroy) {
-                        final String container = claim.getContainer();
-                        final BlockingQueue<ContentClaim> claimQueue = 
reclaimable.get(container);
-
-                        try {
-                            while (true) {
-                                if (claimQueue.offer(claim, 10, 
TimeUnit.MINUTES)) {
-                                    break;
-                                } else {
-                                    LOG.warn("Failed to clean up {} because 
old claims aren't being cleaned up fast enough. "
-                                            + "This Content Claim will remain 
in the Content Repository until NiFi is restarted, at which point it will be 
cleaned up", claim);
-                                }
-                            }
-                        } catch (final InterruptedException ie) {
-                            LOG.warn("Failed to clean up {} because thread was 
interrupted", claim);
-                        }
-                    }
-                }
-            } catch (final Throwable t) {
-                LOG.error("Failed to cleanup content claims due to {}", t);
-            }
-        }
-    }
+    // private class BinDestructableClaims implements Runnable {
+    //
+    // @Override
+    // public void run() {
+    // try {
+    // // Get all of the Destructable Claims and bin them based on their 
Container. We do this
+    // // because the Container generally maps to a physical partition on the 
disk, so we want a few
+    // // different threads hitting the different partitions but don't want 
multiple threads hitting
+    // // the same partition.
+    // final List<ContentClaim> toDestroy = new ArrayList<>();
+    // while (true) {
+    // toDestroy.clear();
+    // contentClaimManager.drainDestructableClaims(toDestroy, 10000);
+    // if (toDestroy.isEmpty()) {
+    // return;
+    // }
+    //
+    // for (final ContentClaim claim : toDestroy) {
+    // final String container = claim.getContainer();
+    // final BlockingQueue<ContentClaim> claimQueue = 
reclaimable.get(container);
+    //
+    // try {
+    // while (true) {
+    // if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) {
+    // break;
+    // } else {
+    // LOG.warn("Failed to clean up {} because old claims aren't being cleaned 
up fast enough. "
+    // + "This Content Claim will remain in the Content Repository until NiFi 
is restarted, at which point it will be cleaned up", claim);
+    // }
+    // }
+    // } catch (final InterruptedException ie) {
+    // LOG.warn("Failed to clean up {} because thread was interrupted", claim);
+    // }
+    // }
+    // }
+    // } catch (final Throwable t) {
+    // LOG.error("Failed to cleanup content claims due to {}", t);
+    // }
+    // }
+    // }
 
     public static Path getArchivePath(final Path contentClaimPath) {
         final Path sectionPath = contentClaimPath.getParent();
@@ -1098,7 +1112,13 @@ public class FileSystemRepository implements 
ContentRepository {
         return oldestContainerArchive;
     }
 
+
     private class ArchiveOrDestroyDestructableClaims implements Runnable {
+        private final String containerName;
+
+        public ArchiveOrDestroyDestructableClaims(final String containerName) {
+            this.containerName = containerName;
+        }
 
         @Override
         public void run() {
@@ -1107,47 +1127,43 @@ public class FileSystemRepository implements 
ContentRepository {
                 while (true) {
                     // look through each of the binned queues of Content Claims
                     int successCount = 0;
-                    final List<ContentClaim> toRemove = new ArrayList<>();
-                    for (final Map.Entry<String, BlockingQueue<ContentClaim>> 
entry : reclaimable.entrySet()) {
-                        // drain the queue of all ContentClaims that can be 
destroyed for the given container.
-                        final String container = entry.getKey();
-                        final ContainerState containerState = 
containerStateMap.get(container);
-
-                        toRemove.clear();
-                        entry.getValue().drainTo(toRemove);
-                        if (toRemove.isEmpty()) {
-                            continue;
-                        }
+                    final List<ContentClaim> toRemove = new ArrayList<>(10);
+                    final ContainerState containerState = 
containerStateMap.get(containerName);
 
-                        // destroy each claim for this container
-                        final long start = System.nanoTime();
-                        for (final ContentClaim claim : toRemove) {
-                            if (archiveData) {
-                                try {
-                                    archive(claim);
-                                    containerState.incrementArchiveCount();
-                                    successCount++;
-                                } catch (final Exception e) {
-                                    LOG.warn("Failed to archive {} due to {}", 
claim, e.toString());
-                                    if (LOG.isDebugEnabled()) {
-                                        LOG.warn("", e);
-                                    }
-                                }
-                            } else {
-                                if (remove(claim)) {
-                                    successCount++;
+                    toRemove.clear();
+                    contentClaimManager.drainDestructableClaims(containerName, 
toRemove, 10);
+                    if (toRemove.isEmpty()) {
+                        return;
+                    }
+
+                    // destroy each claim for this container
+                    final long start = System.nanoTime();
+                    for (final ContentClaim claim : toRemove) {
+                        if (archiveData) {
+                            try {
+                                archive(claim);
+                                containerState.incrementArchiveCount();
+                                successCount++;
+                            } catch (final Exception e) {
+                                LOG.warn("Failed to archive {} due to {}", 
claim, e.toString());
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.warn("", e);
                                 }
                             }
+                        } else {
+                            if (remove(claim, false)) {
+                                successCount++;
+                            }
                         }
+                    }
 
-                        final long nanos = System.nanoTime() - start;
-                        final long millis = 
TimeUnit.NANOSECONDS.toMillis(nanos);
+                    final long nanos = System.nanoTime() - start;
+                    final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
 
-                        if (successCount == 0) {
-                            LOG.debug("No ContentClaims archived/removed for 
Container {}", container);
-                        } else {
-                            LOG.info("Successfully {} {} Content Claims for 
Container {} in {} millis", archiveData ? "archived" : "destroyed", 
successCount, container, millis);
-                        }
+                    if (successCount == 0) {
+                        LOG.debug("No ContentClaims archived/removed for 
Container {}", containerName);
+                    } else {
+                        LOG.info("Successfully {} {} Content Claims for 
Container {} in {} millis", archiveData ? "archived" : "destroyed", 
successCount, containerName, millis);
                     }
 
                     // if we didn't destroy anything, we're done.
@@ -1210,7 +1226,7 @@ public class FileSystemRepository implements 
ContentRepository {
         @Override
         public void run() {
             try {
-                if (oldestArchiveDate.get() > (System.currentTimeMillis() - 
maxArchiveMillis)) {
+                if (oldestArchiveDate.get() > System.currentTimeMillis() - 
maxArchiveMillis) {
                     final Long minRequiredSpace = 
minUsableContainerBytesForArchive.get(containerName);
                     if (minRequiredSpace == null) {
                         return;
@@ -1245,7 +1261,7 @@ public class FileSystemRepository implements 
ContentRepository {
                 if (oldestContainerArchive < 0L) {
                     boolean updated;
                     do {
-                        long oldest = oldestArchiveDate.get();
+                        final long oldest = oldestArchiveDate.get();
                         if (oldestContainerArchive < oldest) {
                             updated = oldestArchiveDate.compareAndSet(oldest, 
oldestContainerArchive);
 
@@ -1298,7 +1314,7 @@ public class FileSystemRepository implements 
ContentRepository {
                     final long free = getContainerUsableSpace(containerName);
                     used = capacity - free;
                     bytesUsed = used;
-                } catch (IOException e) {
+                } catch (final IOException e) {
                     return false;
                 }
             }
@@ -1317,7 +1333,7 @@ public class FileSystemRepository implements 
ContentRepository {
                     try {
                         LOG.info("Unable to write to container {} due to 
archive file size constraints; waiting for archive cleanup", containerName);
                         condition.await();
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                     }
                 }
             } finally {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
index 3bfdd8a..f9f5b68 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/VolatileContentRepository.java
@@ -627,7 +627,7 @@ public class VolatileContentRepository implements 
ContentRepository {
             final List<ContentClaim> destructable = new ArrayList<>(1000);
             while (true) {
                 destructable.clear();
-                claimManager.drainDestructableClaims(destructable, 1000, 5, 
TimeUnit.SECONDS);
+                claimManager.drainDestructableClaims(CONTAINER_NAME, 
destructable, 1000, 5, TimeUnit.SECONDS);
                 if (destructable.isEmpty()) {
                     return;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index f2df821..cb14fea 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -163,9 +163,11 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         return Files.getFileStore(flowFileRepositoryPath).getUsableSpace();
     }
 
+    private final AtomicLong updateCount = new AtomicLong(0L);
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) 
throws IOException {
-        updateRepository(records, alwaysSync);
+        final long updates = updateCount.incrementAndGet();
+        updateRepository(records, alwaysSync || updates % 100 == 0);
     }
 
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {
@@ -212,7 +214,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
             if (claimQueue == null) {
                 claimQueue = new LinkedBlockingQueue<>();
-                BlockingQueue<ContentClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
+                final BlockingQueue<ContentClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
                 if (existingClaimQueue != null) {
                     claimQueue = existingClaimQueue;
                 }
@@ -339,7 +341,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                     final long start = System.nanoTime();
                     final int numRecordsCheckpointed = checkpoint();
                     final long end = System.nanoTime();
-                    final long millis = TimeUnit.MILLISECONDS.convert((end - 
start), TimeUnit.NANOSECONDS);
+                    final long millis = TimeUnit.MILLISECONDS.convert(end - 
start, TimeUnit.NANOSECONDS);
                     logger.info("Successfully checkpointed FlowFile Repository 
with {} records in {} milliseconds",
                             new Object[]{numRecordsCheckpointed, millis});
                 } catch (final IOException e) {
@@ -518,7 +520,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             }
 
             final StandardFlowFileRecord.Builder ffBuilder = new 
StandardFlowFileRecord.Builder();
-            RepositoryRecord record = currentRecordStates.get(recordId);
+            final RepositoryRecord record = currentRecordStates.get(recordId);
             ffBuilder.id(recordId);
             if (record != null) {
                 ffBuilder.fromFlowFile(record.getCurrent());
@@ -785,16 +787,16 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                 throw new EOFException();
             }
             if (firstValue == 0xff && secondValue == 0xff) {
-                int ch1 = in.read();
-                int ch2 = in.read();
-                int ch3 = in.read();
-                int ch4 = in.read();
+                final int ch1 = in.read();
+                final int ch2 = in.read();
+                final int ch3 = in.read();
+                final int ch4 = in.read();
                 if ((ch1 | ch2 | ch3 | ch4) < 0) {
                     throw new EOFException();
                 }
-                return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4));
+                return (ch1 << 24) + (ch2 << 16) + (ch3 << 8) + ch4;
             } else {
-                return ((firstValue << 8) + (secondValue));
+                return (firstValue << 8) + secondValue;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index b68f95e..95f4c7f 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -32,7 +32,7 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
     private static final ConcurrentMap<ContentClaim, AtomicInteger> 
claimantCounts = new ConcurrentHashMap<>();
     private static final Logger logger = 
LoggerFactory.getLogger(StandardContentClaimManager.class);
 
-    private static final BlockingQueue<ContentClaim> destructableClaims = new 
LinkedBlockingQueue<>(50000);
+    private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> 
destructableClaims = new ConcurrentHashMap<>();
 
     @Override
     public ContentClaim newContentClaim(final String container, final String 
section, final String id, final boolean lossTolerant) {
@@ -50,8 +50,8 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
         }
 
         counter = new AtomicInteger(0);
-        AtomicInteger existingCounter = claimantCounts.putIfAbsent(claim, 
counter);
-        return (existingCounter == null) ? counter : existingCounter;
+        final AtomicInteger existingCounter = 
claimantCounts.putIfAbsent(claim, counter);
+        return existingCounter == null ? counter : existingCounter;
     }
 
     @Override
@@ -60,7 +60,7 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
             return 0;
         }
         final AtomicInteger counter = claimantCounts.get(claim);
-        return (counter == null) ? 0 : counter.get();
+        return counter == null ? 0 : counter.get();
     }
 
     @Override
@@ -113,25 +113,41 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
 
         logger.debug("Marking claim {} as destructable", claim);
         try {
-            while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
+            final BlockingQueue<ContentClaim> destructableQueue = 
getDestructableClaimQueue(claim.getContainer());
+            while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) {
             }
         } catch (final InterruptedException ie) {
         }
     }
 
+    private BlockingQueue<ContentClaim> getDestructableClaimQueue(final String 
container) {
+        BlockingQueue<ContentClaim> claimQueue = 
destructableClaims.get(container);
+        if (claimQueue == null) {
+            claimQueue = new LinkedBlockingQueue<>(10000);
+            final BlockingQueue<ContentClaim> existing = 
destructableClaims.putIfAbsent(container, claimQueue);
+            if (existing != null) {
+                claimQueue = existing;
+            }
+        }
+
+        return claimQueue;
+    }
+
     @Override
-    public void drainDestructableClaims(final Collection<ContentClaim> 
destination, final int maxElements) {
-        final int drainedCount = destructableClaims.drainTo(destination, 
maxElements);
+    public void drainDestructableClaims(final String container, final 
Collection<ContentClaim> destination, final int maxElements) {
+        final BlockingQueue<ContentClaim> destructableQueue = 
getDestructableClaimQueue(container);
+        final int drainedCount = destructableQueue.drainTo(destination, 
maxElements);
         logger.debug("Drained {} destructable claims to {}", drainedCount, 
destination);
     }
 
     @Override
-    public void drainDestructableClaims(final Collection<ContentClaim> 
destination, final int maxElements, final long timeout, final TimeUnit unit) {
+    public void drainDestructableClaims(final String container, final 
Collection<ContentClaim> destination, final int maxElements, final long 
timeout, final TimeUnit unit) {
         try {
-            final ContentClaim firstClaim = destructableClaims.poll(timeout, 
unit);
+            final BlockingQueue<ContentClaim> destructableQueue = 
getDestructableClaimQueue(container);
+            final ContentClaim firstClaim = destructableQueue.poll(timeout, 
unit);
             if (firstClaim != null) {
                 destination.add(firstClaim);
-                destructableClaims.drainTo(destination, maxElements - 1);
+                destructableQueue.drainTo(destination, maxElements - 1);
             }
         } catch (final InterruptedException e) {
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fb0a1a76/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 364dcad..d5b1d52 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -90,11 +90,11 @@ public class TestFileSystemSwapManager {
         }
 
         @Override
-        public void drainDestructableClaims(Collection<ContentClaim> 
destination, int maxElements) {
+        public void drainDestructableClaims(String container, 
Collection<ContentClaim> destination, int maxElements) {
         }
 
         @Override
-        public void drainDestructableClaims(Collection<ContentClaim> 
destination, int maxElements, long timeout, TimeUnit unit) {
+        public void drainDestructableClaims(String container, 
Collection<ContentClaim> destination, int maxElements, long timeout, TimeUnit 
unit) {
         }
 
         @Override

Reply via email to