Repository: nifi
Updated Branches:
  refs/heads/0.x 24c0c1113 -> 25f816d32


NIFI-2551: Addressed a threading issue in the StandardResourceClaimManager and 
performed some refactoring so that we can ensure thread-safety across different 
components of the application, such as ProcessSession and 
WriteAheadFlowFileRepository when interacting with the 
StandardResourceClaimManager. Updated DebugFlow to allow it to write to a 
FlowFile multiple times, which exposes the 
concurrency bug. Also avoided calling ContentRepository.remove() from 
ProcessSession whenever the content is no longer needed, as that can cause 
problems now that the Resource Claim is backing the content claim.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/25f816d3
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/25f816d3
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/25f816d3

Branch: refs/heads/0.x
Commit: 25f816d32ab466d2ac1ad96cdb95e7359d085884
Parents: 24c0c11
Author: Mark Payne <[email protected]>
Authored: Fri Aug 12 13:56:03 2016 -0400
Committer: Mark Payne <[email protected]>
Committed: Mon Aug 15 17:01:11 2016 -0400

----------------------------------------------------------------------
 .../controller/repository/RepositoryRecord.java |   8 +
 .../repository/claim/ResourceClaim.java         |  13 ++
 .../repository/claim/ResourceClaimManager.java  |   7 +
 .../apache/nifi/controller/FlowController.java  |  74 ++++-----
 .../nifi/controller/StandardFlowFileQueue.java  |   5 +
 .../repository/FileSystemRepository.java        | 160 ++++++++-----------
 .../repository/StandardProcessSession.java      |  20 +--
 .../repository/StandardRepositoryRecord.java    |  19 +++
 .../WriteAheadFlowFileRepository.java           |  29 ++--
 .../repository/claim/StandardResourceClaim.java |  43 +++--
 .../claim/StandardResourceClaimManager.java     |  90 +++++++----
 .../controller/TestFileSystemSwapManager.java   |   4 +
 .../repository/TestFileSystemRepository.java    |  14 +-
 .../repository/TestStandardProcessSession.java  |  26 +--
 .../TestWriteAheadFlowFileRepository.java       |   5 +-
 .../claim/TestStandardResourceClaimManager.java | 106 ++++++++++++
 .../src/test/resources/logback-test.xml         |   2 +
 .../nifi/processors/standard/DebugFlow.java     |  60 +++++--
 .../org.apache.nifi.processor.Processor         |   2 +-
 .../nifi/processors/standard/TestDebugFlow.java |  10 --
 20 files changed, 461 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
index 09202c0..3b27d95 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java
@@ -16,6 +16,8 @@
  */
 package org.apache.nifi.controller.repository;
 
