Repository: nifi
Updated Branches:
  refs/heads/master c68da68dc -> 2bb785300


NIFI-905: Ensure that when the archive threshold is hit, archived data is
destroyed, and that if no archived data exists, Processors aren't blocked from
updating the content repo


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3159cec7
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3159cec7
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3159cec7

Branch: refs/heads/master
Commit: 3159cec7825a4085898b64f29a0dc4cc7d803f17
Parents: 85cb5dd
Author: Mark Payne <[email protected]>
Authored: Fri Aug 28 14:15:35 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Fri Aug 28 14:15:35 2015 -0400

----------------------------------------------------------------------
 .../repository/FileSystemRepository.java        | 86 +++++++++++++-------
 1 file changed, 57 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3159cec7/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 18a3de1..d06b462 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -66,8 +66,8 @@ import 
org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
-import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.LongHolder;
 import org.apache.nifi.util.NiFiProperties;
@@ -1086,42 +1086,44 @@ public class FileSystemRepository implements 
ContentRepository {
         return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
     }
 
-    private void archive(final ResourceClaim claim) throws IOException {
+    private boolean archive(final ResourceClaim claim) throws IOException {
         if (!archiveData) {
-            return;
+            return false;
         }
 
         synchronized (writableClaimQueue) {
             final int claimantCount = claim == null ? 0 : 
resourceClaimManager.getClaimantCount(claim);
             if (claimantCount > 0 || writableClaimQueue.contains(new 
ClaimLengthPair(claim, null))) {
-                return;
+                return false;
             }
         }
 
         final Path curPath = getPath(claim);
         if (curPath == null) {
-            return;
+            return false;
         }
 
-        archive(curPath);
+        final boolean archived = archive(curPath);
         LOG.debug("Successfully moved {} to archive", claim);
+        return archived;
     }
 
-    private void archive(final Path curPath) throws IOException {
+    private boolean archive(final Path curPath) throws IOException {
         // check if already archived
         final boolean alreadyArchived = 
ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName());
         if (alreadyArchived) {
-            return;
+            return false;
         }
 
         final Path archivePath = getArchivePath(curPath);
         if (curPath.equals(archivePath)) {
             LOG.warn("Cannot archive {} because it is already archived", 
curPath);
-            return;
+            return false;
         }
 
         try {
             Files.move(curPath, archivePath);
+            return true;
         } catch (final NoSuchFileException nsfee) {
             // If the current path exists, try to create archive path and do 
the move again.
             // Otherwise, either the content was removed or has already been 
archived. Either way,
@@ -1134,7 +1136,10 @@ public class FileSystemRepository implements 
ContentRepository {
                 // for the existence of the directory continually.
                 Files.createDirectories(archivePath.getParent());
                 Files.move(curPath, archivePath);
+                return true;
             }
+
+            return false;
         }
     }
 
@@ -1159,6 +1164,19 @@ public class FileSystemRepository implements 
ContentRepository {
         return getLastModTime(file.toFile());
     }
 
+    private boolean deleteBasedOnTimestamp(final BlockingQueue<ArchiveInfo> 
fileQueue, final long removalTimeThreshold) throws IOException {
+        // check next file's last mod time.
+        final ArchiveInfo nextFile = fileQueue.peek();
+        if (nextFile == null) {
+            // Continue on to queue up the files, in case the next file must 
be destroyed based on time.
+            return false;
+        }
+
+        // If the last mod time indicates that it should be removed, just 
continue loop.
+        final long oldestArchiveDate = getLastModTime(nextFile.toPath());
+        return (oldestArchiveDate <= removalTimeThreshold);
+    }
+
     private long destroyExpiredArchives(final String containerName, final Path 
container) throws IOException {
         final List<ArchiveInfo> notYetExceedingThreshold = new ArrayList<>();
         final long removalTimeThreshold = System.currentTimeMillis() - 
maxArchiveMillis;
@@ -1177,37 +1195,46 @@ public class FileSystemRepository implements 
ContentRepository {
         final long startNanos = System.nanoTime();
         final long toFree = minRequiredSpace - usableSpace;
         final BlockingQueue<ArchiveInfo> fileQueue = 
archivedFiles.get(containerName);
+
         ArchiveInfo toDelete;
         int deleteCount = 0;
         long freed = 0L;
-        while ((toDelete = fileQueue.poll()) != null) {
+        while ((toDelete = fileQueue.peek()) != null) {
             try {
                 final long fileSize = toDelete.getSize();
-                Files.deleteIfExists(toDelete.toPath());
-                containerState.decrementArchiveCount();
-                LOG.debug("Deleted archived ContentClaim with ID {} from 
Container {} because the archival size was exceeding the max configured size", 
toDelete.getName(), containerName);
-                freed += fileSize;
-                deleteCount++;
+
+                // we use fileQueue.peek above instead of fileQueue.poll() 
because we don't always want to
+                // remove the head of the queue. Instead, we want to remove it 
only if we plan to delete it.
+                // In order to accomplish this, we just peek at the head and 
check if it should be deleted.
+                // If so, then we call poll() to remove it
+                if (freed < toFree || getLastModTime(toDelete.toPath()) < 
removalTimeThreshold) {
+                    toDelete = fileQueue.poll();   // remove the head of the 
queue, which is already stored in 'toDelete'
+                    Files.deleteIfExists(toDelete.toPath());
+                    containerState.decrementArchiveCount();
+                    LOG.debug("Deleted archived ContentClaim with ID {} from 
Container {} because the archival size was exceeding the max configured size", 
toDelete.getName(), containerName);
+                    freed += fileSize;
+                    deleteCount++;
+                }
 
                 // If we'd freed up enough space, we're done... unless the 
next file needs to be destroyed based on time.
                 if (freed >= toFree) {
-                    // check next file's last mod time.
-                    final ArchiveInfo nextFile = fileQueue.peek();
-                    if (nextFile == null) {
-                        // Continue on to queue up the files, in case the next 
file must be destroyed based on time.
-                        break;
-                    }
-
                     // If the last mod time indicates that it should be 
removed, just continue loop.
-                    final long oldestArchiveDate = 
getLastModTime(nextFile.toPath());
-                    if (oldestArchiveDate <= removalTimeThreshold) {
+                    if (deleteBasedOnTimestamp(fileQueue, 
removalTimeThreshold)) {
                         continue;
                     }
 
+                    final ArchiveInfo archiveInfo = fileQueue.peek();
+                    final long oldestArchiveDate = archiveInfo == null ? 
System.currentTimeMillis() : getLastModTime(archiveInfo.toPath());
+
                     // Otherwise, we're done. Return the last mod time of the 
oldest file in the container's archive.
                     final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
-                    LOG.info("Deleted {} files from archive for Container {}; 
oldest Archive Date is now {}; container cleanup took {} millis",
+                    if (deleteCount > 0) {
+                        LOG.info("Deleted {} files from archive for Container 
{}; oldest Archive Date is now {}; container cleanup took {} millis",
+                            deleteCount, containerName, new 
Date(oldestArchiveDate), millis);
+                    } else {
+                        LOG.debug("Deleted {} files from archive for Container 
{}; oldest Archive Date is now {}; container cleanup took {} millis",
                             deleteCount, containerName, new 
Date(oldestArchiveDate), millis);
+                    }
 
                     return oldestArchiveDate;
                 }
@@ -1354,9 +1381,10 @@ public class FileSystemRepository implements 
ContentRepository {
                         for (final ResourceClaim claim : toRemove) {
                             if (archiveData) {
                                 try {
-                                    archive(claim);
-                                    containerState.incrementArchiveCount();
-                                    successCount++;
+                                    if (archive(claim)) {
+                                        containerState.incrementArchiveCount();
+                                        successCount++;
+                                    }
                                 } catch (final Exception e) {
                                     LOG.warn("Failed to archive {} due to {}", 
claim, e.toString());
                                     if (LOG.isDebugEnabled()) {
@@ -1376,7 +1404,7 @@ public class FileSystemRepository implements 
ContentRepository {
                         if (successCount == 0) {
                             LOG.debug("No ContentClaims archived/removed for 
Container {}", container);
                         } else {
-                            LOG.info("Successfully {} {} Content Claims for 
Container {} in {} millis", archiveData ? "archived" : "destroyed", 
successCount, container, millis);
+                            LOG.info("Successfully {} {} Resource Claims for 
Container {} in {} millis", archiveData ? "archived" : "destroyed", 
successCount, container, millis);
                         }
                     }
 

Reply via email to