http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 54987b9..16a6534 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -16,6 +16,32 @@
  */
 package org.apache.nifi.controller.repository;
 
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
@@ -23,6 +49,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
+import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.io.DisableOnCloseInputStream;
 import org.apache.nifi.controller.repository.io.DisableOnCloseOutputStream;
@@ -53,32 +80,6 @@ import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
 /**
  * <p>
  * Provides a ProcessSession that ensures all accesses, changes and transfers
@@ -143,6 +144,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     private final Map<FlowFile, ProvenanceEventBuilder> forkEventBuilders = new HashMap<>();
 
     private Checkpoint checkpoint = new Checkpoint();
+    private final ContentClaimWriteCache claimCache;
 
     public StandardProcessSession(final ProcessContext context) {
         this.context = context;
@@ -180,7 +182,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             context.getProvenanceRepository(), this);
         this.sessionId = idGenerator.getAndIncrement();
         this.connectableDescription = description;
-
+        this.claimCache = new ContentClaimWriteCache(context.getContentRepository());
         LOG.trace("Session {} created for {}", this, connectableDescription);
         processingStartTime = System.nanoTime();
     }
@@ -312,6 +314,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final long commitStartNanos = System.nanoTime();
 
             resetReadClaim();
+            try {
+                claimCache.flush();
+            } finally {
+                claimCache.reset();
+            }
 
             final long updateProvenanceStart = System.nanoTime();
             updateProvenanceRepo(checkpoint);
@@ -375,7 +382,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             updateEventRepository(checkpoint);
 
             final long updateEventRepositoryFinishNanos = System.nanoTime();
-            final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
+            final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - flowFileRepoUpdateFinishNanos;
 
             // transfer the flowfiles to the connections' queues.
             final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
@@ -454,7 +461,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             } catch (final Exception e1) {
                 e.addSuppressed(e1);
             }
-            throw e;
+
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            } else {
+                throw new ProcessException(e);
+            }
         }
     }
 
@@ -904,6 +916,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
         }
 
+        try {
+            claimCache.reset();
+        } catch (IOException e1) {
+            LOG.warn("{} Attempted to close Output Stream for {} due to session rollback but close failed", this, this.connectableDescription, e1);
+        }
+
         final Set<StandardRepositoryRecord> recordsToHandle = new HashSet<>();
         recordsToHandle.addAll(records.values());
         if (rollbackCheckpoint) {
@@ -2033,6 +2051,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     }
                 }
 
+                claimCache.flush(claim);
                 final InputStream rawInStream = context.getContentRepository().read(claim);
 
                 if (currentReadClaimStream != null) {
@@ -2047,6 +2066,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 // reuse the same InputStream for the next FlowFile
                 return new DisableOnCloseInputStream(currentReadClaimStream);
             } else {
+                claimCache.flush(claim);
                 final InputStream rawInStream = context.getContentRepository().read(claim);
                 try {
                     StreamUtils.skip(rawInStream, offset);
@@ -2077,6 +2097,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         try {
             ensureNotAppending(record.getCurrentClaim());
+            claimCache.flush(record.getCurrentClaim());
         } catch (final IOException e) {
             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
         }
@@ -2241,6 +2262,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             try {
                 ensureNotAppending(record.getCurrentClaim());
+                claimCache.flush(record.getCurrentClaim());
             } catch (final IOException e) {
                 throw new FlowFileAccessException("Unable to read from source " + source + " due to " + e.toString(), e);
             }
@@ -2334,11 +2356,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         long writtenToFlowFile = 0L;
         ContentClaim newClaim = null;
         try {
-            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            newClaim = claimCache.getContentClaim();
             claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
 
             ensureNotAppending(newClaim);
-            try (final OutputStream stream = context.getContentRepository().write(newClaim);
+            try (final OutputStream stream = claimCache.write(newClaim);
                 final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
                 final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
                 try {
@@ -2373,7 +2395,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
             .fromFlowFile(record.getCurrent())
             .contentClaim(newClaim)
-            .contentClaimOffset(0)
+            .contentClaimOffset(Math.max(0, newClaim.getLength() - writtenToFlowFile))
             .size(writtenToFlowFile)
             .build();
 
@@ -2396,6 +2418,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         ContentClaim newClaim = null;
         try {
             if (outStream == null) {
+                claimCache.flush(oldClaim);
+
                 try (final InputStream oldClaimIn = context.getContentRepository().read(oldClaim)) {
                     newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
                     claimLog.debug("Creating ContentClaim {} for 'append' for {}", newClaim, source);
@@ -2568,16 +2592,20 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         long writtenToFlowFile = 0L;
         ContentClaim newClaim = null;
         try {
-            newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant());
+            newClaim = claimCache.getContentClaim();
             claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
 
             ensureNotAppending(newClaim);
 
+            if (currClaim != null) {
+                claimCache.flush(currClaim.getResourceClaim());
+            }
+
             try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true);
                 final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
                 final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
                 final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
-                final OutputStream os = context.getContentRepository().write(newClaim);
+                final OutputStream os = claimCache.write(newClaim);
                 final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
                 final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {
 
@@ -2626,7 +2654,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
             .fromFlowFile(record.getCurrent())
             .contentClaim(newClaim)
-            .contentClaimOffset(0L)
+            .contentClaimOffset(Math.max(0L, newClaim.getLength() - writtenToFlowFile))
             .size(writtenToFlowFile)
             .build();
 
@@ -2668,8 +2696,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         removeTemporaryClaim(record);
 
-        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
-            .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(claimOffset)
+            .size(newSize)
             .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
             .build();
         record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
@@ -2708,7 +2739,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         removeTemporaryClaim(record);
-        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent()).contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize).build();
+        final FlowFileRecord newFile = new StandardFlowFileRecord.Builder()
+            .fromFlowFile(record.getCurrent())
+            .contentClaim(newClaim)
+            .contentClaimOffset(claimOffset)
+            .size(newSize)
+            .build();
         record.setWorking(newFile);
         return newFile;
     }
@@ -2720,6 +2756,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         try {
             ensureNotAppending(record.getCurrentClaim());
 
+            claimCache.flush(record.getCurrentClaim());
             final long copyCount = context.getContentRepository().exportTo(record.getCurrentClaim(), destination, append, record.getCurrentClaimOffset(), source.getSize());
             bytesRead += copyCount;
             bytesWritten += copyCount;
@@ -2741,6 +2778,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         try {
             ensureNotAppending(record.getCurrentClaim());
+            claimCache.flush(record.getCurrentClaim());
         } catch (final IOException e) {
             throw new FlowFileAccessException("Failed to access ContentClaim for " + source.toString(), e);
         }
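
Note on the two .contentClaimOffset(Math.max(...)) changes above: with the new
write cache, several FlowFiles may share one reused content claim, so a
FlowFile's content no longer necessarily starts at offset 0. A minimal sketch
of the arithmetic, using invented numbers (not part of the commit):

    // Hypothetical illustration of the offset computation.
    public class OffsetSketch {
        public static void main(String[] args) {
            long priorClaimLength = 100;   // bytes already in the reused claim
            long writtenToFlowFile = 250;  // bytes this session just wrote
            long claimLength = priorClaimLength + writtenToFlowFile; // 350

            // claim.getLength() already includes the new bytes, so this
            // FlowFile's content starts where the earlier content ended:
            long offset = Math.max(0, claimLength - writtenToFlowFile);
            System.out.println(offset);    // 100
        }
    }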

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index b5807ca..9f8ff98 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -27,6 +28,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -74,6 +77,7 @@ import org.wali.WriteAheadRepository;
  * </p>
  */
 public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncListener {
+    private static final String FLOWFILE_REPOSITORY_DIRECTORY_PREFIX = "nifi.flowfile.repository.directory";
 
     private final AtomicLong flowFileSequenceGenerator = new AtomicLong(0L);
     private final boolean alwaysSync;
@@ -82,7 +86,7 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     private volatile ScheduledFuture<?> checkpointFuture;
 
     private final long checkpointDelayMillis;
-    private final Path flowFileRepositoryPath;
+    private final SortedSet<Path> flowFileRepositoryPaths = new TreeSet<>();
     private final int numPartitions;
     private final ScheduledExecutorService checkpointExecutor;
 
@@ -120,7 +124,6 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     public WriteAheadFlowFileRepository() {
         alwaysSync = false;
         checkpointDelayMillis = 0l;
-        flowFileRepositoryPath = null;
         numPartitions = 0;
         checkpointExecutor = null;
     }
@@ -129,7 +132,13 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
         alwaysSync = Boolean.parseBoolean(nifiProperties.getProperty(NiFiProperties.FLOWFILE_REPOSITORY_ALWAYS_SYNC, "false"));
 
         // determine the database file path and ensure it exists
-        flowFileRepositoryPath = nifiProperties.getFlowFileRepositoryPath();
+        for (final String propertyName : nifiProperties.getPropertyKeys()) {
+            if (propertyName.startsWith(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX)) {
+                final String directoryName = nifiProperties.getProperty(propertyName);
+                flowFileRepositoryPaths.add(Paths.get(directoryName));
+            }
+        }
+
         numPartitions = nifiProperties.getFlowFileRepositoryPartitions();
         checkpointDelayMillis = FormatUtils.getTimeDuration(nifiProperties.getFlowFileRepositoryCheckpointInterval(), TimeUnit.MILLISECONDS);
 
@@ -140,14 +149,17 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
     public void initialize(final ResourceClaimManager claimManager) throws IOException {
         this.claimManager = claimManager;
 
-        Files.createDirectories(flowFileRepositoryPath);
+        for (final Path path : flowFileRepositoryPaths) {
+            Files.createDirectories(path);
+        }
 
         // TODO: Should ensure that only 1 instance running and pointing at a particular path
         // TODO: Allow for backup path that can be used if disk out of space?? Would allow a snapshot to be stored on
         //       backup and then the data deleted from the normal location; then can move backup to normal location and
         //       delete backup. On restore, if no files exist in partition's directory, would have to check backup directory
         serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
-        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPath, numPartitions, serdeFactory, this);
+        wal = new MinimalLockingWriteAheadLog<>(flowFileRepositoryPaths, numPartitions, serdeFactory, this);
+        logger.info("Initialized FlowFile Repository using {} partitions", numPartitions);
     }
 
     @Override
@@ -167,12 +179,22 @@ public class WriteAheadFlowFileRepository implements FlowFileRepository, SyncLis
 
     @Override
     public long getStorageCapacity() throws IOException {
-        return Files.getFileStore(flowFileRepositoryPath).getTotalSpace();
+        long capacity = 0L;
+        for (final Path path : flowFileRepositoryPaths) {
+            capacity += Files.getFileStore(path).getTotalSpace();
+        }
+
+        return capacity;
     }
 
     @Override
     public long getUsableStorageSpace() throws IOException {
-        return Files.getFileStore(flowFileRepositoryPath).getUsableSpace();
+        long usableSpace = 0L;
+        for (final Path path : flowFileRepositoryPaths) {
+            usableSpace += Files.getFileStore(path).getUsableSpace();
+        }
+
+        return usableSpace;
     }
 
     @Override
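
The two storage methods above simply sum FileStore sizes over every configured
directory. A standalone sketch of the same pattern (directory names here are
hypothetical):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Arrays;
    import java.util.List;

    public class CapacitySketch {
        public static void main(String[] args) throws IOException {
            // In NiFi these paths come from every property whose name starts
            // with "nifi.flowfile.repository.directory".
            List<Path> repoPaths = Arrays.asList(Paths.get("/repo1"), Paths.get("/repo2"));

            long capacity = 0L;
            for (Path path : repoPaths) {
                // Note: directories sharing one file store are counted twice.
                capacity += Files.getFileStore(path).getTotalSpace();
            }
            System.out.println("total capacity: " + capacity);
        }
    }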

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
new file mode 100644
index 0000000..6b608ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.claim;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+
+public class ContentClaimWriteCache {
+    private final ContentRepository contentRepo;
+    private final Map<ResourceClaim, OutputStream> streamMap = new HashMap<>();
+    private final Queue<ContentClaim> queue = new LinkedList<>();
+    private final int bufferSize;
+
+    public ContentClaimWriteCache(final ContentRepository contentRepo) {
+        this(contentRepo, 8192);
+    }
+
+    public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) {
+        this.contentRepo = contentRepo;
+        this.bufferSize = bufferSize;
+    }
+
+    public void reset() throws IOException {
+        try {
+            forEachStream(OutputStream::close);
+        } finally {
+            streamMap.clear();
+            queue.clear();
+        }
+    }
+
+    public ContentClaim getContentClaim() throws IOException {
+        final ContentClaim contentClaim = queue.poll();
+        if (contentClaim != null) {
+            contentRepo.incrementClaimaintCount(contentClaim);
+            return contentClaim;
+        }
+
+        final ContentClaim claim = contentRepo.create(false);
+        registerStream(claim);
+        return claim;
+    }
+
+    private OutputStream registerStream(final ContentClaim contentClaim) throws IOException {
+        final OutputStream out = contentRepo.write(contentClaim);
+        final OutputStream buffered = new BufferedOutputStream(out, bufferSize);
+        streamMap.put(contentClaim.getResourceClaim(), buffered);
+        return buffered;
+    }
+
+    public OutputStream write(final ContentClaim claim) throws IOException {
+        OutputStream out = streamMap.get(claim.getResourceClaim());
+        if (out == null) {
+            out = registerStream(claim);
+        }
+
+        if (!(claim instanceof StandardContentClaim)) {
+            // we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
+            // else, just throw an Exception because it is not valid for this Repository
+            throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does not belong to this Claim Cache");
+        }
+
+        final StandardContentClaim scc = (StandardContentClaim) claim;
+        final long initialLength = Math.max(0L, scc.getLength());
+
+        final OutputStream bcos = out;
+        return new OutputStream() {
+            private long bytesWritten = 0L;
+
+            @Override
+            public void write(final int b) throws IOException {
+                bcos.write(b);
+                bytesWritten++;
+                scc.setLength(initialLength + bytesWritten);
+            }
+
+            @Override
+            public void write(byte[] b, int off, int len) throws IOException {
+                bcos.write(b, off, len);
+                bytesWritten += len;
+                scc.setLength(initialLength + bytesWritten);
+            }
+
+            @Override
+            public void write(final byte[] b) throws IOException {
+                write(b, 0, b.length);
+            }
+
+            @Override
+            public void flush() throws IOException {
+                // do nothing - do not flush underlying stream.
+            }
+
+            @Override
+            public void close() throws IOException {
+                queue.offer(claim);
+            }
+        };
+    }
+
+    public void flush(final ContentClaim contentClaim) throws IOException {
+        if (contentClaim == null) {
+            return;
+        }
+
+        flush(contentClaim.getResourceClaim());
+    }
+
+    public void flush(final ResourceClaim claim) throws IOException {
+        final OutputStream out = streamMap.get(claim);
+        if (out != null) {
+            out.flush();
+        }
+    }
+
+    public void flush() throws IOException {
+        forEachStream(OutputStream::flush);
+    }
+
+    private void forEachStream(final StreamProcessor proc) throws IOException {
+        IOException exception = null;
+
+        for (final OutputStream out : streamMap.values()) {
+            try {
+                proc.process(out);
+            } catch (final IOException ioe) {
+                if (exception == null) {
+                    exception = ioe;
+                } else {
+                    ioe.addSuppressed(exception);
+                    exception = ioe;
+                }
+            }
+        }
+
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    private interface StreamProcessor {
+        void process(final OutputStream out) throws IOException;
+    }
+}
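
Taken together, the intended call pattern for this cache looks roughly like
the following sketch (error handling elided; the ContentRepository instance
and the data to write are assumed):

    import java.io.OutputStream;
    import org.apache.nifi.controller.repository.ContentRepository;
    import org.apache.nifi.controller.repository.claim.ContentClaim;
    import org.apache.nifi.controller.repository.claim.ContentClaimWriteCache;

    // Sketch only: how a caller such as StandardProcessSession drives the cache.
    public class CacheUsageSketch {
        static void writeAndCommit(ContentRepository repo, byte[] data) throws Exception {
            ContentClaimWriteCache cache = new ContentClaimWriteCache(repo);

            ContentClaim claim = cache.getContentClaim(); // fresh claim, or a recycled one
            try (OutputStream out = cache.write(claim)) {
                out.write(data);                          // buffered; claim length is tracked
            }                                             // close() re-queues the claim for reuse

            cache.flush(claim); // force bytes out before anything reads this claim
            try {
                cache.flush();  // session commit: flush every open stream
            } finally {
                cache.reset();  // then close streams and drop cached claims
            }
        }
    }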

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
index 76c208d..b218ee6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/ContentClaimFieldMap.java
@@ -65,6 +65,39 @@ public class ContentClaimFieldMap implements Record {
     }
 
     @Override
+    public int hashCode() {
+        return (int) (31 + contentClaimOffset + 21 * resourceClaimFieldMap.hashCode());
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        ContentClaimFieldMap other = (ContentClaimFieldMap) obj;
+        if (contentClaimOffset != other.contentClaimOffset) {
+            return false;
+        }
+
+        if (resourceClaimFieldMap == null) {
+            if (other.resourceClaimFieldMap != null) {
+                return false;
+            }
+        } else if (!resourceClaimFieldMap.equals(other.resourceClaimFieldMap)) {
+            return false;
+        }
+
+        return true;
+    }
+
+    @Override
     public String toString() {
         return "ContentClaimFieldMap[" + contentClaim + "]";
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
index c11353b..93fa4e4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordUpdate.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.controller.repository.schema;
 
+import org.apache.nifi.controller.repository.RepositoryRecordType;
 import org.apache.nifi.repository.schema.NamedValue;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.repository.schema.RecordSchema;
@@ -39,7 +40,10 @@ public class RepositoryRecordUpdate implements Record {
     @Override
     public Object getFieldValue(final String fieldName) {
         if (RepositoryRecordSchema.REPOSITORY_RECORD_UPDATE_V2.equals(fieldName)) {
-            final String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
+            String actionType = (String) fieldMap.getFieldValue(RepositoryRecordSchema.ACTION_TYPE);
+            if (RepositoryRecordType.CONTENTMISSING.name().equals(actionType)) {
+                actionType = RepositoryRecordType.DELETE.name();
+            }
             final UpdateType updateType = UpdateType.valueOf(actionType);
 
             final String actionName;
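
The remapping above is needed because the write-ahead log's UpdateType enum
has no CONTENTMISSING constant, so UpdateType.valueOf(actionType) would throw
for such records; serializing them as DELETE keeps the log readable. A reduced
sketch of the failure being avoided (both enums are simplified stand-ins here,
not the real definitions):

    enum RepositoryRecordType { CREATE, UPDATE, DELETE, CONTENTMISSING } // simplified
    enum UpdateType { CREATE, UPDATE, DELETE }                           // simplified

    public class RemapSketch {
        public static void main(String[] args) {
            String actionType = RepositoryRecordType.CONTENTMISSING.name();
            if (RepositoryRecordType.CONTENTMISSING.name().equals(actionType)) {
                actionType = RepositoryRecordType.DELETE.name(); // the commit's fix
            }
            // Without the remap, valueOf would throw IllegalArgumentException:
            UpdateType updateType = UpdateType.valueOf(actionType);
            System.out.println(updateType); // DELETE
        }
    }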

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
index 0c4972b..22684a6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java
@@ -83,7 +83,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
 
         for (int i = 0; i < maxThreadCount; i++) {
             final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
-            flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS);
+            flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
         }
     }
 
@@ -132,7 +132,7 @@ public class EventDrivenSchedulingAgent extends AbstractSchedulingAgent {
             final int tasksToAdd = maxThreadCount - oldMax;
             for (int i = 0; i < tasksToAdd; i++) {
                 final Runnable eventDrivenTask = new EventDrivenTask(workerQueue);
-                flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 30000, TimeUnit.NANOSECONDS);
+                flowEngine.scheduleWithFixedDelay(eventDrivenTask, 0L, 1L, TimeUnit.NANOSECONDS);
             }
         }
     }
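
With a fixed delay of 1 nanosecond, scheduleWithFixedDelay effectively
re-submits each worker as soon as the previous run returns; the old 30000 ns
delay inserted an idle gap between polls. A minimal sketch of the call (the
task body is a placeholder):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class ScheduleSketch {
        public static void main(String[] args) {
            ScheduledExecutorService engine = Executors.newScheduledThreadPool(4);
            Runnable worker = () -> { /* poll the work queue once */ };

            // Fixed *delay*: the next run starts 1 ns after the previous one
            // finishes, i.e. effectively back-to-back (unlike a fixed rate).
            engine.scheduleWithFixedDelay(worker, 0L, 1L, TimeUnit.NANOSECONDS);
        }
    }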

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
index 05c32e1..59a4b1b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -87,6 +87,13 @@ public enum ProcessorStatusDescriptor {
         Formatter.DURATION,
         s -> TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS))),
 
+    TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskNanos",
+        "Total Task Time (nanos)",
+        "The total number of thread-nanoseconds that the Processor has used to complete its tasks in the past 5 minutes",
+        Formatter.COUNT,
+        ProcessorStatus::getProcessingNanos), false),
+
     FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
         "flowFilesRemoved",
         "FlowFiles Removed (5 mins)",
@@ -122,35 +129,50 @@ public enum ProcessorStatusDescriptor {
         }
     )),
 
-    AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
-        "averageTaskMillis",
-        "Average Task Duration",
-        "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes",
-        Formatter.DURATION,
-        s -> s.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(s.getProcessingNanos(), TimeUnit.NANOSECONDS) / s.getInvocations(),
+    AVERAGE_TASK_NANOS(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageTaskNanos",
+        "Average Task Duration (nanoseconds)",
+        "The average number of nanoseconds it took this Processor to complete a task, over the past 5 minutes",
+        Formatter.COUNT,
+        s -> s.getInvocations() == 0 ? 0 : s.getProcessingNanos() / s.getInvocations(),
         new ValueReducer<StatusSnapshot, Long>() {
             @Override
             public Long reduce(final List<StatusSnapshot> values) {
-                long procMillis = 0L;
+                long procNanos = 0L;
                 int invocations = 0;
 
                 for (final StatusSnapshot snapshot : values) {
-                    procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
-                    invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).intValue();
+                    final Long taskNanos = snapshot.getStatusMetrics().get(TASK_NANOS.getDescriptor());
+                    if (taskNanos != null) {
+                        procNanos += taskNanos.longValue();
+                    }
+
+                    final Long taskInvocations = snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor());
+                    if (taskInvocations != null) {
+                        invocations += taskInvocations.intValue();
+                    }
                 }
 
                 if (invocations == 0) {
                     return 0L;
                 }
 
-                return procMillis / invocations;
+                return procNanos / invocations;
             }
         }));
 
-    private MetricDescriptor<ProcessorStatus> descriptor;
+
+
+    private final MetricDescriptor<ProcessorStatus> descriptor;
+    private final boolean visible;
 
     private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
+        this(descriptor, true);
+    }
+
+    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor, final boolean visible) {
         this.descriptor = descriptor;
+        this.visible = visible;
     }
 
     public String getField() {
@@ -160,4 +182,8 @@ public enum ProcessorStatusDescriptor {
     public MetricDescriptor<ProcessorStatus> getDescriptor() {
         return descriptor;
     }
+
+    public boolean isVisible() {
+        return visible;
+    }
 }
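
The reducer above computes a weighted average (total nanos over total
invocations) rather than averaging per-snapshot averages, which would be
skewed by snapshots with few invocations. A toy illustration with invented
numbers:

    public class WeightedAverageSketch {
        public static void main(String[] args) {
            // Snapshot A: 10 tasks totaling 1_000 ns; snapshot B: 1 task of 10_000 ns.
            long totalNanos = 1_000L + 10_000L;
            int totalInvocations = 10 + 1;

            // Weighted, as the reducer does: 11_000 / 11 = 1_000 ns per task.
            System.out.println(totalNanos / totalInvocations);

            // Averaging the averages would give (100 + 10_000) / 2 = 5_050 ns,
            // dominated by the single-invocation snapshot.
        }
    }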

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 4c620d1..3203972 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -93,7 +93,9 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
                 snapshot.setTimestamp(capture.getCaptureDate());
 
                 for (final ProcessorStatusDescriptor descriptor : ProcessorStatusDescriptor.values()) {
-                    snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
+                    if (descriptor.isVisible()) {
+                        snapshot.addStatusMetric(descriptor.getDescriptor(), descriptor.getDescriptor().getValueFunction().getValue(status));
+                    }
                 }
 
                 history.addStatusSnapshot(snapshot);

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 3bc2356..01f3c8c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -82,6 +82,13 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
         return procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || !Connectables.hasNonLoopConnection(procNode) || Connectables.flowFilesQueued(procNode);
     }
 
+    private boolean isBackPressureEngaged() {
+        return procNode.getIncomingConnections().stream()
+            .filter(con -> con.getSource() == procNode)
+            .map(con -> con.getFlowFileQueue())
+            .anyMatch(queue -> queue.isFull());
+    }
+
     @Override
     public Boolean call() {
         // make sure processor is not yielded
@@ -127,6 +134,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
         scheduleState.incrementActiveThreadCount();
 
         final long startNanos = System.nanoTime();
+        final long finishIfBackpressureEngaged = startNanos + (batchNanos / 25L);
         final long finishNanos = startNanos + batchNanos;
         int invocationCount = 0;
         try {
@@ -140,10 +148,16 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
                         return false;
                     }
 
-                    if (System.nanoTime() > finishNanos) {
+                    final long nanoTime = System.nanoTime();
+                    if (nanoTime > finishNanos) {
+                        return false;
+                    }
+
+                    if (nanoTime > finishIfBackpressureEngaged && isBackPressureEngaged()) {
                         return false;
                     }
 
+
                     if (!isWorkToDo(procNode)) {
                         break;
                     }
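
The batch loop therefore keeps its full window (batchNanos) only while
downstream queues have room; once back pressure engages on a self-loop
connection, it gives up after 1/25th of the window. Roughly, with an
illustrative batch duration:

    import java.util.concurrent.TimeUnit;

    public class BackPressureWindowSketch {
        public static void main(String[] args) {
            long batchNanos = TimeUnit.MILLISECONDS.toNanos(25); // e.g. a 25 ms batch
            long startNanos = System.nanoTime();

            long finishNanos = startNanos + batchNanos;                         // normal cutoff
            long finishIfBackpressureEngaged = startNanos + (batchNanos / 25L); // 1 ms here

            // The task loop stops at finishNanos, or at the earlier cutoff
            // when isBackPressureEngaged() reports a full queue.
            System.out.println(finishNanos - finishIfBackpressureEngaged);
        }
    }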

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 7edb0e7..845ca56 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -34,10 +34,15 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardOpenOption;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.controller.repository.claim.ContentClaim;
@@ -56,8 +61,6 @@ import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.read.ListAppender;
-import java.util.HashMap;
-import java.util.Map;
 
 public class TestFileSystemRepository {
 
@@ -89,6 +92,38 @@ public class TestFileSystemRepository {
     }
 
     @Test
+    public void testWritePerformance() throws IOException {
+        final long bytesToWrite = 1_000_000_000L;
+        final int contentSize = 100;
+
+        final int iterations = (int) (bytesToWrite / contentSize);
+        final byte[] content = new byte[contentSize];
+        final Random random = new Random();
+        random.nextBytes(content);
+
+        //        final ContentClaimWriteCache cache = new ContentClaimWriteCache(repository);
+
+        final long start = System.nanoTime();
+        for (int i = 0; i < iterations; i++) {
+            final ContentClaim claim = repository.create(false);
+            try (final OutputStream out = repository.write(claim)) {
+                out.write(content);
+            }
+            //            final ContentClaim claim = cache.getContentClaim();
+            //            try (final OutputStream out = cache.write(claim)) {
+            //                out.write(content);
+            //            }
+        }
+        final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+        final long mb = bytesToWrite / (1024 * 1024);
+        final long seconds = millis / 1000L;
+        final double mbps = (double) mb / (double) seconds;
+        System.out.println("Took " + millis + " millis to write " + contentSize + " bytes " + iterations + " times (total of "
+            + NumberFormat.getNumberInstance(Locale.US).format(bytesToWrite) + " bytes) for a write rate of " + mbps + " MB/s");
+    }
+
+    @Test
     public void testMinimalArchiveCleanupIntervalHonoredAndLogged() throws Exception {
         // We are going to construct our own repository using different properties, so
         // we need to shutdown the existing one.
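
One caveat in testWritePerformance above: seconds is computed with integer
division, so a run finishing in under a second yields zero and the reported
rate becomes Infinity. A safer variant of the arithmetic, should the test be
reused (sketch, not the committed code):

    public class RateSketch {
        public static void main(String[] args) {
            long millis = 1234;                  // elapsed time, for illustration
            long bytesWritten = 1_000_000_000L;

            double seconds = millis / 1000.0;    // keep fractional seconds
            double mb = bytesWritten / (1024.0 * 1024.0);
            System.out.println(mb / seconds + " MB/s");
        }
    }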

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 9070d0c..e0c9ffe 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -86,6 +86,7 @@ import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -987,6 +988,7 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    @Ignore
     public void testManyFilesOpened() throws IOException {
 
         StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];
@@ -1672,9 +1674,9 @@ public class TestStandardProcessSession {
 
         @Override
         public int incrementClaimaintCount(ContentClaim claim) {
-            final AtomicInteger count = claimantCounts.get(claim);
+            AtomicInteger count = claimantCounts.get(claim);
             if (count == null) {
-                throw new IllegalArgumentException("Unknown Claim: " + claim);
+                count = new AtomicInteger(0);
             }
             return count.incrementAndGet();
         }
@@ -1835,6 +1837,11 @@ public class TestStandardProcessSession {
                     fos.write(b);
                     ((StandardContentClaim) claim).setLength(claim.getLength() + b.length);
                 }
+
+                @Override
+                public void close() throws IOException {
+                    super.close();
+                }
             };
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 6525822..329b268 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.when;
@@ -33,12 +34,17 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.Connection;
 import org.apache.nifi.controller.StandardFlowFileQueue;
+import org.apache.nifi.controller.queue.DropFlowFileStatus;
 import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.ListFlowFileStatus;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
@@ -47,15 +53,22 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
+import org.apache.nifi.flowfile.FlowFilePrioritizer;
+import org.apache.nifi.processor.FlowFileFilter;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.util.file.FileUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.wali.MinimalLockingWriteAheadLog;
+import org.wali.WriteAheadRepository;
 
 public class TestWriteAheadFlowFileRepository {
 
@@ -74,6 +87,254 @@ public class TestWriteAheadFlowFileRepository {
         }
     }
 
+
+    @Test
+    @Ignore("Intended only for local performance testing before/after making changes")
+    public void testUpdatePerformance() throws IOException, InterruptedException {
+        final FlowFileQueue queue = new FlowFileQueue() {
+
+            @Override
+            public String getIdentifier() {
+                return "4444";
+            }
+
+            @Override
+            public List<FlowFilePrioritizer> getPriorities() {
+                return null;
+            }
+
+            @Override
+            public SwapSummary recoverSwappedFlowFiles() {
+                return null;
+            }
+
+            @Override
+            public void purgeSwapFiles() {
+            }
+
+            @Override
+            public void setPriorities(List<FlowFilePrioritizer> newPriorities) {
+            }
+
+            @Override
+            public void setBackPressureObjectThreshold(long maxQueueSize) {
+            }
+
+            @Override
+            public long getBackPressureObjectThreshold() {
+                return 0;
+            }
+
+            @Override
+            public void setBackPressureDataSizeThreshold(String maxDataSize) {
+            }
+
+            @Override
+            public String getBackPressureDataSizeThreshold() {
+                return null;
+            }
+
+            @Override
+            public QueueSize size() {
+                return null;
+            }
+
+            @Override
+            public boolean isEmpty() {
+                return false;
+            }
+
+            @Override
+            public boolean isActiveQueueEmpty() {
+                return false;
+            }
+
+            @Override
+            public QueueSize getUnacknowledgedQueueSize() {
+                return null;
+            }
+
+            @Override
+            public void acknowledge(FlowFileRecord flowFile) {
+            }
+
+            @Override
+            public void acknowledge(Collection<FlowFileRecord> flowFiles) {
+            }
+
+            @Override
+            public boolean isFull() {
+                return false;
+            }
+
+            @Override
+            public void put(FlowFileRecord file) {
+            }
+
+            @Override
+            public void putAll(Collection<FlowFileRecord> files) {
+            }
+
+            @Override
+            public FlowFileRecord poll(Set<FlowFileRecord> expiredRecords) {
+                return null;
+            }
+
+            @Override
+            public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords) {
+                return null;
+            }
+
+            @Override
+            public long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords) {
+                return 0;
+            }
+
+            @Override
+            public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
+                return null;
+            }
+
+            @Override
+            public String getFlowFileExpiration() {
+                return null;
+            }
+
+            @Override
+            public int getFlowFileExpiration(TimeUnit timeUnit) {
+                return 0;
+            }
+
+            @Override
+            public void setFlowFileExpiration(String flowExpirationPeriod) {
+            }
+
+            @Override
+            public DropFlowFileStatus dropFlowFiles(String requestIdentifier, String requestor) {
+                return null;
+            }
+
+            @Override
+            public DropFlowFileStatus getDropFlowFileStatus(String requestIdentifier) {
+                return null;
+            }
+
+            @Override
+            public DropFlowFileStatus cancelDropFlowFileRequest(String requestIdentifier) {
+                return null;
+            }
+
+            @Override
+            public ListFlowFileStatus listFlowFiles(String requestIdentifier, int maxResults) {
+                return null;
+            }
+
+            @Override
+            public ListFlowFileStatus getListFlowFileStatus(String requestIdentifier) {
+                return null;
+            }
+
+            @Override
+            public ListFlowFileStatus cancelListFlowFileRequest(String requestIdentifier) {
+                return null;
+            }
+
+            @Override
+            public FlowFileRecord getFlowFile(String flowFileUuid) throws IOException {
+                return null;
+            }
+
+            @Override
+            public void verifyCanList() throws IllegalStateException {
+            }
+        };
+
+
+        final int numPartitions = 16;
+        final int numThreads = 8;
+        final int totalUpdates = 160_000_000;
+        final int batchSize = 10;
+
+        final Path path = Paths.get("target/minimal-locking-repo");
+        deleteRecursively(path.toFile());
+        assertTrue(path.toFile().mkdirs());
+
+        final ResourceClaimManager claimManager = new StandardResourceClaimManager();
+        final RepositoryRecordSerdeFactory serdeFactory = new RepositoryRecordSerdeFactory(claimManager);
+        final WriteAheadRepository<RepositoryRecord> repo = new MinimalLockingWriteAheadLog<>(path, numPartitions, serdeFactory, null);
+        final Collection<RepositoryRecord> initialRecs = repo.recoverRecords();
+        assertTrue(initialRecs.isEmpty());
+
+        final int updateCountPerThread = totalUpdates / numThreads;
+
+        final Thread[] threads = new Thread[numThreads];
+        for (int j = 0; j < 2; j++) {
+            for (int i = 0; i < numThreads; i++) {
+                final Thread t = new Thread(new Runnable() {
+                    @Override
+                    public void run() {
+                        final List<RepositoryRecord> records = new ArrayList<>();
+                        final int numBatches = updateCountPerThread / batchSize;
+                        final MockFlowFile baseFlowFile = new MockFlowFile(0L);
+
+                        for (int i = 0; i < numBatches; i++) {
+                            records.clear();
+                            for (int k = 0; k < batchSize; k++) {
+                                final FlowFileRecord flowFile = new MockFlowFile(i % 100_000, baseFlowFile);
+                                final String uuid = flowFile.getAttribute("uuid");
+
+                                final StandardRepositoryRecord record = new StandardRepositoryRecord(null, flowFile);
+                                record.setDestination(queue);
+                                final Map<String, String> updatedAttrs = Collections.singletonMap("uuid", uuid);
+                                record.setWorking(flowFile, updatedAttrs);
+
+                                records.add(record);
+                            }
+
+                            try {
+                                repo.update(records, false);
+                            } catch (IOException e) {
+                                e.printStackTrace();
+                                Assert.fail(e.toString());
+                            }
+                        }
+                    }
+                });
+
+                t.setDaemon(true);
+                threads[i] = t;
+            }
+
+            final long start = System.nanoTime();
+            for (final Thread t : threads) {
+                t.start();
+            }
+            for (final Thread t : threads) {
+                t.join();
+            }
+
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            if (j == 0) {
+                System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads, *as a warmup!*");
+            } else {
+                System.out.println(millis + " ms to insert " + updateCountPerThread * numThreads + " updates using " + numPartitions + " partitions and " + numThreads + " threads");
+            }
+        }
+    }
+
+    private void deleteRecursively(final File file) {
+        final File[] children = file.listFiles();
+        if (children != null) {
+            for (final File child : children) {
+                deleteRecursively(child);
+            }
+        }
+
+        file.delete();
+    }
+
+
+
     @Test
     public void testResourceClaimsIncremented() throws IOException {
        final ResourceClaimManager claimManager = new StandardResourceClaimManager();

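The benchmark above follows a warmup-then-measure pattern: the outer j loop runs the full workload twice and only the second pass is reported as the real number. Reduced to a single thread, the essential timing harness looks roughly like this (a minimal sketch; the method name and parameters are illustrative, not part of the commit):

    // Sketch: time numBatches batched updates against a WriteAheadRepository.
    // forceSync=false lets the write-ahead log decide when to fsync its partitions.
    private long timeBatches(final WriteAheadRepository<RepositoryRecord> repo,
                             final List<RepositoryRecord> batch, final int numBatches) throws IOException {
        final long start = System.nanoTime();
        for (int i = 0; i < numBatches; i++) {
            repo.update(batch, false);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }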
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java
new file mode 100644
index 0000000..fc08f55
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestContentClaimWriteCache.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.claim;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.nifi.controller.repository.FileSystemRepository;
+import org.apache.nifi.controller.repository.TestFileSystemRepository;
+import org.apache.nifi.controller.repository.util.DiskUtils;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.NiFiProperties;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestContentClaimWriteCache {
+
+    private FileSystemRepository repository = null;
+    private StandardResourceClaimManager claimManager = null;
+    private final File rootFile = new File("target/testContentClaimWriteCache");
+    private NiFiProperties nifiProperties;
+
+    @Before
+    public void setup() throws IOException {
+        System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestFileSystemRepository.class.getResource("/conf/nifi.properties").getFile());
+        nifiProperties = NiFiProperties.createBasicNiFiProperties(null, null);
+        if (rootFile.exists()) {
+            DiskUtils.deleteRecursively(rootFile);
+        }
+        repository = new FileSystemRepository(nifiProperties);
+        claimManager = new StandardResourceClaimManager();
+        repository.initialize(claimManager);
+        repository.purge();
+    }
+
+    @After
+    public void shutdown() throws IOException {
+        repository.shutdown();
+    }
+
+    @Test
+    public void testFlushWriteCorrectData() throws IOException {
+        final ContentClaimWriteCache cache = new ContentClaimWriteCache(repository, 4);
+
+        final ContentClaim claim1 = cache.getContentClaim();
+        assertNotNull(claim1);
+
+        final OutputStream out = cache.write(claim1);
+        assertNotNull(out);
+        out.write("hello".getBytes());
+        out.write("good-bye".getBytes());
+
+        cache.flush();
+
+        assertEquals(13L, claim1.getLength());
+        final InputStream in = repository.read(claim1);
+        final byte[] buff = new byte[(int) claim1.getLength()];
+        StreamUtils.fillBuffer(in, buff);
+        Assert.assertArrayEquals("hellogood-bye".getBytes(), buff);
+
+        final ContentClaim claim2 = cache.getContentClaim();
+        final OutputStream out2 = cache.write(claim2);
+        assertNotNull(out2);
+        out2.write("good-day".getBytes());
+        out2.write("hello".getBytes());
+
+        cache.flush();
+
+        assertEquals(13L, claim2.getLength());
+        final InputStream in2 = repository.read(claim2);
+        final byte[] buff2 = new byte[(int) claim2.getLength()];
+        StreamUtils.fillBuffer(in2, buff2);
+        Assert.assertArrayEquals("good-dayhello".getBytes(), buff2);
+    }
+
+}

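The two flush-and-verify sequences in testFlushWriteCorrectData are identical in shape; if the test grows, they could be factored into a helper along these lines (a sketch only; assertClaimContents is a hypothetical name built from the same repository.read(...) and StreamUtils.fillBuffer(...) calls the test already uses):

    // Hypothetical helper: read a claim back from the repository and compare its bytes.
    private void assertClaimContents(final ContentClaim claim, final byte[] expected) throws IOException {
        final byte[] actual = new byte[(int) claim.getLength()];
        try (final InputStream in = repository.read(claim)) {
            StreamUtils.fillBuffer(in, actual);
        }
        Assert.assertArrayEquals(expected, actual);
    }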
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
index 662f87e..cecfabf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/pom.xml
@@ -98,7 +98,7 @@
        <nifi.provenance.repository.rollover.time>30 secs</nifi.provenance.repository.rollover.time>
        <nifi.provenance.repository.rollover.size>100 MB</nifi.provenance.repository.rollover.size>
        <nifi.provenance.repository.query.threads>2</nifi.provenance.repository.query.threads>
-        <nifi.provenance.repository.index.threads>1</nifi.provenance.repository.index.threads>
+        <nifi.provenance.repository.index.threads>2</nifi.provenance.repository.index.threads>
        <nifi.provenance.repository.compress.on.rollover>true</nifi.provenance.repository.compress.on.rollover>
        <nifi.provenance.repository.indexed.fields>EventType, FlowFileUUID, Filename, ProcessorID, Relationship</nifi.provenance.repository.indexed.fields>
         <nifi.provenance.repository.indexed.attributes />
@@ -106,6 +106,8 @@
        <nifi.provenance.repository.always.sync>false</nifi.provenance.repository.always.sync>
        <nifi.provenance.repository.journal.count>16</nifi.provenance.repository.journal.count>
        <nifi.provenance.repository.max.attribute.length>65536</nifi.provenance.repository.max.attribute.length>
+        <nifi.provenance.repository.concurrent.merge.threads>2</nifi.provenance.repository.concurrent.merge.threads>
+        <nifi.provenance.repository.warm.cache.frequency>1 hour</nifi.provenance.repository.warm.cache.frequency>
 
         <!-- volatile provenance repository properties -->
        <nifi.provenance.repository.buffer.size>100000</nifi.provenance.repository.buffer.size>

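These pom properties are Maven filter values for the packaged conf/nifi.properties template. Assuming the standard resource filtering, the defaults shipped with this change would surface in nifi.properties as (illustrative rendering, not part of the diff):

    nifi.provenance.repository.index.threads=2
    nifi.provenance.repository.concurrent.merge.threads=2
    nifi.provenance.repository.warm.cache.frequency=1 hour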
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
index 2c84861..fcc481b 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/AbstractRecordWriter.java
@@ -32,18 +32,26 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     private static final Logger logger = LoggerFactory.getLogger(AbstractRecordWriter.class);
 
     private final File file;
+    private final String storageLocation;
     private final TocWriter tocWriter;
     private final Lock lock = new ReentrantLock();
 
     private volatile boolean dirty = false;
     private volatile boolean closed = false;
 
-    private int recordsWritten = 0;
-
     public AbstractRecordWriter(final File file, final TocWriter writer) throws IOException {
         logger.trace("Creating Record Writer for {}", file);
 
         this.file = file;
+        this.storageLocation = file.getName();
+        this.tocWriter = writer;
+    }
+
+    public AbstractRecordWriter(final String storageLocation, final TocWriter writer) throws IOException {
+        logger.trace("Creating Record Writer for {}", storageLocation);
+
+        this.file = null;
+        this.storageLocation = storageLocation;
         this.tocWriter = writer;
     }
 
@@ -51,7 +59,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     public synchronized void close() throws IOException {
         closed = true;
 
-        logger.trace("Closing Record Writer for {}", file == null ? null : file.getName());
+        logger.trace("Closing Record Writer for {}", getStorageLocation());
 
         lock();
         try {
@@ -94,9 +102,8 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         }
     }
 
-    @Override
-    public int getRecordsWritten() {
-        return recordsWritten;
+    protected String getStorageLocation() {
+        return storageLocation;
     }
 
     @Override
@@ -133,6 +140,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
         this.dirty = true;
     }
 
+    @Override
     public boolean isDirty() {
         return dirty;
     }
@@ -142,7 +150,7 @@ public abstract class AbstractRecordWriter implements RecordWriter {
     }
 
     @Override
-    public void sync() throws IOException {
+    public synchronized void sync() throws IOException {
         try {
             if (tocWriter != null) {
                 tocWriter.sync();

http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
index cae2f40..2a42501 100644
--- 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/ByteArraySchemaRecordWriter.java
@@ -18,34 +18,37 @@
 package org.apache.nifi.provenance;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.nifi.provenance.schema.EventFieldNames;
 import org.apache.nifi.provenance.schema.EventRecord;
-import org.apache.nifi.provenance.schema.EventRecordFields;
 import org.apache.nifi.provenance.schema.ProvenanceEventSchema;
 import org.apache.nifi.provenance.serialization.CompressableRecordWriter;
 import org.apache.nifi.provenance.toc.TocWriter;
 import org.apache.nifi.repository.schema.Record;
 import org.apache.nifi.repository.schema.RecordSchema;
 import org.apache.nifi.repository.schema.SchemaRecordWriter;
-import org.apache.nifi.stream.io.DataOutputStream;
 
 public class ByteArraySchemaRecordWriter extends CompressableRecordWriter {
     private static final RecordSchema eventSchema = ProvenanceEventSchema.PROVENANCE_EVENT_SCHEMA_V1;
-    private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventRecordFields.Names.CONTENT_CLAIM).getSubFields());
+    private static final RecordSchema contentClaimSchema = new RecordSchema(eventSchema.getField(EventFieldNames.CONTENT_CLAIM).getSubFields());
     public static final int SERIALIZATION_VERSION = 1;
     public static final String SERIALIZATION_NAME = "ByteArraySchemaRecordWriter";
 
     private final SchemaRecordWriter recordWriter = new SchemaRecordWriter();
 
-    public ByteArraySchemaRecordWriter(final File file, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
-        super(file, tocWriter, compressed, uncompressedBlockSize);
+    public ByteArraySchemaRecordWriter(final File file, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed,
+        final int uncompressedBlockSize) throws IOException {
+        super(file, idGenerator, tocWriter, compressed, uncompressedBlockSize);
     }
 
-    public ByteArraySchemaRecordWriter(final OutputStream out, final TocWriter tocWriter, final boolean compressed, final int uncompressedBlockSize) throws IOException {
-        super(out, tocWriter, compressed, uncompressedBlockSize);
+    public ByteArraySchemaRecordWriter(final OutputStream out, final String storageLocation, final AtomicLong idGenerator, final TocWriter tocWriter, final boolean compressed,
+        final int uncompressedBlockSize) throws IOException {
+        super(out, storageLocation, idGenerator, tocWriter, compressed, uncompressedBlockSize);
     }
 
     @Override

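Both constructors now thread an AtomicLong through to CompressableRecordWriter, so event IDs can be drawn from a generator shared across writers at serialization time. A hedged usage sketch (the file name, block size, and TocWriter setup are assumptions, not taken from this commit):

    // One idGenerator shared by all writers yields monotonically increasing event IDs.
    // Given an existing TocWriter tocWriter (setup omitted):
    final AtomicLong idGenerator = new AtomicLong(0L);
    final ByteArraySchemaRecordWriter writer =
        new ByteArraySchemaRecordWriter(new File("journals/1.prov"), idGenerator, tocWriter, true, 65536);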
http://git-wip-us.apache.org/repos/asf/nifi/blob/96ed405d/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
new file mode 100644
index 0000000..612b6c8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/EventIdFirstSchemaRecordReader.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.provenance;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.nifi.provenance.schema.EventIdFirstHeaderSchema;
+import org.apache.nifi.provenance.schema.LookupTableEventRecord;
+import org.apache.nifi.provenance.serialization.CompressableRecordReader;
+import org.apache.nifi.provenance.toc.TocReader;
+import org.apache.nifi.repository.schema.Record;
+import org.apache.nifi.repository.schema.RecordSchema;
+import org.apache.nifi.repository.schema.SchemaRecordReader;
+import org.apache.nifi.stream.io.LimitingInputStream;
+import org.apache.nifi.stream.io.StreamUtils;
+
+public class EventIdFirstSchemaRecordReader extends CompressableRecordReader {
+    private RecordSchema schema; // effectively final
+    private SchemaRecordReader recordReader;  // effectively final
+
+    private List<String> componentIds;
+    private List<String> componentTypes;
+    private List<String> queueIds;
+    private List<String> eventTypes;
+    private long firstEventId;
+    private long systemTimeOffset;
+
+    public EventIdFirstSchemaRecordReader(final InputStream in, final String filename, final TocReader tocReader, final int maxAttributeChars) throws IOException {
+        super(in, filename, tocReader, maxAttributeChars);
+    }
+
+    private void verifySerializationVersion(final int serializationVersion) {
+        if (serializationVersion > EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION) {
+            throw new IllegalArgumentException("Unable to deserialize record because the version is " + serializationVersion
+                + " and supported versions are 1-" + EventIdFirstSchemaRecordWriter.SERIALIZATION_VERSION);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    protected synchronized void readHeader(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+        final int eventSchemaLength = in.readInt();
+        final byte[] buffer = new byte[eventSchemaLength];
+        StreamUtils.fillBuffer(in, buffer);
+
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(buffer)) {
+            schema = RecordSchema.readFrom(bais);
+        }
+
+        recordReader = SchemaRecordReader.fromSchema(schema);
+
+        final int headerSchemaLength = in.readInt();
+        final byte[] headerSchemaBuffer = new byte[headerSchemaLength];
+        StreamUtils.fillBuffer(in, headerSchemaBuffer);
+
+        final RecordSchema headerSchema;
+        try (final ByteArrayInputStream bais = new ByteArrayInputStream(headerSchemaBuffer)) {
+            headerSchema = RecordSchema.readFrom(bais);
+        }
+
+        final SchemaRecordReader headerReader = SchemaRecordReader.fromSchema(headerSchema);
+        final Record headerRecord = headerReader.readRecord(in);
+        componentIds = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_IDS);
+        componentTypes = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.COMPONENT_TYPES);
+        queueIds = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.QUEUE_IDS);
+        eventTypes = (List<String>) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.EVENT_TYPES);
+        firstEventId = (Long) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.FIRST_EVENT_ID);
+        systemTimeOffset = (Long) headerRecord.getFieldValue(EventIdFirstHeaderSchema.FieldNames.TIMESTAMP_OFFSET);
+    }
+
+    @Override
+    protected StandardProvenanceEventRecord nextRecord(final DataInputStream in, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+
+        final long byteOffset = getBytesConsumed();
+        final long eventId = in.readInt() + firstEventId;
+        final int recordLength = in.readInt();
+
+        return readRecord(in, eventId, byteOffset, recordLength);
+    }
+
+    private StandardProvenanceEventRecord readRecord(final DataInputStream in, final long eventId, final long startOffset, final int recordLength) throws IOException {
+        final InputStream limitedIn = new LimitingInputStream(in, recordLength);
+
+        final Record eventRecord = recordReader.readRecord(limitedIn);
+        if (eventRecord == null) {
+            return null;
+        }
+
+        final StandardProvenanceEventRecord deserializedEvent = LookupTableEventRecord.getEvent(eventRecord, getFilename(), startOffset, getMaxAttributeLength(),
+            firstEventId, systemTimeOffset, componentIds, componentTypes, queueIds, eventTypes);
+        deserializedEvent.setEventId(eventId);
+        return deserializedEvent;
+    }
+
+    private boolean isData(final InputStream in) throws IOException {
+        in.mark(1);
+        final int nextByte = in.read();
+        in.reset();
+
+        return nextByte > -1;
+    }
+
+    @Override
+    protected Optional<StandardProvenanceEventRecord> readToEvent(final long eventId, final DataInputStream dis, final int serializationVersion) throws IOException {
+        verifySerializationVersion(serializationVersion);
+
+        while (isData(dis)) {
+            final long startOffset = getBytesConsumed();
+            final long id = dis.readInt() + firstEventId;
+            final int recordLength = dis.readInt();
+
+            if (id >= eventId) {
+                final StandardProvenanceEventRecord event = readRecord(dis, id, startOffset, recordLength);
+                return Optional.ofNullable(event);
+            } else {
+                // This is not the record we want. Skip over it instead of deserializing it.
+                StreamUtils.skip(dis, recordLength);
+            }
+        }
+
+        return Optional.empty();
+    }
+}
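The reader relies on a simple per-event framing: a 4-byte event-ID delta relative to the header's firstEventId, a 4-byte record length, then the serialized record bytes. That length prefix is what lets readToEvent() seek forward without deserializing each record. Reduced to one step (a sketch; skipOneRecord is a hypothetical name, but it uses only the calls the reader itself makes):

    // Skip a single framed event, returning its absolute event ID.
    private long skipOneRecord(final DataInputStream dis, final long firstEventId) throws IOException {
        final long id = dis.readInt() + firstEventId; // delta-encoded ID
        final int recordLength = dis.readInt();       // serialized record size
        StreamUtils.skip(dis, recordLength);          // no deserialization needed
        return id;
    }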