+import java.util.List;
+
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 
@@ -79,4 +81,10 @@ public interface RepositoryRecord {
      * swapped out
      */
     String getSwapLocation();
+
+    /**
+     * @return a List of Content Claims that are "transient," meaning that 
they existed only for the
+     *         life of the Process Session in which they were created and 
should not be persisted.
+     */
+    List<ContentClaim> getTransientClaims();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
index 77e7f84..bb78896 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaim.java
@@ -51,4 +51,17 @@ public interface ResourceClaim extends 
Comparable<ResourceClaim> {
      */
     boolean isLossTolerant();
 
+    /**
+     * @return <code>true</code> if the Resource Claim may still be written 
to, <code>false</code> if the Resource Claim
+     *         will no longer be written to
+     */
+    boolean isWritable();
+
+    /**
+     * Indicates whether or not the Resource Claim is in use. A Resource Claim 
is said to be in use if either it is
+     * writable or at least one Content Claim still refers to it
+     *
+     * @return <code>true</code> if the Resource Claim is in use, 
<code>false</code> otherwise
+     */
+    boolean isInUse();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index 01f4c65..b430df0 100644
--- 
a/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -132,4 +132,11 @@ public interface ResourceClaimManager {
      * about
      */
     void purge();
+
+    /**
+     * Freezes the Resource Claim so that it can no longer be written to
+     *
+     * @param claim the resource claim to freeze
+     */
+    void freeze(ResourceClaim claim);
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index d28075b..3eff44c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,7 +16,41 @@
  */
 package org.apache.nifi.controller;
 
-import com.sun.jersey.api.client.ClientHandlerException;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Path;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
@@ -79,7 +113,6 @@ import 
org.apache.nifi.controller.repository.claim.ContentDirection;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.controller.scheduling.EventDrivenSchedulingAgent;
@@ -187,40 +220,7 @@ import 
org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Path;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.sun.jersey.api.client.ClientHandlerException;
 
 public class FlowController implements EventAccess, ControllerServiceProvider, 
ReportingTaskProvider, Heartbeater, QueueProvider {
 
@@ -3349,7 +3349,7 @@ public class FlowController implements EventAccess, 
ControllerServiceProvider, R
                     return null;
                 }
 
-                final StandardResourceClaim resourceClaim = new 
StandardResourceClaim(container, section, identifier, false);
+                final ResourceClaim resourceClaim = 
resourceClaimManager.newResourceClaim(container, section, identifier, false);
                 return new StandardContentClaim(resourceClaim, offset == null 
? 0L : offset.longValue());
             }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 0f3ffe0..631b936 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -1304,6 +1304,11 @@ public final class StandardFlowFileQueue implements 
FlowFileQueue {
             public String getSwapLocation() {
                 return null;
             }
+
+            @Override
+            public List<ContentClaim> getTransientClaims() {
+                return Collections.emptyList();
+            }
         };
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 210fcca..89fe95e 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -103,7 +103,7 @@ public class FileSystemRepository implements 
ContentRepository {
     // with creating and deleting too many files, as we had to delete 100's of 
thousands of files every 2 minutes
     // in order to avoid backpressure on session commits. With 1 MB as the 
target file size, 100's of thousands of
     // files would mean that we are writing gigabytes per second - quite a bit 
faster than any disks can handle now.
-    private final long maxAppendClaimLength = 1024L * 1024L;
+    static final int MAX_APPENDABLE_CLAIM_LENGTH = 1024 * 1024;
 
     // Queue for claims that are kept open for writing. Size of 100 is pretty 
arbitrary. Ideally, this will be at
     // least as large as the number of threads that will be updating the 
repository simultaneously but we don't want
@@ -112,7 +112,6 @@ public class FileSystemRepository implements 
ContentRepository {
     // the OutputStream that we can use for writing to the claim.
     private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new 
LinkedBlockingQueue<>(100);
     private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> 
writableClaimStreams = new ConcurrentHashMap<>(100);
-    private final Set<ResourceClaim> activeResourceClaims = 
Collections.synchronizedSet(new HashSet<ResourceClaim>());
 
     private final boolean archiveData;
     private final long maxArchiveMillis;
@@ -498,64 +497,53 @@ public class FileSystemRepository implements 
ContentRepository {
     public ContentClaim create(final boolean lossTolerant) throws IOException {
         ResourceClaim resourceClaim;
 
-        // We need to synchronize on this queue because the act of pulling 
something off
-        // the queue and incrementing the associated claimant count MUST be 
done atomically.
-        // This way, if the claimant count is decremented to 0, we can ensure 
that the
-        // claim is not then pulled from the queue and used as another thread 
is destroying/archiving
-        // the claim. The logic in the remove() method dictates that the 
underlying file can be
-        // deleted (or archived) only if the claimant count becomes <= 0 AND 
there is no other claim on
-        // the queue that references that file. As a result, we need to ensure 
that those two conditions
-        // can be evaluated atomically. In order for that to be the case, we 
need to also treat the
-        // removal of a claim from the queue and the incrementing of its 
claimant count as an atomic
-        // action to ensure that the comparison of those two conditions is 
atomic also. As a result,
-        // we will synchronize on the queue while performing those actions.
         final long resourceOffset;
-        synchronized (writableClaimQueue) {
-            final ClaimLengthPair pair = writableClaimQueue.poll();
-            if (pair == null) {
-                final long currentIndex = index.incrementAndGet();
-
-                String containerName = null;
-                boolean waitRequired = true;
-                ContainerState containerState = null;
-                for (long containerIndex = currentIndex; containerIndex < 
currentIndex + containers.size(); containerIndex++) {
-                    final long modulatedContainerIndex = containerIndex % 
containers.size();
-                    containerName = containerNames.get((int) 
modulatedContainerIndex);
-
-                    containerState = containerStateMap.get(containerName);
-                    if (!containerState.isWaitRequired()) {
-                        waitRequired = false;
-                        break;
-                    }
-                }
-
-                if (waitRequired) {
-                    containerState.waitForArchiveExpiration();
+        final ClaimLengthPair pair = writableClaimQueue.poll();
+        if (pair == null) {
+            final long currentIndex = index.incrementAndGet();
+
+            String containerName = null;
+            boolean waitRequired = true;
+            ContainerState containerState = null;
+            for (long containerIndex = currentIndex; containerIndex < 
currentIndex + containers.size(); containerIndex++) {
+                final long modulatedContainerIndex = containerIndex % 
containers.size();
+                containerName = containerNames.get((int) 
modulatedContainerIndex);
+
+                containerState = containerStateMap.get(containerName);
+                if (!containerState.isWaitRequired()) {
+                    waitRequired = false;
+                    break;
                 }
+            }
 
-                final long modulatedSectionIndex = currentIndex % 
SECTIONS_PER_CONTAINER;
-                final String section = String.valueOf(modulatedSectionIndex);
-                final String claimId = System.currentTimeMillis() + "-" + 
currentIndex;
-
-                resourceClaim = 
resourceClaimManager.newResourceClaim(containerName, section, claimId, 
lossTolerant);
-                resourceOffset = 0L;
-                LOG.debug("Creating new Resource Claim {}", resourceClaim);
-
-                // we always append because there may be another ContentClaim 
using the same resource claim.
-                // However, we know that we will never write to the same claim 
from two different threads
-                // at the same time because we will call create() to get the 
claim before we write to it,
-                // and when we call create(), it will remove it from the 
Queue, which means that no other
-                // thread will get the same Claim until we've finished writing 
to it.
-                final File file = getPath(resourceClaim).toFile();
-                ByteCountingOutputStream claimStream = new 
SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), 
file.length());
-                writableClaimStreams.put(resourceClaim, claimStream);
-            } else {
-                resourceClaim = pair.getClaim();
-                resourceOffset = pair.getLength();
-                LOG.debug("Reusing Resource Claim {}", resourceClaim);
+            if (waitRequired) {
+                containerState.waitForArchiveExpiration();
             }
 
-            resourceClaimManager.incrementClaimantCount(resourceClaim, true);
+            final long modulatedSectionIndex = currentIndex % 
SECTIONS_PER_CONTAINER;
+            final String section = String.valueOf(modulatedSectionIndex);
+            final String claimId = System.currentTimeMillis() + "-" + 
currentIndex;
+
+            resourceClaim = 
resourceClaimManager.newResourceClaim(containerName, section, claimId, 
lossTolerant);
+            resourceOffset = 0L;
+            LOG.debug("Creating new Resource Claim {}", resourceClaim);
+
+            // we always append because there may be another ContentClaim 
using the same resource claim.
+            // However, we know that we will never write to the same claim 
from two different threads
+            // at the same time because we will call create() to get the claim 
before we write to it,
+            // and when we call create(), it will remove it from the Queue, 
which means that no other
+            // thread will get the same Claim until we've finished writing to 
it.
+            final File file = getPath(resourceClaim).toFile();
+            ByteCountingOutputStream claimStream = new 
SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), 
file.length());
+            writableClaimStreams.put(resourceClaim, claimStream);
+
+            incrementClaimantCount(resourceClaim, true);
+        } else {
+            resourceClaim = pair.getClaim();
+            resourceOffset = pair.getLength();
+            LOG.debug("Reusing Resource Claim {}", resourceClaim);
+
+            incrementClaimantCount(resourceClaim, false);
         }
 
         final StandardContentClaim scc = new 
StandardContentClaim(resourceClaim, resourceOffset);
@@ -564,18 +552,24 @@ public class FileSystemRepository implements 
ContentRepository {
 
     @Override
     public int incrementClaimaintCount(final ContentClaim claim) {
-        if (claim == null) {
+        return incrementClaimantCount(claim == null ? null : 
claim.getResourceClaim(), false);
+    }
+
+    protected int incrementClaimantCount(final ResourceClaim resourceClaim, 
final boolean newClaim) {
+        if (resourceClaim == null) {
             return 0;
         }
 
-        return 
resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
+        return resourceClaimManager.incrementClaimantCount(resourceClaim, 
newClaim);
     }
 
+
     @Override
     public int getClaimantCount(final ContentClaim claim) {
         if (claim == null) {
             return 0;
         }
+
         return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
     }
 
@@ -585,8 +579,7 @@ public class FileSystemRepository implements 
ContentRepository {
             return 0;
         }
 
-        final int claimantCount = 
resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
-        return claimantCount;
+        return 
resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
     }
 
     @Override
@@ -603,23 +596,9 @@ public class FileSystemRepository implements 
ContentRepository {
             return false;
         }
 
-        // we synchronize on the queue here because if the claimant count is 0,
-        // we need to be able to remove any instance of that resource claim 
from the
-        // queue atomically (i.e., the checking of the claimant count plus 
removal from the queue
-        // must be atomic). The create() method also synchronizes on the queue 
whenever it
-        // polls from the queue and increments a claimant count in order to 
ensure that these
-        // two conditions can be checked atomically.
-        synchronized (writableClaimQueue) {
-            final int claimantCount = 
resourceClaimManager.getClaimantCount(claim);
-            if (claimantCount > 0) {
-                // if other content claims are claiming the same resource, we 
have nothing to destroy,
-                // so just consider the destruction successful.
-                return true;
-            }
-            if (activeResourceClaims.contains(claim) || 
writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
-                // If we have an open OutputStream for the claim, we will not 
destroy the claim.
-                return false;
-            }
+        // If the claim is still in use, we won't remove it.
+        if (claim.isInUse()) {
+            return false;
         }
 
         Path path = null;
@@ -630,6 +609,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
         // Ensure that we have no writable claim streams for this resource 
claim
         final ByteCountingOutputStream bcos = 
writableClaimStreams.remove(claim);
+
         if (bcos != null) {
             try {
                 bcos.close();
@@ -842,6 +822,7 @@ public class FileSystemRepository implements 
ContentRepository {
         return write(claim, false);
     }
 
+
     private OutputStream write(final ContentClaim claim, final boolean append) 
throws IOException {
         if (claim == null) {
             throw new NullPointerException("ContentClaim cannot be null");
@@ -858,12 +839,9 @@ public class FileSystemRepository implements 
ContentRepository {
             throw new IllegalArgumentException("Cannot write to " + claim + " 
because it has already been written to.");
         }
 
-        final ResourceClaim resourceClaim = claim.getResourceClaim();
-
         ByteCountingOutputStream claimStream = 
writableClaimStreams.get(scc.getResourceClaim());
         final int initialLength = append ? (int) Math.max(0, scc.getLength()) 
: 0;
 
-        activeResourceClaims.add(resourceClaim);
         final ByteCountingOutputStream bcos = claimStream;
         final OutputStream out = new OutputStream() {
             private long bytesWritten = 0L;
@@ -938,7 +916,6 @@ public class FileSystemRepository implements 
ContentRepository {
             @Override
             public synchronized void close() throws IOException {
                 closed = true;
-                activeResourceClaims.remove(resourceClaim);
 
                 if (alwaysSync) {
                     ((FileOutputStream) 
bcos.getWrappedStream()).getFD().sync();
@@ -954,20 +931,15 @@ public class FileSystemRepository implements 
ContentRepository {
                 // is called. In this case, we don't have to actually close 
the file stream. Instead, we
                 // can just add it onto the queue and continue to use it for 
the next content claim.
                 final long resourceClaimLength = scc.getOffset() + 
scc.getLength();
-                if (recycle && resourceClaimLength < maxAppendClaimLength) {
-                    // we do not have to synchronize on the writable claim 
queue here because we
-                    // are only adding something to the queue. We must 
synchronize if we are
-                    // using a ResourceClaim from the queue and incrementing 
the claimant count on that resource
-                    // because those need to be done atomically, or if we are 
destroying a claim that is on
-                    // the queue because we need to ensure that the latter 
operation does not cause problems
-                    // with the former.
+                if (recycle && resourceClaimLength < 
MAX_APPENDABLE_CLAIM_LENGTH) {
                     final ClaimLengthPair pair = new 
ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
                     final boolean enqueued = writableClaimQueue.offer(pair);
 
                     if (enqueued) {
-                        LOG.debug("Claim length less than max; Adding {} back 
to writableClaimStreams", this);
+                        LOG.debug("Claim length less than max; Adding {} back 
to Writable Claim Queue", this);
                     } else {
                         writableClaimStreams.remove(scc.getResourceClaim());
+
                         bcos.close();
 
                         LOG.debug("Claim length less than max; Closing {} 
because could not add back to queue", this);
@@ -979,8 +951,12 @@ public class FileSystemRepository implements 
ContentRepository {
                     // we've reached the limit for this claim. Don't add it 
back to our queue.
                     // Instead, just remove it and move on.
 
+                    // Mark the claim as no longer being able to be written to
+                    resourceClaimManager.freeze(scc.getResourceClaim());
+
                     // ensure that the claim is no longer on the queue
                     writableClaimQueue.remove(new 
ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
+
                     bcos.close();
                     LOG.debug("Claim lenth >= max; Closing {}", this);
                     if (LOG.isTraceEnabled()) {
@@ -1109,11 +1085,8 @@ public class FileSystemRepository implements 
ContentRepository {
             return false;
         }
 
-        synchronized (writableClaimQueue) {
-            final int claimantCount = claim == null ? 0 : 
resourceClaimManager.getClaimantCount(claim);
-            if (claimantCount > 0 || writableClaimQueue.contains(new 
ClaimLengthPair(claim, null))) {
-                return false;
-            }
+        if (claim.isInUse()) {
+            return false;
         }
 
         // If the claim count is decremented to 0 (<= 0 as a 'defensive 
programming' strategy), ensure that
@@ -1121,6 +1094,7 @@ public class FileSystemRepository implements 
ContentRepository {
         // claimant count is removed without writing to the claim (or more 
specifically, without closing the
         // OutputStream that is returned when calling write() ).
         final OutputStream out = writableClaimStreams.remove(claim);
+
         if (out != null) {
             try {
                 out.close();

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index f04bf25..dc3ce42 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2126,24 +2126,26 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final boolean contentModified = record.getWorkingClaim() != null && 
record.getWorkingClaim() != record.getOriginalClaim();
 
         // If the working claim is not the same as the original claim, we have 
modified the content of
-        // the FlowFile, and we need to remove the newly created file (the 
working claim). However, if
+        // the FlowFile, and we need to remove the newly created content (the 
working claim). However, if
         // they are the same, we cannot just remove the claim because 
record.getWorkingClaim() will return
         // the original claim if the record is "working" but the content has 
not been modified
         // (e.g., in the case of attributes only were updated)
+        //
         // In other words:
         // If we modify the attributes of a FlowFile, and then we call 
record.getWorkingClaim(), this will
         // return the same claim as record.getOriginalClaim(). So we cannot 
just remove the working claim because
         // that may decrement the original claim (because the 2 claims are the 
same), and that's NOT what we want to do
-        // because we will do that later, in the session.commit() and that 
would result in removing the original claim twice.
+        // because we will do that later, in the session.commit() and that 
would result in decrementing the count for
+        // the original claim twice.
         if (contentModified) {
-            // In this case, it's ok to go ahead and destroy the content 
because we know that the working claim is going to be
+            // In this case, it's ok to decrement the claimant count for the 
content because we know that the working claim is going to be
             // updated and the given working claim is referenced only by 
FlowFiles in this session (because it's the Working Claim).
-            // Therefore, if this is the only record that refers to that 
Content Claim, we can destroy the claim. This happens,
-            // for instance, if a Processor modifies the content of a FlowFile 
more than once before committing the session.
-            final int claimantCount = 
context.getContentRepository().decrementClaimantCount(record.getWorkingClaim());
-            if (claimantCount == 0) {
-                
context.getContentRepository().remove(record.getWorkingClaim());
-            }
+            // Therefore, we need to decrement the claimant count, and since 
the Working Claim is being changed, that means that
+            // the Working Claim is a transient claim (the content need not be 
persisted because no FlowFile refers to it). We cannot simply
+            // remove the content because there may be other FlowFiles that 
reference the same Resource Claim. Marking the Content Claim as
+            // transient, though, will result in the FlowFile Repository 
cleaning up as appropriate.
+            
context.getContentRepository().decrementClaimantCount(record.getWorkingClaim());
+            record.addTransientClaim(record.getWorkingClaim());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index c5be81e..8aa1caf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -16,8 +16,10 @@
  */
 package org.apache.nifi.controller.repository;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -35,6 +37,7 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     private String swapLocation;
     private final Map<String, String> updatedAttributes = new HashMap<>();
     private final Map<String, String> originalAttributes;
+    private List<ContentClaim> transientClaims;
 
     /**
      * Creates a new record which has no original claim or flow file - it is 
entirely new
@@ -199,4 +202,20 @@ public class StandardRepositoryRecord implements 
RepositoryRecord {
     public String toString() {
         return "StandardRepositoryRecord[UpdateType=" + getType() + ",Record=" 
+ getCurrent() + "]";
     }
+
+    @Override
+    public List<ContentClaim> getTransientClaims() {
+        return transientClaims == null ? Collections.<ContentClaim> 
emptyList() : Collections.unmodifiableList(transientClaims);
+    }
+
+    void addTransientClaim(final ContentClaim claim) {
+        if (claim == null) {
+            return;
+        }
+
+        if (transientClaims == null) {
+            transientClaims = new ArrayList<>();
+        }
+        transientClaims.add(claim);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 211baa7..27d6c9b 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -177,17 +177,17 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         claimManager.markDestructable(resourceClaim);
     }
 
-    private int getClaimantCount(final ContentClaim claim) {
+    private boolean isDestructable(final ContentClaim claim) {
         if (claim == null) {
-            return 0;
+            return false;
         }
 
         final ResourceClaim resourceClaim = claim.getResourceClaim();
         if (resourceClaim == null) {
-            return 0;
+            return false;
         }
 
-        return claimManager.getClaimantCount(resourceClaim);
+        return !resourceClaim.isInUse();
     }
 
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {
@@ -211,21 +211,30 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         final Set<ResourceClaim> claimsToAdd = new HashSet<>();
         for (final RepositoryRecord record : records) {
             if (record.getType() == RepositoryRecordType.DELETE) {
-                // For any DELETE record that we have, if current claim's 
claimant count <= 0, mark it as destructable
-                if (record.getCurrentClaim() != null && 
getClaimantCount(record.getCurrentClaim()) <= 0) {
+                // For any DELETE record that we have, if claim is 
destructible, mark it so
+                if (record.getCurrentClaim() != null && 
isDestructable(record.getCurrentClaim())) {
                     
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
                 }
 
-                // If the original claim is different than the current claim 
and the original claim has a claimant count <= 0, mark it as destructable.
-                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
+                // If the original claim is different than the current claim 
and the original claim is destructible, mark it so
+                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
isDestructable(record.getOriginalClaim())) {
                     
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
-                // if we have an update, and the original is no longer needed, 
mark original as destructable
-                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
getClaimantCount(record.getOriginalClaim()) <= 0) {
+                // if we have an update, and the original is no longer needed, 
mark original as destructible
+                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
isDestructable(record.getOriginalClaim())) {
                     
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
                 }
             }
+
+            final List<ContentClaim> transientClaims = 
record.getTransientClaims();
+            if (transientClaims != null) {
+                for (final ContentClaim transientClaim : transientClaims) {
+                    if (isDestructable(transientClaim)) {
+                        claimsToAdd.add(transientClaim.getResourceClaim());
+                    }
+                }
+            }
         }
 
         if (!claimsToAdd.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
index 4b27eae..25dbaee 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaim.java
@@ -16,17 +16,17 @@
  */
 package org.apache.nifi.controller.repository.claim;
 
-import java.util.concurrent.atomic.AtomicInteger;
-
 public class StandardResourceClaim implements ResourceClaim, 
Comparable<ResourceClaim> {
+    private final StandardResourceClaimManager claimManager;
     private final String id;
     private final String container;
     private final String section;
     private final boolean lossTolerant;
-    private final AtomicInteger claimantCount = new AtomicInteger(0);
     private final int hashCode;
+    private volatile boolean writable = true;
 
-    public StandardResourceClaim(final String container, final String section, 
final String id, final boolean lossTolerant) {
+    public StandardResourceClaim(final StandardResourceClaimManager 
claimManager, final String container, final String section, final String id, 
final boolean lossTolerant) {
+        this.claimManager = claimManager;
         this.container = container.intern();
         this.section = section.intern();
         this.id = id;
@@ -64,18 +64,6 @@ public class StandardResourceClaim implements ResourceClaim, 
Comparable<Resource
         return section;
     }
 
-    int getClaimantCount() {
-        return claimantCount.get();
-    }
-
-    int decrementClaimantCount() {
-        return claimantCount.decrementAndGet();
-    }
-
-    int incrementClaimantCount() {
-        return claimantCount.incrementAndGet();
-    }
-
     /**
      * Provides the natural ordering for ResourceClaim objects. By default 
they are sorted by their id, then container, then section
      *
@@ -131,4 +119,27 @@ public class StandardResourceClaim implements 
ResourceClaim, Comparable<Resource
         return "StandardResourceClaim[id=" + id + ", container=" + container + 
", section=" + section + "]";
     }
 
+    @Override
+    public boolean isWritable() {
+        return writable;
+    }
+
+    /**
+     * Freeze the Resource Claim so that it can no longer be written to
+     */
+    void freeze() {
+        this.writable = false;
+    }
+
+    @Override
+    public boolean isInUse() {
+        // Note that it is critical here that we always check isWritable() 
BEFORE checking
+        // the claimant count. This is due to the fact that if the claim is in 
fact writable, the claimant count
+        // could increase. So if we first check claimant count and that is 0, 
and then we check isWritable, it may be
+        // that the claimant count has changed to 1 before checking isWritable.
+        // However, if isWritable() is false, then the only way that the 
claimant count can increase is if a FlowFile referencing
+        // the Resource Claim is cloned. In this case, though, the claimant 
count has not become 0.
+        // Said another way, if isWritable() == false, then the claimant count 
can never increase from 0.
+        return isWritable() || claimManager.getClaimantCount(this) > 0;
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 4826ac3..9cb0fa1 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -36,7 +36,7 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
 
     @Override
     public ResourceClaim newResourceClaim(final String container, final String 
section, final String id, final boolean lossTolerant) {
-        return new StandardResourceClaim(container, section, id, lossTolerant);
+        return new StandardResourceClaim(this, container, section, id, 
lossTolerant);
     }
 
     private static AtomicInteger getCounter(final ResourceClaim claim) {
@@ -59,8 +59,11 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
         if (claim == null) {
             return 0;
         }
-        final AtomicInteger counter = claimantCounts.get(claim);
-        return counter == null ? 0 : counter.get();
+
+        synchronized (claim) {
+            final AtomicInteger counter = claimantCounts.get(claim);
+            return counter == null ? 0 : counter.get();
+        }
     }
 
     @Override
@@ -69,18 +72,30 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
             return 0;
         }
 
-        final AtomicInteger counter = claimantCounts.get(claim);
-        if (counter == null) {
-            logger.debug("Decrementing claimant count for {} but claimant 
count is not known. Returning -1", claim);
-            return -1;
-        }
+        synchronized (claim) {
+            final AtomicInteger counter = claimantCounts.get(claim);
+            if (counter == null) {
+                logger.warn("Decrementing claimant count for {} but claimant 
count is not known. Returning -1", claim);
+                return -1;
+            }
 
-        final int newClaimantCount = counter.decrementAndGet();
-        logger.debug("Decrementing claimant count for {} to {}", claim, 
newClaimantCount);
-        if (newClaimantCount == 0) {
-            claimantCounts.remove(claim);
+            final int newClaimantCount = counter.decrementAndGet();
+            if (newClaimantCount < 0) {
+                logger.error("Decremented claimant count for {} to {}", claim, 
newClaimantCount);
+            } else {
+                logger.debug("Decrementing claimant count for {} to {}", 
claim, newClaimantCount);
+            }
+
+            if (newClaimantCount == 0) {
+                removeClaimantCount(claim);
+            }
+            return newClaimantCount;
         }
-        return newClaimantCount;
+    }
+
+    // protected so that it can be used in unit tests
+    protected void removeClaimantCount(final ResourceClaim claim) {
+        claimantCounts.remove(claim);
     }
 
     @Override
@@ -90,15 +105,22 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
 
     @Override
     public int incrementClaimantCount(final ResourceClaim claim, final boolean 
newClaim) {
-        final AtomicInteger counter = getCounter(claim);
+        if (claim == null) {
+            return 0;
+        }
+
+        synchronized (claim) {
+            final AtomicInteger counter = getCounter(claim);
+
+            final int newClaimantCount = counter.incrementAndGet();
+            logger.debug("Incrementing claimant count for {} to {}", claim, 
newClaimantCount);
 
-        final int newClaimantCount = counter.incrementAndGet();
-        logger.debug("Incrementing claimant count for {} to {}", claim, 
newClaimantCount);
-        // If the claimant count moved from 0 to 1, remove it from the queue 
of destructable claims.
-        if (!newClaim && newClaimantCount == 1) {
-            destructableClaims.remove(claim);
+            // If the claimant count moved from 0 to 1, remove it from the 
queue of destructable claims.
+            if (!newClaim && newClaimantCount == 1) {
+                destructableClaims.remove(claim);
+            }
+            return newClaimantCount;
         }
-        return newClaimantCount;
     }
 
     @Override
@@ -107,15 +129,17 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
             return;
         }
 
-        if (getClaimantCount(claim) > 0) {
-            return;
-        }
+        synchronized (claim) {
+            if (getClaimantCount(claim) > 0) {
+                return;
+            }
 
-        logger.debug("Marking claim {} as destructable", claim);
-        try {
-            while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) {
+            logger.debug("Marking claim {} as destructable", claim);
+            try {
+                while (!destructableClaims.offer(claim, 30, TimeUnit.MINUTES)) 
{
+                }
+            } catch (final InterruptedException ie) {
             }
-        } catch (final InterruptedException ie) {
         }
     }
 
@@ -142,4 +166,16 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
         claimantCounts.clear();
     }
 
+    @Override
+    public void freeze(final ResourceClaim claim) {
+        if (claim == null) {
+            return;
+        }
+
+        if (!(claim instanceof StandardResourceClaim)) {
+            throw new IllegalArgumentException("The given resource claim is 
not managed by this Resource Claim Manager");
+        }
+
+        ((StandardResourceClaim) claim).freeze();
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index fcfd524..2ab8e35 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -159,6 +159,10 @@ public class TestFileSystemSwapManager {
         @Override
         public void purge() {
         }
+
+        @Override
+        public void freeze(ResourceClaim claim) {
+        }
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index c00b91a..6adcc05 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -257,8 +257,8 @@ public class TestFileSystemRepository {
         final Path claimPath = getPath(claim);
 
         // Create the file.
-        try (final OutputStream out = Files.newOutputStream(claimPath, 
StandardOpenOption.CREATE)) {
-            out.write("Hello".getBytes());
+        try (final OutputStream out = repository.write(claim)) {
+            out.write(new 
byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH]);
         }
 
         int count = repository.decrementClaimantCount(claim);
@@ -379,13 +379,13 @@ public class TestFileSystemRepository {
 
     @Test(expected = ContentNotFoundException.class)
     public void testSizeWithNoContent() throws IOException {
-        final ContentClaim claim = new StandardContentClaim(new 
StandardResourceClaim("container1", "section 1", "1", false), 0L);
+        final ContentClaim claim = new StandardContentClaim(new 
StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L);
         assertEquals(0L, repository.size(claim));
     }
 
     @Test(expected = ContentNotFoundException.class)
     public void testReadWithNoContent() throws IOException {
-        final ContentClaim claim = new StandardContentClaim(new 
StandardResourceClaim("container1", "section 1", "1", false), 0L);
+        final ContentClaim claim = new StandardContentClaim(new 
StandardResourceClaim(claimManager, "container1", "section 1", "1", false), 0L);
         final InputStream in = repository.read(claim);
         in.close();
     }
@@ -427,12 +427,12 @@ public class TestFileSystemRepository {
 
         // write at least 1 MB to the output stream so that when we close the 
output stream
         // the repo won't keep the stream open.
-        final byte[] buff = new byte[1024 * 1024];
+        final byte[] buff = new 
byte[FileSystemRepository.MAX_APPENDABLE_CLAIM_LENGTH];
         out.write(buff);
         out.write(buff);
 
-        // true because claimant count is still 1.
-        assertTrue(repository.remove(claim));
+        // false because claimant count is still 1, so the resource claim was 
not removed
+        assertFalse(repository.remove(claim));
 
         assertEquals(0, repository.decrementClaimantCount(claim));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 56329ec..0e39bde 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -20,10 +20,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.doThrow;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.notNull;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -62,7 +62,6 @@ import 
org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -97,6 +96,7 @@ public class TestStandardProcessSession {
 
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
+    private static StandardResourceClaimManager resourceClaimManager;
 
     @After
     public void cleanup() {
@@ -134,6 +134,8 @@ public class TestStandardProcessSession {
     @Before
     @SuppressWarnings("unchecked")
     public void setup() throws IOException {
+        resourceClaimManager = new StandardResourceClaimManager();
+
         System.setProperty("nifi.properties.file.path", 
"src/test/resources/nifi.properties");
         final FlowFileEventRepository flowFileEventRepo = 
Mockito.mock(FlowFileEventRepository.class);
         final CounterRepository counterRepo = 
Mockito.mock(CounterRepository.class);
@@ -743,7 +745,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
             .size(1L)
             .build();
         flowFileQueue.put(flowFileRecord);
@@ -840,7 +842,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
             .size(1L)
             .build();
         flowFileQueue.put(flowFileRecord);
@@ -864,7 +866,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
             .build();
         flowFileQueue.put(flowFileRecord);
 
@@ -880,7 +882,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
             .contentClaimOffset(1000L)
             .size(1000L)
             .build();
@@ -905,7 +907,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
             .build();
 
         flowFileQueue.put(flowFileRecord);
@@ -922,7 +924,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord2 = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
 
         .contentClaimOffset(1000L).size(1L).build();
         flowFileQueue.put(flowFileRecord2);
@@ -991,7 +993,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
 
         .contentClaimOffset(0L).size(0L).build();
         flowFileQueue.put(flowFileRecord);
@@ -1029,7 +1031,7 @@ public class TestStandardProcessSession {
         final FlowFileRecord flowFileRecord = new 
StandardFlowFileRecord.Builder()
             .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
             .entryDate(System.currentTimeMillis())
-            .contentClaim(new StandardContentClaim(new 
StandardResourceClaim("x", "x", "0", true), 0L))
+            .contentClaim(new 
StandardContentClaim(resourceClaimManager.newResourceClaim("x", "x", "0", 
true), 0L))
 
         .contentClaimOffset(0L).size(0L).build();
         flowFileQueue.put(flowFileRecord);
@@ -1258,7 +1260,7 @@ public class TestStandardProcessSession {
             final Set<ContentClaim> claims = new HashSet<>();
 
             for (long i = 0; i < idGenerator.get(); i++) {
-                final ResourceClaim resourceClaim = new 
StandardResourceClaim("container", "section", String.valueOf(i), false);
+                final ResourceClaim resourceClaim = 
resourceClaimManager.newResourceClaim("container", "section", 
String.valueOf(i), false);
                 final ContentClaim contentClaim = new 
StandardContentClaim(resourceClaim, 0L);
                 if (getClaimantCount(contentClaim) > 0) {
                     claims.add(contentClaim);

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index cd4aa27..ee26d1f 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -44,7 +44,6 @@ import 
org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
-import org.apache.nifi.controller.repository.claim.StandardResourceClaim;
 import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager;
 import org.apache.nifi.controller.swap.StandardSwapContents;
 import org.apache.nifi.controller.swap.StandardSwapSummary;
@@ -87,10 +86,10 @@ public class TestWriteAheadFlowFileRepository {
         when(connection.getFlowFileQueue()).thenReturn(queue);
         queueProvider.addConnection(connection);
 
-        final ResourceClaim resourceClaim1 = new 
StandardResourceClaim("container", "section", "1", false);
+        final ResourceClaim resourceClaim1 = 
claimManager.newResourceClaim("container", "section", "1", false);
         final ContentClaim claim1 = new StandardContentClaim(resourceClaim1, 
0L);
 
-        final ResourceClaim resourceClaim2 = new 
StandardResourceClaim("container", "section", "2", false);
+        final ResourceClaim resourceClaim2 = 
claimManager.newResourceClaim("container", "section", "2", false);
         final ContentClaim claim2 = new StandardContentClaim(resourceClaim2, 
0L);
 
         // Create a flowfile repo, update it once with a FlowFile that points to one resource claim. Then,

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
new file mode 100644
index 0000000..d29105a
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.claim;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class TestStandardResourceClaimManager {
+
+    @Test
+    @Ignore("Unit test was created to repeat a concurrency bug in StandardResourceClaimManager. "
+        + "However, now that the concurrency bug has been fixed, the test will deadlock. Leaving here for now in case it's valuable before the commit is pushed")
+    public void testIncrementAndDecrementThreadSafety() throws 
InterruptedException {
+        final AtomicBoolean waitToRemove = new AtomicBoolean(true);
+        final CountDownLatch decrementComplete = new CountDownLatch(1);
+
+        final StandardResourceClaimManager manager = new 
StandardResourceClaimManager() {
+            @Override
+            protected void removeClaimantCount(final ResourceClaim claim) {
+                decrementComplete.countDown();
+
+                while (waitToRemove.get()) {
+                    try {
+                        Thread.sleep(10L);
+                    } catch (final InterruptedException ie) {
+                        Assert.fail("Interrupted while waiting to remove 
claimant count");
+                    }
+                }
+
+                super.removeClaimantCount(claim);
+            }
+        };
+
+        final ResourceClaim resourceClaim = 
manager.newResourceClaim("container", "section", "id", false);
+        assertEquals(1, manager.incrementClaimantCount(resourceClaim)); // increment claimant count to 1.
+
+        assertEquals(1, manager.getClaimantCount(resourceClaim));
+
+        // Decrement the claimant count. This should decrement the count to 0. However, we have 'waitToRemove' set to true,
+        // so the manager will not actually remove the claimant count (or return from this method) until we set 'waitToRemove'
+        // to false. We do this so that we can increment the claimant count in a separate thread. Because we will be incrementing
+        // the count in 1 thread and decrementing it in another thread, the end result should be that the claimant count is still
+        // at 1.
+        final Runnable decrementCountRunnable = new Runnable() {
+            @Override
+            public void run() {
+                manager.decrementClaimantCount(resourceClaim);
+            }
+        };
+
+        final Runnable incrementCountRunnable = new Runnable() {
+            @Override
+            public void run() {
+                // Wait until the count has been decremented
+                try {
+                    decrementComplete.await();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    Assert.fail(e.toString());
+                }
+
+                // Increment the claimant count
+                manager.incrementClaimantCount(resourceClaim);
+
+                // allow the 'decrement Thread' to complete
+                waitToRemove.set(false);
+            }
+        };
+
+        // Start the threads so that the claim count is incremented and decremented at the same time
+        final Thread decrementThread = new Thread(decrementCountRunnable);
+        final Thread incrementThread = new Thread(incrementCountRunnable);
+
+        decrementThread.start();
+        incrementThread.start();
+
+        // Wait for both threads to complete
+        incrementThread.join();
+        decrementThread.join();
+
+        // claimant count should still be 1, since 1 thread incremented it and 1 thread decremented it!
+        assertEquals(1, manager.getClaimantCount(resourceClaim));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
index 6bbe800..09cc037 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/resources/logback-test.xml
@@ -33,4 +33,6 @@
     <root level="INFO">
         <appender-ref ref="CONSOLE"/>
     </root>
+    
+    <logger name="StandardProcessSession.claims" level="DEBUG" />
 </configuration>

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
index 1374c10..cd5ce54 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DebugFlow.java
@@ -16,8 +16,22 @@
  */
 package org.apache.nifi.processors.standard;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
 import org.apache.http.annotation.ThreadSafe;
 import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -25,24 +39,15 @@ import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
 @ThreadSafe()
 @EventDriven()
 @Tags({"test", "debug", "processor", "utility", "flow", "FlowFile"})
@@ -167,6 +172,20 @@ public class DebugFlow extends AbstractProcessor {
                 }
             })
             .build();
+    static final PropertyDescriptor WRITE_ITERATIONS = new 
PropertyDescriptor.Builder()
+        .name("Write Iterations")
+        .description("Number of times to write to the FlowFile")
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .required(true)
+        .defaultValue("0")
+        .build();
+    static final PropertyDescriptor CONTENT_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Content Size")
+        .description("The number of bytes to write each time that the FlowFile is written to")
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .required(true)
+        .defaultValue("1 KB")
+        .build();
 
     private volatile Integer flowFileMaxSuccess = 0;
     private volatile Integer flowFileMaxFailure = 0;
@@ -225,6 +244,8 @@ public class DebugFlow extends AbstractProcessor {
                 propList.add(NO_FF_EXCEPTION_ITERATIONS);
                 propList.add(NO_FF_YIELD_ITERATIONS);
                 propList.add(NO_FF_EXCEPTION_CLASS);
+                propList.add(WRITE_ITERATIONS);
+                propList.add(CONTENT_SIZE);
                 propertyDescriptors.compareAndSet(null, 
Collections.unmodifiableList(propList));
             }
             return propertyDescriptors.get();
@@ -304,6 +325,23 @@ public class DebugFlow extends AbstractProcessor {
                 }
                 return;
             } else {
+                final int writeIterations = 
context.getProperty(WRITE_ITERATIONS).asInteger();
+                if (writeIterations > 0 && pass == 1) {
+                    final Random random = new Random();
+
+                    for (int i = 0; i < writeIterations; i++) {
+                        final byte[] data = new 
byte[context.getProperty(CONTENT_SIZE).asDataSize(DataUnit.B).intValue()];
+                        random.nextBytes(data);
+
+                        ff = session.write(ff, new OutputStreamCallback() {
+                            @Override
+                            public void process(final OutputStream out) throws 
IOException {
+                                out.write(data);
+                            }
+                        });
+                    }
+                }
+
                 if (curr_ff_resp.state() == 
FlowFileResponseState.FF_SUCCESS_RESPONSE) {
                     if (flowFileCurrSuccess < flowFileMaxSuccess) {
                         flowFileCurrSuccess += 1;

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 94cd7da..62b3b13 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -87,4 +87,4 @@ org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
 org.apache.nifi.processors.standard.ExecuteSQL
-org.apache.nifi.processors.standard.FetchDistributedMapCache
+org.apache.nifi.processors.standard.FetchDistributedMapCache
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/25f816d3/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
index 8eb53aa..5aa2e1e 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDebugFlow.java
@@ -76,16 +76,6 @@ public class TestDebugFlow {
         runner.setProperty(DebugFlow.NO_FF_YIELD_ITERATIONS, "0");
     }
 
-    @Test
-    public void testGetSupportedPropertyDescriptors() throws Exception {
-        assertEquals(11, debugFlow.getPropertyDescriptors().size());
-    }
-
-    @Test
-    public void testGetRelationships() throws Exception {
-        assertEquals(2, debugFlow.getRelationships().size());
-    }
-
     private boolean isInContents(byte[] content) {
         for (Map.Entry entry : contents.entrySet()) {
             if (((String)entry.getValue()).compareTo(new String(content)) == 
0) {

Reply via email to