Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 50744bfdc -> 83b33c805


NIFI-387: If possible, don't use ContentRepository.importFrom; instead, copy the
stream directly in StandardProcessSession


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/739f0c25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/739f0c25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/739f0c25

Branch: refs/heads/develop
Commit: 739f0c25e2c885bcfa6eef81a9e4896e618ec0fb
Parents: 50744bf
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Feb 26 09:04:05 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Thu Feb 26 09:04:05 2015 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 46 +++++++++++---------
 1 file changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739f0c25/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index dcb461c..8d2e456 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -2351,35 +2351,41 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     public FlowFile importFrom(final InputStream source, final FlowFile 
destination) {
         validateRecordState(destination);
         final StandardRepositoryRecord record = records.get(destination);
-        final ContentClaim newClaim;
+        ContentClaim newClaim = null;
         long claimOffset = 0L;
 
-        final boolean appendToClaim = isMergeContent();
-        if (appendToClaim) {
-            enforceCurrentWriteClaimState();
-            newClaim = currentWriteClaim;
-            claimOffset = currentWriteClaimSize;
-        } else {
-            try {
-                newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
-                claimLog.debug("Creating ContentClaim {} for 'importFrom' for 
{}", newClaim, destination);
-            } catch (final IOException e) {
-                throw new FlowFileAccessException("Unable to create 
ContentClaim due to " + e.toString(), e);
-            }
-        }
-
         final long newSize;
+        final boolean appendToClaim = isMergeContent();
         try {
-            final boolean append = isMergeContent();
-            newSize = context.getContentRepository().importFrom(source, 
newClaim, append);
-            bytesWritten.increment(newSize);
-            currentWriteClaimSize += newSize;
+            if (appendToClaim) {
+                enforceCurrentWriteClaimState();
+                newClaim = currentWriteClaim;
+                claimOffset = currentWriteClaimSize;
+                
+                final long bytesCopied = StreamUtils.copy(source, 
currentWriteClaimStream);
+                bytesWritten.increment(bytesCopied);
+                currentWriteClaimSize += bytesCopied;
+                newSize = bytesCopied;
+            } else {
+                try {
+                    newClaim = 
context.getContentRepository().create(context.getConnectable().isLossTolerant());
+                    claimLog.debug("Creating ContentClaim {} for 'importFrom' 
for {}", newClaim, destination);
+                    
+                    newSize = 
context.getContentRepository().importFrom(source, newClaim, appendToClaim);
+                    bytesWritten.increment(newSize);
+                    currentWriteClaimSize += newSize;
+                } catch (final IOException e) {
+                    throw new FlowFileAccessException("Unable to create 
ContentClaim due to " + e.toString(), e);
+                }
+            }
         } catch (final Throwable t) {
             if (appendToClaim) {
                 resetWriteClaims();
             }
 
-            destroyContent(newClaim);
+            if ( newClaim != null ) {
+                destroyContent(newClaim);
+            }
             throw new FlowFileAccessException("Failed to import data from " + 
source + " for " + destination + " due to " + t.toString(), t);
         }
 

Reply via email to