NIFI-731: Updated admin guide to explain the flowfile repo properties more; 
added new property for how often to 'sync' the flowfile repo; allowed number of 
partitions in flowfile repo to be changed after repo created. Minor tweaks to 
content repo and its communication with flowfile repo via ContentClaimManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bb5128be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bb5128be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bb5128be

Branch: refs/heads/NIFI-731
Commit: bb5128be2246dc48d5200e98572c47ff7cdfb905
Parents: fb0a1a7
Author: Mark Payne <marka...@hotmail.com>
Authored: Sun Jun 28 13:07:04 2015 -0400
Committer: Mark Payne <marka...@hotmail.com>
Committed: Sun Jun 28 13:36:43 2015 -0400

----------------------------------------------------------------------
 nifi/nifi-assembly/pom.xml                      |  3 +-
 .../org/apache/nifi/util/NiFiProperties.java    | 63 +++++++------
 .../org/wali/MinimalLockingWriteAheadLog.java   | 99 +++++++++++++++-----
 .../wali/TestMinimalLockingWriteAheadLog.java   | 97 ++++++++++++++++++-
 .../src/main/asciidoc/administration-guide.adoc | 25 ++++-
 .../apache/nifi/controller/FlowController.java  | 25 ++---
 .../repository/FileSystemRepository.java        |  2 +-
 .../WriteAheadFlowFileRepository.java           | 18 +++-
 .../claim/StandardContentClaimManager.java      | 21 ++++-
 .../repository/TestFileSystemRepository.java    | 12 ++-
 .../repository/TestStandardProcessSession.java  | 43 ++++-----
 .../TestVolatileContentRepository.java          |  5 +-
 .../TestWriteAheadFlowFileRepository.java       |  7 +-
 .../src/main/resources/conf/nifi.properties     |  1 +
 14 files changed, 311 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/nifi/nifi-assembly/pom.xml b/nifi/nifi-assembly/pom.xml
index 9b17617..ac39ca6 100644
--- a/nifi/nifi-assembly/pom.xml
+++ b/nifi/nifi-assembly/pom.xml
@@ -229,9 +229,10 @@ language governing permissions and limitations under the 
License. -->
 
         
<nifi.flowfile.repository.implementation>org.apache.nifi.controller.repository.WriteAheadFlowFileRepository</nifi.flowfile.repository.implementation>
         
<nifi.flowfile.repository.directory>./flowfile_repository</nifi.flowfile.repository.directory>
-        
<nifi.flowfile.repository.partitions>256</nifi.flowfile.repository.partitions>
+        
<nifi.flowfile.repository.partitions>16</nifi.flowfile.repository.partitions>
         <nifi.flowfile.repository.checkpoint.interval>2 
mins</nifi.flowfile.repository.checkpoint.interval>
         
<nifi.flowfile.repository.always.sync>false</nifi.flowfile.repository.always.sync>
+        
<nifi.flowfile.repository.updates.between.sync>128</nifi.flowfile.repository.updates.between.sync>
         
<nifi.swap.manager.implementation>org.apache.nifi.controller.FileSystemSwapManager</nifi.swap.manager.implementation>
         <nifi.queue.swap.threshold>20000</nifi.queue.swap.threshold>
         <nifi.swap.in.period>5 sec</nifi.swap.in.period>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
 
