Repository: nifi Updated Branches: refs/heads/0.x 25f816d32 -> 7c7183100
NIFI-2376: Ensure that we don't decrement claimant count more than once when append() throws an Exception NIFI-2397 merging patch for NIFI-2376 into 0.x branch This closes #844. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7c718310 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7c718310 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7c718310 Branch: refs/heads/0.x Commit: 7c718310085bb960918536f57ab25b54c8ce0bc0 Parents: 25f816d Author: Mark Payne <[email protected]> Authored: Sat Jul 23 15:05:20 2016 -0400 Committer: Pierre Villard <[email protected]> Committed: Tue Aug 16 13:51:26 2016 +0200 ---------------------------------------------------------------------- .../nifi/controller/StandardFlowFileQueue.java | 2 +- .../repository/FileSystemRepository.java | 3 +- .../repository/StandardProcessSession.java | 53 ++++++-- .../repository/TestFileSystemRepository.java | 49 +++++++ .../repository/TestStandardProcessSession.java | 135 +++++++++++++++++++ 5 files changed, 227 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/7c718310/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java index 631b936..1fd65c0 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java +++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java @@ -79,7 +79,7 @@ import org.slf4j.LoggerFactory; * processing. Must be thread safe. * */ -public final class StandardFlowFileQueue implements FlowFileQueue { +public class StandardFlowFileQueue implements FlowFileQueue { public static final int MAX_EXPIRED_RECORDS_PER_ITERATION = 100000; public static final int SWAP_RECORD_POLL_SIZE = 10000; http://git-wip-us.apache.org/repos/asf/nifi/blob/7c718310/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java index 89fe95e..8a3ac6d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java @@ -1080,7 +1080,8 @@ public class FileSystemRepository implements ContentRepository { return Files.exists(getArchivePath(contentClaim.getResourceClaim())); } - private boolean archive(final ResourceClaim claim) throws IOException { + // visible for testing + boolean archive(final ResourceClaim claim) throws IOException { if (!archiveData) { return false; } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/7c718310/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java index dc3ce42..21ffa04 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java @@ -315,13 +315,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE if (record.isMarkedForDelete()) { // if the working claim is not the same as the original claim, we can immediately destroy the working claim // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync. - removeContent(record.getWorkingClaim()); + decrementClaimCount(record.getWorkingClaim()); if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) { // if working & original claim are same, don't remove twice; we only want to remove the original // if it's different from the working. Otherwise, we remove two claimant counts. This causes // an issue if we only updated the FlowFile attributes. 
- removeContent(record.getOriginalClaim()); + decrementClaimCount(record.getOriginalClaim()); } final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); final Connectable connectable = context.getConnectable(); @@ -329,7 +329,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[] {flowFile, terminator, flowFileLife}); } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) { // records which have been updated - remove original if exists - removeContent(record.getOriginalClaim()); + decrementClaimCount(record.getOriginalClaim()); } } @@ -897,12 +897,12 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE final Set<StandardRepositoryRecord> transferRecords = new HashSet<>(); for (final StandardRepositoryRecord record : recordsToHandle) { if (record.isMarkedForAbort()) { - removeContent(record.getWorkingClaim()); + decrementClaimCount(record.getWorkingClaim()); if (record.getCurrentClaim() != null && !record.getCurrentClaim().equals(record.getWorkingClaim())) { // if working & original claim are same, don't remove twice; we only want to remove the original // if it's different from the working. Otherwise, we remove two claimant counts. This causes // an issue if we only updated the flowfile attributes. 
- removeContent(record.getCurrentClaim()); + decrementClaimCount(record.getCurrentClaim()); } abortedRecords.add(record); } else { @@ -994,7 +994,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } - private void removeContent(final ContentClaim claim) { + private void decrementClaimCount(final ContentClaim claim) { if (claim == null) { return; } @@ -1704,7 +1704,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE record.markForDelete(); expiredRecords.add(record); expiredReporter.expire(flowFile, "Expiration Threshold = " + connection.getFlowFileQueue().getFlowFileExpiration()); - removeContent(flowFile.getContentClaim()); + decrementClaimCount(flowFile.getContentClaim()); final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate(); final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable; @@ -2073,9 +2073,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE originalByteWrittenCount = outStream.getBytesWritten(); // wrap our OutputStreams so that the processor cannot close it - try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream)) { + try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(outStream); + final OutputStream flowFileAccessOutStream = new FlowFileAccessOutputStream(disableOnClose, source)) { recursionSet.add(source); - writer.process(disableOnClose); + writer.process(flowFileAccessOutStream); } finally { recursionSet.remove(source); } @@ -2085,15 +2086,37 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE newSize = outStream.getBytesWritten(); } catch (final ContentNotFoundException nfe) { resetWriteClaims(); // need to reset write claim before we can remove the claim - destroyContent(newClaim); + + // If the content claim changed, then we should destroy the new one. 
We do this + // because the new content claim will never get set as the 'working claim' for the FlowFile + // record since we will throw an Exception. As a result, we need to ensure that we have + // appropriately decremented the claimant count and can destroy the content if it is no + // longer in use. However, it is critical that we do this ONLY if the content claim has + // changed. Otherwise, the FlowFile already has a reference to this Content Claim and + // whenever the FlowFile is removed, the claim count will be decremented; if we decremented + // it here also, we would be decrementing the claimant count twice! + if (newClaim != oldClaim) { + destroyContent(newClaim); + } + handleContentNotFound(nfe, record); } catch (final IOException ioe) { resetWriteClaims(); // need to reset write claim before we can remove the claim - destroyContent(newClaim); - throw new FlowFileAccessException("Exception in callback: " + ioe.toString(), ioe); + + // See above explanation for why this is done only if newClaim != oldClaim + if (newClaim != oldClaim) { + destroyContent(newClaim); + } + + throw new ProcessException("IOException thrown from " + connectableDescription + ": " + ioe.toString(), ioe); } catch (final Throwable t) { resetWriteClaims(); // need to reset write claim before we can remove the claim - destroyContent(newClaim); + + // See above explanation for why this is done only if newClaim != oldClaim + if (newClaim != oldClaim) { + destroyContent(newClaim); + } + throw t; } finally { if (outStream != null) { @@ -2102,6 +2125,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE } } + // If the record already has a working claim, and this is the first time that we are appending to the FlowFile, + // destroy the current working claim because it is a temporary claim that + // is no longer going to be used, as we are about to set a new working claim. 
This would happen, for instance, if + // the FlowFile was written to, via #write() and then append() was called. if (newClaim != oldClaim) { removeTemporaryClaim(record); } http://git-wip-us.apache.org/repos/asf/nifi/blob/7c718310/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java index 6adcc05..2a82aed 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java @@ -495,6 +495,55 @@ public class TestFileSystemRepository { } + @Test + public void testWriteCannotProvideNullOutput() throws IOException { + FileSystemRepository repository = null; + try { + final List<Path> archivedPathsWithOpenStream = Collections.synchronizedList(new ArrayList<Path>()); + + // We are creating our own 'local' repository in this test so shut down the one created in the setup() method + shutdown(); + + repository = new FileSystemRepository() { + @Override + protected boolean archive(Path curPath) throws IOException { + if (getOpenStreamCount() > 0) { + archivedPathsWithOpenStream.add(curPath); + } + + return true; + } + }; + + final StandardResourceClaimManager claimManager = new StandardResourceClaimManager(); + repository.initialize(claimManager); + repository.purge(); + + final ContentClaim claim = repository.create(false); + 
+ assertEquals(1, claimManager.getClaimantCount(claim.getResourceClaim())); + + int claimantCount = claimManager.decrementClaimantCount(claim.getResourceClaim()); + assertEquals(0, claimantCount); + assertTrue(archivedPathsWithOpenStream.isEmpty()); + + OutputStream out = repository.write(claim); + out.close(); + repository.decrementClaimantCount(claim); + + ContentClaim claim2 = repository.create(false); + assertEquals(claim.getResourceClaim(), claim2.getResourceClaim()); + out = repository.write(claim2); + + final boolean archived = repository.archive(claim.getResourceClaim()); + assertFalse(archived); + } finally { + if (repository != null) { + repository.shutdown(); + } + } + } + /** * We have encountered a situation where the File System Repo is moving files to archive and then eventually * aging them off while there is still an open file handle. This test is meant to replicate the conditions under http://git-wip-us.apache.org/repos/asf/nifi/blob/7c718310/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java index 0e39bde..2a401c6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java @@ -66,6 +66,7 @@ import org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import 
org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.processor.FlowFileFilter; import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.FlowFileAccessException; import org.apache.nifi.processor.exception.MissingFlowFileException; @@ -146,6 +147,8 @@ public class TestStandardProcessSession { final FlowFileSwapManager swapManager = Mockito.mock(FlowFileSwapManager.class); flowFileQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null); + final StandardFlowFileQueue actualQueue = new StandardFlowFileQueue("1", connection, flowFileRepo, provenanceRepo, null, processScheduler, swapManager, null, 10000, null); + flowFileQueue = Mockito.spy(actualQueue); when(connection.getFlowFileQueue()).thenReturn(flowFileQueue); Mockito.doAnswer(new Answer<Object>() { @@ -204,6 +207,87 @@ public class TestStandardProcessSession { } @Test + public void testAppendToChildThrowsIOExceptionThenRemove() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.create(original); + child = session.append(child, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("hello".getBytes()); + } + }); + + // Force an IOException. This will decrement our claim count for the resource claim. 
+ try { + child = session.append(child, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + throw new IOException(); + } + }); + Assert.fail("append() callback threw IOException but it was not wrapped in ProcessException"); + } catch (final ProcessException pe) { + // expected + } + + session.remove(child); + session.transfer(original); + session.commit(); + + final int numClaims = contentRepo.getExistingClaims().size(); + assertEquals(0, numClaims); + } + + @Test + public void testWriteForChildThrowsIOExceptionThenRemove() throws IOException { + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .id(1000L) + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .build(); + flowFileQueue.put(flowFileRecord); + FlowFile original = session.get(); + assertNotNull(original); + + FlowFile child = session.create(original); + // Force an IOException. This will decrement our claim count for the resource claim. 
+ try { + child = session.write(child, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("hello".getBytes()); + } + }); + + child = session.write(child, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + throw new IOException(); + } + }); + Assert.fail("write() callback threw IOException but it was not wrapped in ProcessException"); + } catch (final ProcessException pe) { + // expected + } + + session.remove(child); + session.transfer(original); + session.commit(); + + final int numClaims = contentRepo.getExistingClaims().size(); + assertEquals(0, numClaims); + } + + + @Test public void testModifyContentThenRollback() throws IOException { assertEquals(0, contentRepo.getExistingClaims().size()); @@ -804,6 +888,57 @@ public class TestStandardProcessSession { } @Test + public void testAppendDoesNotDecrementContentClaimIfNotNeeded() { + FlowFile flowFile = session.create(); + + session.append(flowFile, new OutputStreamCallback() { + @Override + public void process(OutputStream out) throws IOException { + out.write("hello".getBytes()); + } + }); + + final Set<ContentClaim> existingClaims = contentRepo.getExistingClaims(); + assertEquals(1, existingClaims.size()); + final ContentClaim claim = existingClaims.iterator().next(); + + final int countAfterAppend = contentRepo.getClaimantCount(claim); + assertEquals(1, countAfterAppend); + } + + + @Test + @SuppressWarnings("unchecked") + public void testExpireDecrementsClaimsOnce() throws IOException { + final ContentClaim contentClaim = contentRepo.create(false); + + final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder() + .addAttribute("uuid", "12345678-1234-1234-1234-123456789012") + .entryDate(System.currentTimeMillis()) + .contentClaim(contentClaim) + .build(); + + Mockito.doAnswer(new Answer<List<FlowFileRecord>>() { + int iterations = 0; + + @Override + public List<FlowFileRecord> 
answer(InvocationOnMock invocation) throws Throwable { + if (iterations++ == 0) { + final Set<FlowFileRecord> expired = invocation.getArgumentAt(1, Set.class); + expired.add(flowFileRecord); + } + + return null; + } + }).when(flowFileQueue).poll(Mockito.any(FlowFileFilter.class), Mockito.any(Set.class)); + + session.expireFlowFiles(); + session.commit(); // if the content claim count is decremented to less than 0, an exception will be thrown. + + assertEquals(1L, contentRepo.getClaimsRemoved()); + } + + @Test public void testManyFilesOpened() throws IOException { StandardProcessSession[] standardProcessSessions = new StandardProcessSession[100000];
