Repository: incubator-nifi Updated Branches: refs/heads/develop 50744bfdc -> 83b33c805
NIFI-387: If possible don't use ContentRepository.importFrom but just copy stream directly in StandardProcessSession Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/739f0c25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/739f0c25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/739f0c25 Branch: refs/heads/develop Commit: 739f0c25e2c885bcfa6eef81a9e4896e618ec0fb Parents: 50744bf Author: Mark Payne <marka...@hotmail.com> Authored: Thu Feb 26 09:04:05 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Thu Feb 26 09:04:05 2015 -0500 ---------------------------------------------------------------------- .../repository/StandardProcessSession.java | 46 +++++++++++--------- 1 file changed, 26 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/739f0c25/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index dcb461c..8d2e456 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -2351,35 +2351,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE public FlowFile importFrom(final InputStream source, final FlowFile destination) { validateRecordState(destination); final StandardRepositoryRecord record = records.get(destination); - final ContentClaim newClaim; + ContentClaim newClaim = null; long claimOffset = 0L; - final boolean appendToClaim = isMergeContent(); - if (appendToClaim) { - enforceCurrentWriteClaimState(); - newClaim = currentWriteClaim; - claimOffset = currentWriteClaimSize; - } else { - try { - newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); - claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); - } catch (final IOException e) { - throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); - } - } - final long newSize; + final boolean appendToClaim = isMergeContent(); try { - final boolean append = isMergeContent(); - newSize = context.getContentRepository().importFrom(source, newClaim, append); - bytesWritten.increment(newSize); - currentWriteClaimSize += newSize; + if (appendToClaim) { + enforceCurrentWriteClaimState(); + newClaim = currentWriteClaim; + claimOffset = currentWriteClaimSize; + + final long bytesCopied = StreamUtils.copy(source, currentWriteClaimStream); + bytesWritten.increment(bytesCopied); + currentWriteClaimSize += bytesCopied; + newSize = bytesCopied; + } else { + try { + newClaim = context.getContentRepository().create(context.getConnectable().isLossTolerant()); + claimLog.debug("Creating ContentClaim {} for 'importFrom' for {}", newClaim, destination); + + newSize = context.getContentRepository().importFrom(source, newClaim, appendToClaim); + bytesWritten.increment(newSize); + currentWriteClaimSize += newSize; + } catch (final IOException e) { + throw new FlowFileAccessException("Unable to create ContentClaim due to " + e.toString(), e); + } + } } catch (final Throwable t) { if (appendToClaim) { resetWriteClaims(); } - destroyContent(newClaim); + if ( newClaim != null ) { + destroyContent(newClaim); + } throw new FlowFileAccessException("Failed to import data from " + source + " for " + destination + " due to " + t.toString(), t); }