b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
index e25f5d6..6dadf12 100644
--- 
a/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
+++ 
b/nifi/nifi-commons/nifi-properties/src/main/java/org/apache/nifi/util/NiFiProperties.java
@@ -80,7 +80,8 @@ public class NiFiProperties extends Properties {
 
     // flowfile repository properties
     public static final String FLOWFILE_REPOSITORY_IMPLEMENTATION = 
"nifi.flowfile.repository.implementation";
-    public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = 
"nifi.flowfile.repository.always.sync";
+    public static final String FLOWFILE_REPOSITORY_ALWAYS_SYNC = 
"nifi.flowfile.repository.always.sync";  // deprecated
+    public static final String FLOWFILE_REPOSITORY_UPDATES_BETWEEN_SYNCS = 
"nifi.flowfile.repository.updates.between.sync";
     public static final String FLOWFILE_REPOSITORY_DIRECTORY = 
"nifi.flowfile.repository.directory";
     public static final String FLOWFILE_REPOSITORY_PARTITIONS = 
"nifi.flowfile.repository.partitions";
     public static final String FLOWFILE_REPOSITORY_CHECKPOINT_INTERVAL = 
"nifi.flowfile.repository.checkpoint.interval";
@@ -282,7 +283,7 @@ public class NiFiProperties extends Properties {
     public File getFlowConfigurationFile() {
         try {
             return new File(getProperty(FLOW_CONFIGURATION_FILE));
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             return null;
         }
     }
@@ -290,7 +291,7 @@ public class NiFiProperties extends Properties {
     public File getFlowConfigurationFileDir() {
         try {
             return getFlowConfigurationFile().getParentFile();
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             return null;
         }
     }
@@ -396,7 +397,7 @@ public class NiFiProperties extends Properties {
      */
     public Path getTemplateDirectory() {
         final String strVal = getProperty(TEMPLATE_DIRECTORY);
-        return (strVal == null) ? DEFAULT_TEMPLATE_DIRECTORY : 
Paths.get(strVal);
+        return strVal == null ? DEFAULT_TEMPLATE_DIRECTORY : Paths.get(strVal);
     }
 
     /**
@@ -474,7 +475,7 @@ public class NiFiProperties extends Properties {
      */
     public boolean getNeedClientAuth() {
         boolean needClientAuth = true;
-        String rawNeedClientAuth = getProperty(SECURITY_NEED_CLIENT_AUTH);
+        final String rawNeedClientAuth = 
getProperty(SECURITY_NEED_CLIENT_AUTH);
         if ("false".equalsIgnoreCase(rawNeedClientAuth)) {
             needClientAuth = false;
         }
@@ -488,7 +489,7 @@ public class NiFiProperties extends Properties {
 
     public boolean getSupportNewAccountRequests() {
         boolean shouldSupport = true;
-        String rawShouldSupport = 
getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS);
+        final String rawShouldSupport = 
getProperty(SECURITY_SUPPORT_NEW_ACCOUNT_REQUESTS);
         if ("false".equalsIgnoreCase(rawShouldSupport)) {
             shouldSupport = false;
         }
@@ -500,7 +501,7 @@ public class NiFiProperties extends Properties {
         Integer port = null;
         try {
             port = Integer.parseInt(getProperty(WEB_HTTP_PORT));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
         }
         return port;
     }
@@ -509,7 +510,7 @@ public class NiFiProperties extends Properties {
         Integer sslPort = null;
         try {
             sslPort = Integer.parseInt(getProperty(WEB_HTTPS_PORT));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
         }
         return sslPort;
     }
@@ -540,15 +541,15 @@ public class NiFiProperties extends Properties {
 
     public List<Path> getNarLibraryDirectories() {
 
-        List<Path> narLibraryPaths = new ArrayList<>();
+        final List<Path> narLibraryPaths = new ArrayList<>();
 
         // go through each property
-        for (String propertyName : stringPropertyNames()) {
+        for (final String propertyName : stringPropertyNames()) {
             // determine if the property is a nar library path
             if (StringUtils.startsWith(propertyName, 
NAR_LIBRARY_DIRECTORY_PREFIX)
                     || NAR_LIBRARY_DIRECTORY.equals(propertyName)) {
                 // attempt to resolve the path specified
-                String narLib = getProperty(propertyName);
+                final String narLib = getProperty(propertyName);
                 if (!StringUtils.isBlank(narLib)) {
                     narLibraryPaths.add(Paths.get(narLib));
                 }
@@ -615,10 +616,10 @@ public class NiFiProperties extends Properties {
 
     public InetSocketAddress getClusterProtocolMulticastAddress() {
         try {
-            String multicastAddress = 
getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS);
-            int multicastPort = 
Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT));
+            final String multicastAddress = 
getProperty(CLUSTER_PROTOCOL_MULTICAST_ADDRESS);
+            final int multicastPort = 
Integer.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_PORT));
             return new InetSocketAddress(multicastAddress, multicastPort);
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new RuntimeException("Invalid multicast address/port due to: 
" + ex, ex);
         }
     }
@@ -641,7 +642,7 @@ public class NiFiProperties extends Properties {
         try {
             return Integer
                     
.parseInt(getProperty(CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return DEFAULT_CLUSTER_PROTOCOL_MULTICAST_SERVICE_LOCATOR_ATTEMPTS;
         }
     }
@@ -662,9 +663,9 @@ public class NiFiProperties extends Properties {
             if (StringUtils.isBlank(socketAddress)) {
                 socketAddress = "localhost";
             }
-            int socketPort = getClusterNodeProtocolPort();
+            final int socketPort = getClusterNodeProtocolPort();
             return InetSocketAddress.createUnresolved(socketAddress, 
socketPort);
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new RuntimeException("Invalid node protocol address/port due 
to: " + ex, ex);
         }
     }
@@ -672,7 +673,7 @@ public class NiFiProperties extends Properties {
     public Integer getClusterNodeProtocolPort() {
         try {
             return Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_PORT));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return null;
         }
     }
@@ -680,7 +681,7 @@ public class NiFiProperties extends Properties {
     public int getClusterNodeProtocolThreads() {
         try {
             return 
Integer.parseInt(getProperty(CLUSTER_NODE_PROTOCOL_THREADS));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return DEFAULT_CLUSTER_NODE_PROTOCOL_THREADS;
         }
     }
@@ -691,10 +692,10 @@ public class NiFiProperties extends Properties {
             if (StringUtils.isBlank(socketAddress)) {
                 socketAddress = "localhost";
             }
-            int socketPort = Integer
+            final int socketPort = Integer
                     
.parseInt(getProperty(CLUSTER_NODE_UNICAST_MANAGER_PROTOCOL_PORT));
             return InetSocketAddress.createUnresolved(socketAddress, 
socketPort);
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new RuntimeException("Invalid unicast manager address/port 
due to: " + ex, ex);
         }
     }
@@ -710,9 +711,9 @@ public class NiFiProperties extends Properties {
             if (StringUtils.isBlank(socketAddress)) {
                 socketAddress = "localhost";
             }
-            int socketPort = getClusterManagerProtocolPort();
+            final int socketPort = getClusterManagerProtocolPort();
             return InetSocketAddress.createUnresolved(socketAddress, 
socketPort);
-        } catch (Exception ex) {
+        } catch (final Exception ex) {
             throw new RuntimeException("Invalid manager protocol address/port 
due to: " + ex, ex);
         }
     }
@@ -720,7 +721,7 @@ public class NiFiProperties extends Properties {
     public Integer getClusterManagerProtocolPort() {
         try {
             return 
Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_PORT));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return null;
         }
     }
@@ -737,7 +738,7 @@ public class NiFiProperties extends Properties {
     public int getClusterManagerNodeEventHistorySize() {
         try {
             return 
Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return DEFAULT_CLUSTER_MANAGER_NODE_EVENT_HISTORY_SIZE;
         }
     }
@@ -755,7 +756,7 @@ public class NiFiProperties extends Properties {
     public int getClusterManagerNodeApiRequestThreads() {
         try {
             return 
Integer.parseInt(getProperty(CLUSTER_MANAGER_NODE_API_REQUEST_THREADS));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return DEFAULT_CLUSTER_MANAGER_NODE_API_NUM_REQUEST_THREADS;
         }
     }
@@ -768,7 +769,7 @@ public class NiFiProperties extends Properties {
     public int getClusterManagerProtocolThreads() {
         try {
             return 
Integer.parseInt(getProperty(CLUSTER_MANAGER_PROTOCOL_THREADS));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return DEFAULT_CLUSTER_MANAGER_PROTOCOL_THREADS;
         }
     }
@@ -790,7 +791,7 @@ public class NiFiProperties extends Properties {
     public InetSocketAddress getNodeApiAddress() {
 
         final String rawScheme = getClusterProtocolManagerToNodeApiScheme();
-        final String scheme = (rawScheme == null) ? "http" : rawScheme;
+        final String scheme = rawScheme == null ? "http" : rawScheme;
 
         final String host;
         final Integer port;
@@ -861,7 +862,7 @@ public class NiFiProperties extends Properties {
         final Map<String, Path> contentRepositoryPaths = new HashMap<>();
 
         // go through each property
-        for (String propertyName : stringPropertyNames()) {
+        for (final String propertyName : stringPropertyNames()) {
             // determine if the property is a file repository path
             if (StringUtils.startsWith(propertyName, 
REPOSITORY_CONTENT_PREFIX)) {
                 // get the repository key
@@ -887,7 +888,7 @@ public class NiFiProperties extends Properties {
         final Map<String, Path> provenanceRepositoryPaths = new HashMap<>();
 
         // go through each property
-        for (String propertyName : stringPropertyNames()) {
+        for (final String propertyName : stringPropertyNames()) {
             // determine if the property is a file repository path
             if (StringUtils.startsWith(propertyName, 
PROVENANCE_REPO_DIRECTORY_PREFIX)) {
                 // get the repository key
@@ -904,7 +905,7 @@ public class NiFiProperties extends Properties {
     public int getMaxFlowFilesPerClaim() {
         try {
             return Integer.parseInt(getProperty(MAX_FLOWFILES_PER_CLAIM));
-        } catch (NumberFormatException nfe) {
+        } catch (final NumberFormatException nfe) {
             return DEFAULT_MAX_FLOWFILES_PER_CLAIM;
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
index 5c8b4c8..4331504 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/main/java/org/wali/MinimalLockingWriteAheadLog.java
@@ -60,7 +60,6 @@ import java.util.regex.Pattern;
 
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.stream.io.BufferedOutputStream;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,6 +79,7 @@ public final class MinimalLockingWriteAheadLog<T> implements 
WriteAheadRepositor
     private final Path partialPath;
     private final Path snapshotPath;
 
+    private final SortedSet<Path> basePaths;
     private final SerDe<T> serde;
     private final SyncListener syncListener;
     private final FileChannel lockChannel;
@@ -120,9 +120,9 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
      * @param syncListener the listener
      * @throws IOException if unable to initialize due to IO issue
      */
-    @SuppressWarnings("unchecked")
     public MinimalLockingWriteAheadLog(final SortedSet<Path> paths, final int 
partitionCount, final SerDe<T> serde, final SyncListener syncListener) throws 
IOException {
         this.syncListener = syncListener;
+        this.basePaths = paths;
 
         requireNonNull(paths);
         requireNonNull(serde);
@@ -131,7 +131,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             throw new IllegalArgumentException("Paths must be non-empty");
         }
 
-        int existingPartitions = 0;
         for (final Path path : paths) {
             if (!Files.exists(path)) {
                 Files.createDirectories(path);
@@ -150,21 +149,6 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             if (!file.canExecute()) {
                 throw new IOException("Path given [" + path + "] is not 
executable");
             }
-
-            final File[] children = file.listFiles();
-            if (children != null) {
-                for (final File child : children) {
-                    if (child.isDirectory() && 
child.getName().startsWith("partition-")) {
-                        existingPartitions++;
-                    }
-                }
-
-                if (existingPartitions != 0 && existingPartitions != 
partitionCount) {
-                    logger.warn("Constructing MinimalLockingWriteAheadLog with 
partitionCount={}, but the repository currently has "
-                            + "{} partitions; ignoring argument and proceeding 
with {} partitions",
-                            new Object[]{partitionCount, existingPartitions, 
existingPartitions});
-                }
-            }
         }
 
         this.basePath = paths.iterator().next();
@@ -176,19 +160,26 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         lockChannel = new FileOutputStream(lockPath.toFile()).getChannel();
         lockChannel.lock();
 
-        partitions = new Partition[partitionCount];
+        partitions = createPartitions(partitionCount);
+    }
 
-        Iterator<Path> pathIterator = paths.iterator();
-        for (int i = 0; i < partitionCount; i++) {
+    private Partition<T>[] createPartitions(final int count) throws 
IOException {
+        @SuppressWarnings("unchecked")
+        final Partition<T>[] partitions = new Partition[count];
+
+        Iterator<Path> pathIterator = basePaths.iterator();
+        for (int i = 0; i < count; i++) {
             // If we're out of paths, create a new iterator to start over.
             if (!pathIterator.hasNext()) {
-                pathIterator = paths.iterator();
+                pathIterator = basePaths.iterator();
             }
 
             final Path partitionBasePath = pathIterator.next();
 
             partitions[i] = new 
Partition<>(partitionBasePath.resolve("partition-" + i), serde, i, 
getVersion());
         }
+
+        return partitions;
     }
 
     @Override
@@ -284,11 +275,30 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
             throw new IllegalStateException("Cannot recover records after 
updating the repository; must call recoverRecords first");
         }
 
+        int existingPartitionCount = 0;
         final long recoverStart = System.nanoTime();
         writeLock.lock();
         try {
+
             Long maxTransactionId = recoverFromSnapshot(recordMap);
-            recoverFromEdits(recordMap, maxTransactionId);
+
+            for (final Path basePath : basePaths) {
+                final File file = basePath.toFile();
+                final File[] children = file.listFiles();
+                if (children != null) {
+                    for (final File child : children) {
+                        if (child.isDirectory() && 
child.getName().startsWith("partition-")) {
+                            existingPartitionCount++;
+                        }
+                    }
+                }
+            }
+
+            final Partition<T>[] recoverablePartitions = 
createPartitions(existingPartitionCount);
+            recoverFromEdits(recordMap, recoverablePartitions, 
maxTransactionId);
+            for (final Partition<T> partition : recoverablePartitions) {
+                partition.close();
+            }
 
             for (final Partition<T> partition : partitions) {
                 final long transId = partition.getMaxRecoveredTransactionId();
@@ -308,10 +318,48 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
         logger.info("Successfully recovered {} records in {} milliseconds", 
recordMap.size(), recoveryMillis);
         checkpoint();
 
+        // If there are any partitions that have an index higher than the 
current desired number
+        // of partitions, delete the partition.
+        for (final Path basePath : basePaths) {
+            final File file = basePath.toFile();
+            final File[] children = file.listFiles();
+            if (children != null) {
+                for (final File child : children) {
+                    if (child.isDirectory() && 
child.getName().startsWith("partition-")) {
+                        final String indexName = child.getName().substring(10);
+                        final int index;
+                        try {
+                            index = Integer.parseInt(indexName);
+                        } catch (final NumberFormatException nfe) {
+                            // not a valid partition; ignore it and more on
+                            continue;
+                        }
+
+                        if (index >= partitions.length) {
+                            deleteRecursively(child);
+                        }
+                    }
+                }
+            }
+        }
+
         recovered = true;
         return recordMap.values();
     }
 
+    private void deleteRecursively(final File file) {
+        if (file.isDirectory()) {
+            final File[] children = file.listFiles();
+            if (children != null) {
+                for (final File child : children) {
+                    deleteRecursively(child);
+                }
+            }
+        }
+
+        file.delete();
+    }
+
     @Override
     public Set<String> getRecoveredSwapLocations() throws IOException {
         return recoveredExternalLocations;
@@ -398,10 +446,11 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
      * before modification.
      *
      * @param modifiableRecordMap map
+     * @param partitions the partitions to use to recover records from
      * @param maxTransactionIdRestored index of max restored transaction
      * @throws IOException if unable to recover from edits
      */
-    private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, 
final Long maxTransactionIdRestored) throws IOException {
+    private void recoverFromEdits(final Map<Object, T> modifiableRecordMap, 
final Partition<T>[] partitions, final Long maxTransactionIdRestored) throws 
IOException {
         final Map<Object, T> updateMap = new HashMap<>();
         final Map<Object, T> unmodifiableRecordMap = 
Collections.unmodifiableMap(modifiableRecordMap);
         final Map<Object, T> ignorableMap = new HashMap<>();
@@ -918,7 +967,7 @@ public final class MinimalLockingWriteAheadLog<T> 
implements WriteAheadRepositor
 
         public Long getNextRecoverableTransactionId() throws IOException {
             while (true) {
-                DataInputStream recoveryStream = getRecoveryStream();
+                final DataInputStream recoveryStream = getRecoveryStream();
                 if (recoveryStream == null) {
                     return null;
                 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
index 57f3495..f5971c6 100644
--- 
a/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
+++ 
b/nifi/nifi-commons/nifi-write-ahead-log/src/test/java/org/wali/TestMinimalLockingWriteAheadLog.java
@@ -28,6 +28,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedSet;
@@ -149,6 +150,98 @@ public class TestMinimalLockingWriteAheadLog {
     }
 
     @Test
+    public void testDecreasingPartitionCount() throws IOException {
+        final Path path = Paths.get("target/test-decrease-partition-count");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, 64, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        for (int i = 0; i < 64; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            repo.update(Collections.singleton(record), false);
+        }
+
+        repo.shutdown();
+
+        for (int i = 0; i < 64; i++) {
+            final String partName = "partition-" + i;
+            final Path partitionPath = path.resolve(partName);
+            assertTrue(Files.exists(partitionPath));
+        }
+
+        // create a new repo with only 8 partitions, verify that the other 
partitions are deleted, but that all
+        // records still exist. We do this twice to ensure that after we 
checkpoint with only 8 partitions, we
+        // still recover all 64 records again the next time.
+        for (int recoverIndex = 0; recoverIndex < 2; recoverIndex++) {
+            final WriteAheadRepository<DummyRecord> recoverRepo = new 
MinimalLockingWriteAheadLog<>(path, 8, serde, null);
+            final Collection<DummyRecord> recovered = 
recoverRepo.recoverRecords();
+            assertEquals(64, recovered.size());
+
+            assertTrue(Files.exists(path));
+
+            for (int i = 0; i < 8; i++) {
+                final String partName = "partition-" + i;
+                final Path partitionPath = path.resolve(partName);
+                assertTrue(Files.exists(partitionPath));
+            }
+            for (int i = 8; i < 64; i++) {
+                final String partName = "partition-" + i;
+                final Path partitionPath = path.resolve(partName);
+                assertFalse(Files.exists(partitionPath));
+            }
+
+            recoverRepo.shutdown();
+        }
+    }
+
+    @Test
+    public void testIncreasingPartitionCount() throws IOException {
+        final Path path = Paths.get("target/test-decrease-partition-count");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final DummyRecordSerde serde = new DummyRecordSerde();
+        final WriteAheadRepository<DummyRecord> repo = new 
MinimalLockingWriteAheadLog<>(path, 8, serde, null);
+        final Collection<DummyRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        for (int i = 0; i < 64; i++) {
+            final DummyRecord record = new DummyRecord(String.valueOf(i), 
UpdateType.CREATE);
+            repo.update(Collections.singleton(record), false);
+        }
+
+        repo.shutdown();
+
+        for (int i = 0; i < 8; i++) {
+            final String partName = "partition-" + i;
+            final Path partitionPath = path.resolve(partName);
+            assertTrue(Files.exists(partitionPath));
+        }
+
+        // create a new repo with 64 partitions. We do this twice to ensure 
that after we checkpoint with 64 partitions, we
+        // still recover all 64 records again the next time.
+        for (int recoverIndex = 0; recoverIndex < 2; recoverIndex++) {
+            final WriteAheadRepository<DummyRecord> recoverRepo = new 
MinimalLockingWriteAheadLog<>(path, 64, serde, null);
+            final Collection<DummyRecord> recovered = 
recoverRepo.recoverRecords();
+            assertEquals(64, recovered.size());
+
+            assertTrue(Files.exists(path));
+
+            for (int i = 0; i < 64; i++) {
+                final String partName = "partition-" + i;
+                final Path partitionPath = path.resolve(partName);
+                assertTrue(Files.exists(partitionPath));
+            }
+
+            recoverRepo.shutdown();
+        }
+    }
+
+    @Test
     public void testCannotModifyLogAfterAllAreBlackListed() throws IOException 
{
         final int numPartitions = 5;
         final Path path = 
Paths.get("target/minimal-locking-repo-test-cannot-modify-after-all-blacklisted");
@@ -275,10 +368,10 @@ public class TestMinimalLockingWriteAheadLog {
             try {
                 int counter = 0;
                 for (final List<DummyRecord> list : records) {
-                    final boolean forceSync = (++counter == records.size());
+                    final boolean forceSync = ++counter == records.size();
                     repo.update(list, forceSync);
                 }
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 Assert.fail("Failed to update: " + e.toString());
                 e.printStackTrace();
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc 
b/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc
index 5535c35..8c14f20 100644
--- a/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -433,9 +433,30 @@ to configure it on a separate drive if available.
 |*Property*|*Description*
 |nifi.flowfile.repository.implementation|The FlowFile Repository 
implementation. The default value is 
org.apache.nifi.controller.repository.WriteAheadFlowFileRepository and should 
not be changed.
 |nifi.flowfile.repository.directory*|The location of the FlowFile Repository. 
The default value is ./flowfile_repository.
-|nifi.flowfile.repository.partitions|The number of partitions. The default 
value is 256.
+|nifi.flowfile.repository.partitions|The number of partitions. This number 
indicates the maximum number of threads that are allowed to
+       update the FlowFile Repository simultaneously. However, each of these 
partitions must periodically be synchronized to the
+       underlying disk, which is very expensive. As a result, when this 
happens, the more partitions that exist, the more expensive
+       this synchronization is. Until version 0.2.0-incubating, the default 
value was 256. Starting at version 0.2.0-incubating, the default
+       value was changed to 32.
 |nifi.flowfile.repository.checkpoint.interval| The FlowFile Repository 
checkpoint interval. The default value is 2 mins.
-|nifi.flowfile.repository.always.sync|If set to _true_, any change to the 
repository will be synchronized to the disk, meaning that NiFi will ask the 
operating system not to cache the information. This is very expensive and can 
significantly reduce NiFi performance. However, if it is _false_, there could 
be the potential for data loss if either there is a sudden power loss or the 
operating system crashes. The default value is _false_.
+|nifi.flowfile.repository.always.sync|_*Deprecated*: Use of this property is 
deprecated as of NiFi 0.2.0-incubating. This property will be
+       removed in a later version of NiFi. In order to gain the equivalent 
functionality, set the value of the 
`nifi.flowfile.repository.updates.between.sync`
+       property to `1`._
+       If set to _true_, any change to the repository will be synchronized to 
the disk, 
+       meaning that NiFi will ask the operating system not to cache the 
information. This is very expensive and can significantly 
+       reduce NiFi performance. However, if it is _false_, there could be the 
potential for data loss if either there is a sudden 
+       power loss or the operating system crashes. The default value is 
_false_.
+|nifi.flowfile.repository.updates.between.sync|Specifies the number of times 
that the repository can be updated before a partition is 
+       synchronized with disk. A value of `0` indicates that it should never 
be synchronized on update. A value of `1` indicates that it should
+       be synchronized with each update. Very small values for this property 
may result in reduced overall
+       performance due to excessive disk utilization, as it causes the system 
to bypass the Operating System's caching mechanisms. 
+       However, setting the value too large can result in sporadic 
performance. This is due to the fact that in order to prevent data loss,
+       the Content Repository cannot destroy content until the FlowFile 
Repository has performed a 'sync'. As a result, if this value is
+       very large (or set to 0), this will happen only when the repository is 
checkpointed. Consequently, each time that the repository is
+       checkpointed, the Content Repository is forced to destroy a potentially 
very large number of files, which drastically reduces overall
+       disk performance. Additionally, if the number of files is very large, 
the system will be forced to block while these deletions take
+       place in order to avoid depleting all of the system's resources. This 
property was introduced in version
+       0.2.0-incubating. The default value for this property is 128.
 |====
 
 *Swap Management* +

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ffdd4e..8144373 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -280,7 +280,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     private final NodeProtocolSender protocolSender;
 
     private final ScheduledExecutorService clusterTaskExecutor = new 
FlowEngine(3, "Clustering Tasks");
-    private final ContentClaimManager contentClaimManager = new 
StandardContentClaimManager();
+    private final ContentClaimManager contentClaimManager;
 
     // guarded by rwLock
     /**
@@ -389,14 +389,15 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         timerDrivenEngineRef = new AtomicReference<>(new 
FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
         eventDrivenEngineRef = new AtomicReference<>(new 
FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
 
+        bulletinRepository = new VolatileBulletinRepository();
+        nodeBulletinSubscriber = new AtomicReference<>();
+
+        contentClaimManager = new 
StandardContentClaimManager(createEventReporter(bulletinRepository));
         final FlowFileRepository flowFileRepo = 
createFlowFileRepository(properties, contentClaimManager);
         flowFileRepository = flowFileRepo;
         flowFileEventRepository = flowFileEventRepo;
         counterRepositoryRef = new AtomicReference<CounterRepository>(new 
StandardCounterRepository());
 
-        bulletinRepository = new VolatileBulletinRepository();
-        nodeBulletinSubscriber = new AtomicReference<>();
-
         try {
             this.provenanceEventRepository = 
createProvenanceRepository(properties);
             
this.provenanceEventRepository.initialize(createEventReporter(bulletinRepository));
@@ -593,7 +594,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         writeLock.lock();
         try {
             if (startDelayedComponents) {
-                LOG.info("Starting {} processors/ports/funnels", 
(startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size()));
+                LOG.info("Starting {} processors/ports/funnels", 
startConnectablesAfterInitialization.size() + 
startRemoteGroupPortsAfterInitialization.size());
                 for (final Connectable connectable : 
startConnectablesAfterInitialization) {
                     if (connectable.getScheduledState() == 
ScheduledState.DISABLED) {
                         continue;
@@ -1012,7 +1013,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
     public boolean isTerminated() {
         this.readLock.lock();
         try {
-            return (null == this.timerDrivenEngineRef.get() || 
this.timerDrivenEngineRef.get().isTerminated());
+            return null == this.timerDrivenEngineRef.get() || 
this.timerDrivenEngineRef.get().isTerminated();
         } finally {
             this.readLock.unlock();
         }
@@ -1828,9 +1829,9 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         } else if (id1.equals(id2)) {
             return true;
         } else {
-            final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id1);
-            final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id2);
-            return (comparable1.equals(comparable2));
+            final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id1;
+            final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id2;
+            return comparable1.equals(comparable2);
         }
     }
 
@@ -1964,7 +1965,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
         }
 
         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? 
getRootGroupId() : id;
-        return (root == null) ? null : root.findProcessGroup(searchId);
+        return root == null ? null : root.findProcessGroup(searchId);
     }
 
     @Override
@@ -2079,8 +2080,8 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                 
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
                 
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
 
-                flowFilesTransferred += 
(connectionStatusReport.getFlowFilesIn() + 
connectionStatusReport.getFlowFilesOut());
-                bytesTransferred += (connectionStatusReport.getContentSizeIn() 
+ connectionStatusReport.getContentSizeOut());
+                flowFilesTransferred += 
connectionStatusReport.getFlowFilesIn() + 
connectionStatusReport.getFlowFilesOut();
+                bytesTransferred += connectionStatusReport.getContentSizeIn() 
+ connectionStatusReport.getContentSizeOut();
             }
 
             if (StringUtils.isNotBlank(conn.getName())) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 4e4609a..864058c 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -539,7 +539,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
         Path path = null;
         try {
-            path = getPath(claim, false);
+            path = getPath(claim, false, checkArchive);
         } catch (final ContentNotFoundException cnfe) {
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index cb14fea..668adb8 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -82,6 +82,8 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private final Path flowFileRepositoryPath;
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
+    private final AtomicLong updateCount = new AtomicLong(0L);
+    private final int updatesBetweenSyncs;
 
     // effectively final
     private WriteAheadRepository<RepositoryRecord> wal;
@@ -120,7 +122,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         flowFileRepositoryPath = properties.getFlowFileRepositoryPath();
         numPartitions = properties.getFlowFileRepositoryPartitions();
         checkpointDelayMillis = 
FormatUtils.getTimeDuration(properties.getFlowFileRepositoryCheckpointInterval(),
 TimeUnit.MILLISECONDS);
-
+        updatesBetweenSyncs = 
properties.getIntegerProperty(NiFiProperties.FLOWFILE_REPOSITORY_UPDATES_BETWEEN_SYNCS,
 128);
         checkpointExecutor = Executors.newSingleThreadScheduledExecutor();
     }
 
@@ -163,11 +165,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         return Files.getFileStore(flowFileRepositoryPath).getUsableSpace();
     }
 
-    private final AtomicLong updateCount = new AtomicLong(0L);
     @Override
     public void updateRepository(final Collection<RepositoryRecord> records) 
throws IOException {
         final long updates = updateCount.incrementAndGet();
-        updateRepository(records, alwaysSync || updates % 100 == 0);
+        updateRepository(records, alwaysSync || updatesBetweenSyncs > 0 && 
updates % updatesBetweenSyncs == 0);
     }
 
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {
@@ -241,18 +242,27 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         for (final ContentClaim claim : claimsToDestroy) {
             markDestructable(claim);
         }
+
+        logger.info("Marked {} Content Claims as destructable for FlowFile 
Repository partition {}", claimsToDestroy.size(), partitionIndex);
     }
 
     @Override
     public void onGlobalSync() {
-        for (final BlockingQueue<ContentClaim> claimQueue : 
claimsAwaitingDestruction.values()) {
+        int destroyed = 0;
+        for (final Map.Entry<Integer, BlockingQueue<ContentClaim>> entry : 
claimsAwaitingDestruction.entrySet()) {
+            final Integer partitionIndex = entry.getKey();
+            final BlockingQueue<ContentClaim> claimQueue = entry.getValue();
             final Set<ContentClaim> claimsToDestroy = new HashSet<>();
             claimQueue.drainTo(claimsToDestroy);
+            destroyed += claimsToDestroy.size();
 
             for (final ContentClaim claim : claimsToDestroy) {
                 markDestructable(claim);
             }
+            logger.debug("As part of checkpointing FlowFile Repository, marked 
{} claims as destructable for partition {}", claimsToDestroy.size(), 
partitionIndex);
         }
+
+        logger.info("Marked {} claims as destructable as a result of 
checkpointing FlowFile repository", destroyed);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
index 95f4c7f..f5494f9 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimManager.java
@@ -24,6 +24,8 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.nifi.events.EventReporter;
+import org.apache.nifi.reporting.Severity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,6 +35,11 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardContentClaimManager.class);
 
     private static final ConcurrentMap<String, BlockingQueue<ContentClaim>> 
destructableClaims = new ConcurrentHashMap<>();
+    private final EventReporter eventReporter;
+
+    public StandardContentClaimManager(final EventReporter eventReporter) {
+        this.eventReporter = eventReporter;
+    }
 
     @Override
     public ContentClaim newContentClaim(final String container, final String 
section, final String id, final boolean lossTolerant) {
@@ -114,7 +121,19 @@ public class StandardContentClaimManager implements 
ContentClaimManager {
         logger.debug("Marking claim {} as destructable", claim);
         try {
             final BlockingQueue<ContentClaim> destructableQueue = 
getDestructableClaimQueue(claim.getContainer());
-            while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) {
+            final boolean accepted = destructableQueue.offer(claim);
+            if (!accepted) {
+                final long start = System.nanoTime();
+
+                while (!destructableQueue.offer(claim, 30, TimeUnit.MINUTES)) {
+                }
+
+                final long millis = 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                if (millis > 10L) {
+                    logger.warn("Total wait duration to add claim to 
Destructable Claim Queue was {} millis", millis);
+                    eventReporter.reportEvent(Severity.WARNING, "Content 
Repository", "The Content Repository is unable to destroy content as fast "
+                        + "as it is being created. The flow will be slowed in 
order to adjust for this.");
+                }
             }
         } catch (final InterruptedException ie) {
         }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index ada0775..b5cfcb6 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.FileSystemRepository;
 import org.apache.nifi.controller.repository.ContentNotFoundException;
+
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -41,10 +42,11 @@ import java.util.List;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
 import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestFileSystemRepository {
 
@@ -63,7 +65,7 @@ public class TestFileSystemRepository {
         }
 
         repository = new FileSystemRepository();
-        repository.initialize(new StandardContentClaimManager());
+        repository.initialize(new 
StandardContentClaimManager(Mockito.mock(EventReporter.class)));
         repository.purge();
     }
 
@@ -314,9 +316,9 @@ public class TestFileSystemRepository {
         }
 
         final ContentClaim destination = repository.create(true);
-        final byte[] headerBytes = (header == null) ? null : header.getBytes();
-        final byte[] footerBytes = (footer == null) ? null : footer.getBytes();
-        final byte[] demarcatorBytes = (demarcator == null) ? null : 
demarcator.getBytes();
+        final byte[] headerBytes = header == null ? null : header.getBytes();
+        final byte[] footerBytes = footer == null ? null : footer.getBytes();
+        final byte[] demarcatorBytes = demarcator == null ? null : 
demarcator.getBytes();
         repository.merge(claims, destination, headerBytes, footerBytes, 
demarcatorBytes);
 
         final StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 3486875..9441237 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -53,6 +53,7 @@ import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.groups.ProcessGroup;
 import org.apache.nifi.processor.Relationship;
@@ -179,7 +180,7 @@ public class TestStandardProcessSession {
         when(connectable.getConnections()).thenReturn(new HashSet<>(connList));
 
         contentRepo = new MockContentRepository();
-        contentRepo.initialize(new StandardContentClaimManager());
+        contentRepo.initialize(new 
StandardContentClaimManager(Mockito.mock(EventReporter.class)));
         flowFileRepo = new MockFlowFileRepository();
 
         final ProcessContext context = new ProcessContext(connectable, new 
AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, 
provenanceRepo);
@@ -321,7 +322,7 @@ public class TestStandardProcessSession {
                 .entryDate(System.currentTimeMillis())
                 .build();
         flowFileQueue.put(flowFileRecord);
-        FlowFile flowFile = session.get();
+        final FlowFile flowFile = session.get();
         assertNotNull(flowFile);
         final ObjectHolder<InputStream> inputStreamHolder = new 
ObjectHolder<>(null);
         session.read(flowFile, new InputStreamCallback() {
@@ -402,7 +403,7 @@ public class TestStandardProcessSession {
 
         session.write(flowFile2, nop);
 
-        FlowFile flowFile3 = session.create();
+        final FlowFile flowFile3 = session.create();
         session.write(flowFile3, nop);
 
         session.rollback();
@@ -418,8 +419,8 @@ public class TestStandardProcessSession {
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.remove(newFlowFile);
         session.commit();
 
@@ -435,8 +436,8 @@ public class TestStandardProcessSession {
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
 
@@ -452,9 +453,9 @@ public class TestStandardProcessSession {
 
         flowFileQueue.put(flowFileRecord);
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
-        FlowFile secondNewFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
+        final FlowFile secondNewFlowFile = session.create(orig);
         session.remove(newFlowFile);
         session.transfer(secondNewFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
@@ -547,8 +548,8 @@ public class TestStandardProcessSession {
         // we have to increment the ID generator because we are creating a 
FlowFile without the FlowFile Repository's knowledge
         flowFileRepo.idGenerator.getAndIncrement();
 
-        FlowFile orig = session.get();
-        FlowFile newFlowFile = session.create(orig);
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.getProvenanceReporter().fork(newFlowFile, 
Collections.singleton(orig));
         session.remove(orig);
@@ -566,7 +567,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void 
testProcessExceptionThrownIfCallbackThrowsInOutputStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -610,7 +611,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testProcessExceptionThrownIfCallbackThrowsInStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -689,7 +690,7 @@ public class TestStandardProcessSession {
 
         // attempt to read the data.
         try {
-            FlowFile ff1 = session.get();
+            final FlowFile ff1 = session.get();
 
             session.read(ff1, new InputStreamCallback() {
                 @Override
@@ -738,7 +739,7 @@ public class TestStandardProcessSession {
 
         // attempt to read the data.
         try {
-            FlowFile ff1 = session.get();
+            final FlowFile ff1 = session.get();
 
             session.write(ff1, new StreamCallback() {
                 @Override
@@ -830,7 +831,7 @@ public class TestStandardProcessSession {
         // attempt to read the data.
         try {
             session.get();
-            FlowFile ff2 = session.get();
+            final FlowFile ff2 = session.get();
             session.write(ff2, new StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws 
IOException {
@@ -918,7 +919,7 @@ public class TestStandardProcessSession {
         // attempt to read the data.
         try {
             session.get();
-            FlowFile ff2 = session.get();
+            final FlowFile ff2 = session.get();
             session.read(ff2, new InputStreamCallback() {
                 @Override
                 public void process(InputStream in) throws IOException {
@@ -931,7 +932,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void 
testProcessExceptionThrownIfCallbackThrowsInInputStreamCallback() {
-        FlowFile ff1 = session.create();
+        final FlowFile ff1 = session.create();
 
         final RuntimeException runtime = new RuntimeException();
         try {
@@ -975,7 +976,7 @@ public class TestStandardProcessSession {
 
     @Test
     public void testCreateEmitted() throws IOException {
-        FlowFile newFlowFile = session.create();
+        final FlowFile newFlowFile = session.create();
         session.transfer(newFlowFile, new 
Relationship.Builder().name("A").build());
         session.commit();
 
@@ -1114,7 +1115,7 @@ public class TestStandardProcessSession {
         private final AtomicLong claimsRemoved = new AtomicLong(0L);
         private ContentClaimManager claimManager;
 
-        private ConcurrentMap<ContentClaim, AtomicInteger> claimantCounts = 
new ConcurrentHashMap<>();
+        private final ConcurrentMap<ContentClaim, AtomicInteger> 
claimantCounts = new ConcurrentHashMap<>();
 
         @Override
         public void shutdown() {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
index a32f321..513ec10 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestVolatileContentRepository.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.controller.repository;
 
 import org.apache.nifi.controller.repository.VolatileContentRepository;
+
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayOutputStream;
@@ -31,8 +32,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ContentClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.util.NiFiProperties;
-
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -45,7 +46,7 @@ public class TestVolatileContentRepository {
 
     @Before
     public void setup() {
-        claimManager = new StandardContentClaimManager();
+        claimManager = new 
StandardContentClaimManager(Mockito.mock(EventReporter.class));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 054ef5e..c969bd4 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
 import org.apache.nifi.controller.repository.WriteAheadFlowFileRepository;
 import org.apache.nifi.controller.repository.StandardRepositoryRecord;
 import org.apache.nifi.controller.repository.StandardFlowFileRecord;
+
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -35,8 +36,8 @@ import java.util.List;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.StandardContentClaimManager;
+import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.util.file.FileUtils;
-
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -53,7 +54,7 @@ public class TestWriteAheadFlowFileRepository {
         }
 
         final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository();
-        repo.initialize(new StandardContentClaimManager());
+        repo.initialize(new StandardContentClaimManager(Mockito.mock(EventReporter.class)));
 
         final List<Connection> connectionList = new ArrayList<>();
         final QueueProvider queueProvider = new QueueProvider() {
@@ -119,7 +120,7 @@ public class TestWriteAheadFlowFileRepository {
 
         // restore
         final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository();
-        repo2.initialize(new StandardContentClaimManager());
+        repo2.initialize(new StandardContentClaimManager(Mockito.mock(EventReporter.class)));
         repo2.loadFlowFiles(queueProvider, 0L);
 
         assertEquals(1, flowFileCollection.size());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb5128be/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
index 4043076..f560c31 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/nifi.properties
@@ -42,6 +42,7 @@ nifi.flowfile.repository.directory=${nifi.flowfile.repository.directory}
 nifi.flowfile.repository.partitions=${nifi.flowfile.repository.partitions}
 nifi.flowfile.repository.checkpoint.interval=${nifi.flowfile.repository.checkpoint.interval}
 nifi.flowfile.repository.always.sync=${nifi.flowfile.repository.always.sync}
+nifi.flowfile.repository.updates.between.sync=${nifi.flowfile.repository.updates.between.sync}
 
 nifi.swap.manager.implementation=${nifi.swap.manager.implementation}
 nifi.queue.swap.threshold=${nifi.queue.swap.threshold}

Reply via email to