exceptionfactory commented on code in PR #11034:
URL: https://github.com/apache/nifi/pull/11034#discussion_r3097568049
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -786,6 +910,38 @@ public long loadFlowFiles(final QueueProvider
queueProvider) throws IOException
// If recoveredRecords has been populated it need to be nulled out now
because it is no longer useful and can be garbage collected.
recoveredRecords = null;
+ // Mark truncation candidates only when truncation is enabled and
there are no recovered
+ // swap files. Swapped-out FlowFiles are excluded from
wal.recoverRecords(), so their
+ // content claim references are not reflected in
truncationEligibleClaimReferenceCounts.
+ // Marking candidates here could lead to premature truncation of
claims still referenced
+ // by swapped FlowFiles.
+ if (truncationEnabled && recoveredSwapLocations.isEmpty()) {
+ final Set<ContentClaim> markedCandidates = new HashSet<>();
+ for (final StandardContentClaim eligible :
truncationEligibleClaims) {
+ final ContentClaim latestForResource =
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
+ if (Objects.equals(eligible, latestForResource)) {
+ eligible.setTruncationCandidate(true);
+ markedCandidates.add(eligible);
+ }
+ }
+
+ // During recovery, multiple FlowFiles sharing the same logical
ContentClaim may hold
+ // different object instances. The loop above only marks the
instance that was added to
+ // truncationEligibleClaims. Propagate the flag to all other
matching instances so that
+ // truncation works regardless of which FlowFile is deleted last.
+ if (!markedCandidates.isEmpty()) {
+ for (final SerializedRepositoryRecord record : recordList) {
+ final ContentClaim claim = record.getContentClaim();
+ if (claim instanceof final StandardContentClaim
standardContentClaim && !standardContentClaim.isTruncationCandidate() &&
markedCandidates.contains(claim)) {
+ standardContentClaim.setTruncationCandidate(true);
+ }
+ }
+ }
+ } else {
+ logger.info("Skipping truncation candidate marking because {} swap
files were recovered; " +
+ "content claims referenced by swapped FlowFiles are not
counted", recoveredSwapLocations.size());
Review Comment:
This message seems rather verbose given the string concatenation. Can it be
shortened?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1048,128 @@ public void purge() {
resourceClaimManager.purge();
}
+ private class TruncateClaims implements Runnable {
+
+ @Override
+ public void run() {
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
+ // Loop to drain the entire queue in a single invocation rather
than waiting for the next scheduled run. Because the default
+ // interval is 1 minute, waiting for the next run could delay
truncation on a disk that is already under pressure and increases
+ // the risk of having too many claims that the queue overflows (in
which case we would lose some optimization).
+ while (true) {
+ final List<ContentClaim> toTruncate = new ArrayList<>();
+ resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
+ if (toTruncate.isEmpty()) {
+ return;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
+ final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
+
+ for (final ContentClaim claim : toTruncate) {
+ final String container =
claim.getResourceClaim().getContainer();
+ if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation", claim, container);
+ claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
+ continue;
+ }
+
+ if (claim.isTruncationCandidate()) {
+ final int truncationReferenceCount =
resourceClaimManager.getTruncationReferenceCount(claim);
+ if (truncationReferenceCount > 0) {
+ LOG.debug("Skipping truncation of {} because
truncation reference count is {}", claim, truncationReferenceCount);
+ continue;
+ }
+
+ truncate(claim);
+ }
+ }
+
+ claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ private boolean isTruncationActiveForContainer(final String container,
final Map<String, Boolean> activationCache) {
+ // If not archiving data, we consider truncation always active.
+ if (!archiveData) {
+ return true;
+ }
+
+ final Boolean cachedValue = activationCache.get(container);
+ if (cachedValue != null) {
+ return cachedValue;
+ }
+
+ if (!isArchiveClearedOnLastRun(container)) {
+ LOG.debug("Truncation is not active for container {} because
the archive was not cleared on the last run", container);
+ activationCache.put(container, false);
+ return false;
+ }
+
+ final long usableSpace;
+ try {
+ usableSpace = getContainerUsableSpace(container);
+ } catch (final IOException ioe) {
+ LOG.warn("Failed to determine usable space for container {}.
Will not truncate claims for this container", container, ioe);
Review Comment:
```suggestion
LOG.warn("Failed to determine usable space for container
[{}] Will not truncate claims for this container", container, ioe);
```
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1882,19 +2045,23 @@ public synchronized void close() throws IOException {
// Mark the claim as no longer being able to be written to
resourceClaimManager.freeze(scc.getResourceClaim());
+ final boolean largeClaim = scc.getLength() >
Math.min(1_000_000, maxAppendableClaimLength);
Review Comment:
It looks like the claim length value being checked can be computed once with
`Math.min()` on instantiation, instead of checking on every method invocation
for the lesser of the configured value and 1 MB.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,751 @@ public String changePartitionName(String swapLocation,
String newPartitionName)
return swapLocation;
}
}
+
+ //
=========================================================================
+ // Truncation Feature: Helpers
+ //
=========================================================================
+
+ /**
+ * Creates a mock queue + connection + queueProvider wired together,
suitable for runtime truncation tests.
+ * Returns [claimManager, queueProvider, queue].
+ */
+ private record RuntimeRepoContext(StandardResourceClaimManager
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+ }
+
+ private RuntimeRepoContext createRuntimeRepoContext() {
+ final StandardResourceClaimManager claimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider queueProvider = new TestQueueProvider();
+ final Connection connection = Mockito.mock(Connection.class);
+ when(connection.getIdentifier()).thenReturn("1234");
+
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+ when(queue.getIdentifier()).thenReturn("1234");
+ when(connection.getFlowFileQueue()).thenReturn(queue);
+ queueProvider.addConnection(connection);
+ return new RuntimeRepoContext(claimManager, queueProvider, queue);
+ }
+
+ private StandardContentClaim createClaim(final ResourceClaim rc, final
long offset, final long length, final boolean truncationCandidate) {
+ final StandardContentClaim claim = new StandardContentClaim(rc,
offset);
+ claim.setLength(length);
+ if (truncationCandidate) {
+ claim.setTruncationCandidate(true);
+ }
+ return claim;
+ }
+
+ private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository
repo, final FlowFileQueue queue,
+ final ContentClaim claim) throws
IOException {
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", UUID.randomUUID().toString())
+ .contentClaim(claim)
+ .build();
+
+ final StandardRepositoryRecord createRecord = new
StandardRepositoryRecord(queue);
+ createRecord.setWorking(flowFile, false);
+ createRecord.setDestination(queue);
+ repo.updateRepository(List.of(createRecord));
+
+ final StandardRepositoryRecord deleteRecord = new
StandardRepositoryRecord(queue, flowFile);
+ deleteRecord.markForDelete();
+ repo.updateRepository(List.of(deleteRecord));
+ }
+
+ /**
+ * Writes FlowFiles (one per claim) to a new repo, closes it, then
recovers into a fresh repo
+ * and returns the recovered FlowFileRecords.
+ */
+ private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims)
throws IOException {
+ final ResourceClaimManager writeClaimManager = new
StandardResourceClaimManager();
+ final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+ final Connection writeConnection = Mockito.mock(Connection.class);
+ when(writeConnection.getIdentifier()).thenReturn("1234");
+
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+ final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+ final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234",
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+ when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+ writeQueueProvider.addConnection(writeConnection);
+
+ try (final WriteAheadFlowFileRepository repo = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo.initialize(writeClaimManager);
+ repo.loadFlowFiles(writeQueueProvider);
+
+ final List<RepositoryRecord> records = new ArrayList<>();
+ for (int i = 0; i < claims.length; i++) {
+ final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+ .id(i + 1L)
+ .addAttribute("uuid", "11111111-1111-1111-1111-" +
String.format("%012d", i + 1))
+ .contentClaim(claims[i])
+ .build();
+ final StandardRepositoryRecord rec = new
StandardRepositoryRecord(writeQueue);
+ rec.setWorking(ff, false);
+ rec.setDestination(writeQueue);
+ records.add(rec);
+ }
+ repo.updateRepository(records);
+ }
+
+ // Recover
+ final List<FlowFileRecord> recovered = new ArrayList<>();
+ final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+ when(recoveryQueue.getIdentifier()).thenReturn("1234");
+ doAnswer(invocation -> {
+ recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+ return null;
+ }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+ final Connection recoveryConnection = Mockito.mock(Connection.class);
+ when(recoveryConnection.getIdentifier()).thenReturn("1234");
+ when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+ final TestQueueProvider recoveryQueueProvider = new
TestQueueProvider();
+ recoveryQueueProvider.addConnection(recoveryConnection);
+
+ try (final WriteAheadFlowFileRepository repo2 = new
WriteAheadFlowFileRepository(niFiProperties)) {
+ repo2.initialize(new StandardResourceClaimManager());
+ repo2.loadFlowFiles(recoveryQueueProvider);
+ }
+
+ return recovered;
+ }
+
+ private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord>
recovered, final long offset) {
+ return recovered.stream()
+ .filter(ff -> ff.getContentClaim() != null &&
ff.getContentClaim().getOffset() == offset)
+ .findFirst()
+ .orElse(null);
+ }
+
+ //
=========================================================================
+ // Truncation Feature: Runtime Tests
+ //
=========================================================================
+
+ @Test
+ public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue()
throws IOException {
+ final RuntimeRepoContext context = createRuntimeRepoContext();
+ final ResourceClaim resourceClaim =
context.claimManager().newResourceClaim("container", "section", "1", false,
false);
+ context.claimManager().incrementClaimantCount(resourceClaim);
+ context.claimManager().incrementClaimantCount(resourceClaim); // count
= 2 so that after delete decrement it stays > 0 (not destructable)
+ final StandardContentClaim contentClaim = createClaim(resourceClaim,
1024L, 5_000_000L, true);
Review Comment:
All of these new test methods have numerous repeated values that should be
declared as constants and reused.
##########
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java:
##########
@@ -29,6 +29,7 @@ public final class StandardContentClaim implements
ContentClaim, Comparable<Cont
private final ResourceClaim resourceClaim;
private final long offset;
private volatile long length;
+ private volatile boolean truncationCandidate = false;
Review Comment:
The explicit `false` initializer should be unnecessary, since a `boolean` field defaults to `false`.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountPreservesTruncationCandidate()
throws IOException {
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulating a clone by incrementing claimant count should not clear
the truncation
+ // candidate flag. ContentClaim reference counting in the FlowFile
repository prevents
+ // premature truncation of shared claims; the flag must remain so that
truncation can
+ // proceed once all references are removed.
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return Long.MAX_VALUE; // Plenty of space
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ Thread.sleep(3000L);
+ assertEquals(originalSize, Files.size(filePath));
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(90)
+ public void testTruncateClaimDeferredThenExecutedWhenPressureStarts()
throws IOException, InterruptedException {
+ // Create a repository where disk pressure can be toggled
Review Comment:
Similar to the comment above, this seems misplaced.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountPreservesTruncationCandidate()
throws IOException {
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulating a clone by incrementing claimant count should not clear
the truncation
+ // candidate flag. ContentClaim reference counting in the FlowFile
repository prevents
+ // premature truncation of shared claims; the flag must remain so that
truncation can
+ // proceed once all references are removed.
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return Long.MAX_VALUE; // Plenty of space
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ Thread.sleep(3000L);
Review Comment:
Is there a reason for this 3 second sleep? Any way to make it shorter, or
less time-dependent?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountPreservesTruncationCandidate()
throws IOException {
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulating a clone by incrementing claimant count should not clear
the truncation
+ // candidate flag. ContentClaim reference counting in the FlowFile
repository prevents
+ // premature truncation of shared claims; the flag must remain so that
truncation can
+ // proceed once all references are removed.
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return Long.MAX_VALUE; // Plenty of space
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ Thread.sleep(3000L);
+ assertEquals(originalSize, Files.size(filePath));
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(90)
+ public void testTruncateClaimDeferredThenExecutedWhenPressureStarts()
throws IOException, InterruptedException {
+ // Create a repository where disk pressure can be toggled
+ shutdown();
+
+ final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return usableSpace.get();
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for at least one run of the background task with NO
pressure.
+ // File should NOT be truncated.
+ Thread.sleep(3_000);
+ assertEquals(originalSize, Files.size(filePath));
+
+ // Now turn on disk pressure
+ usableSpace.set(0);
+
+ // Wait for the next background task run to truncate the file
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ assertNotNull(in);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
private byte[] readFully(final InputStream inStream, final int size)
throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+ final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(size);
int len;
final byte[] buffer = new byte[size];
while ((len = inStream.read(buffer)) >= 0) {
- baos.write(buffer, 0, len);
+ outputStream.write(buffer, 0, len);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Test
+ @Timeout(60)
+ public void
testTruncateClaimsSkipsClaimWithPositiveTruncationReferenceCount() throws
IOException, InterruptedException {
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0;
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Simulate a second FlowFile (e.g., a clone) still referencing
this claim
+ localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // markTruncatable should have been rejected because the ref count
is positive.
+ // But even if it had been enqueued (e.g. a future code path
change), TruncateClaims
+ // must also skip it. Directly enqueue to simulate that scenario.
+ final List<ContentClaim> forceEnqueued = new ArrayList<>();
+ localClaimManager.drainTruncatableClaims(forceEnqueued, 100);
+ assertTrue(forceEnqueued.isEmpty(), "markTruncatable should have
rejected the claim due to positive ref count");
+
+ // Force-enqueue by decrementing to zero, marking truncatable,
then re-incrementing
+ // before the background task can run. This simulates the Layer 3
scenario.
+ localClaimManager.decrementTruncationReferenceCount(largeClaim);
+ localClaimManager.markTruncatable(largeClaim);
+ localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+ // Wait long enough for the TruncateClaims background task to have
executed at least once
+ Thread.sleep(3_000);
Review Comment:
If 3 seconds is the desired wait duration across tests, it should be declared once
as a shared constant and reused.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1882,19 +2045,23 @@ public synchronized void close() throws IOException {
// Mark the claim as no longer being able to be written to
resourceClaimManager.freeze(scc.getResourceClaim());
+ final boolean largeClaim = scc.getLength() >
Math.min(1_000_000, maxAppendableClaimLength);
+ final boolean nonStartClaim = scc.getOffset() > 0;
+ scc.setTruncationCandidate(truncationEnabled && largeClaim &&
nonStartClaim);
Review Comment:
For readability and debugging, it seems useful to define `boolean
truncationCandidate` instead of passing the inline logical AND expression.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1048,128 @@ public void purge() {
resourceClaimManager.purge();
}
+ private class TruncateClaims implements Runnable {
+
+ @Override
+ public void run() {
+ final Map<String, Boolean> truncationActivationCache = new
HashMap<>();
+
+ // Go through any known truncation claims and truncate them now if
truncation is enabled for their container.
+ for (final String container : containerNames) {
+ if (isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ final List<ContentClaim> toTruncate =
truncationClaimManager.removeTruncationClaims(container);
+ if (toTruncate.isEmpty()) {
+ continue;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ // Drain any Truncation Claims from the Resource Claim Manager.
+ // If able, truncate those claims. Otherwise, save those claims in
the Truncation Claim Manager to be truncated on the next run.
+ // This prevents us from having a case where we could truncate a
big claim but we don't because we're not yet running out of disk space,
+ // but then we later start to run out of disk space and lost the
opportunity to truncate that big claim.
+ // Loop to drain the entire queue in a single invocation rather
than waiting for the next scheduled run. Because the default
+ // interval is 1 minute, waiting for the next run could delay
truncation on a disk that is already under pressure and increases
+ // the risk of having too many claims that the queue overflows (in
which case we would lose some optimization).
+ while (true) {
+ final List<ContentClaim> toTruncate = new ArrayList<>();
+ resourceClaimManager.drainTruncatableClaims(toTruncate,
10_000);
+ if (toTruncate.isEmpty()) {
+ return;
+ }
+
+ truncateClaims(toTruncate, truncationActivationCache);
+ }
+ }
+
+ private void truncateClaims(final List<ContentClaim> toTruncate, final
Map<String, Boolean> truncationActivationCache) {
+ final Map<String, List<ContentClaim>> claimsSkipped = new
HashMap<>();
+
+ for (final ContentClaim claim : toTruncate) {
+ final String container =
claim.getResourceClaim().getContainer();
+ if (!isTruncationActiveForContainer(container,
truncationActivationCache)) {
+ LOG.debug("Will not truncate {} because truncation is not
active for container {}; will save for later truncation", claim, container);
+ claimsSkipped.computeIfAbsent(container, key -> new
ArrayList<>()).add(claim);
+ continue;
+ }
+
+ if (claim.isTruncationCandidate()) {
+ final int truncationReferenceCount =
resourceClaimManager.getTruncationReferenceCount(claim);
+ if (truncationReferenceCount > 0) {
+ LOG.debug("Skipping truncation of {} because
truncation reference count is {}", claim, truncationReferenceCount);
+ continue;
+ }
+
+ truncate(claim);
+ }
+ }
+
+ claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+ }
+
+ private boolean isTruncationActiveForContainer(final String container,
final Map<String, Boolean> activationCache) {
+ // If not archiving data, we consider truncation always active.
+ if (!archiveData) {
+ return true;
Review Comment:
This method has quite a number of `return` statements. What do you think
about refactoring to a single return, or breaking it up in some other way?
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountPreservesTruncationCandidate()
throws IOException {
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulating a clone by incrementing claimant count should not clear
the truncation
+ // candidate flag. ContentClaim reference counting in the FlowFile
repository prevents
+ // premature truncation of shared claims; the flag must remain so that
truncation can
+ // proceed once all references are removed.
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
Review Comment:
Is this comment intended to describe the repository creation? It doesn't fit above the
`shutdown()` call.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountPreservesTruncationCandidate()
throws IOException {
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulating a clone by incrementing claimant count should not clear
the truncation
+ // candidate flag. ContentClaim reference counting in the FlowFile
repository prevents
+ // premature truncation of shared claims; the flag must remain so that
truncation can
+ // proceed once all references are removed.
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return Long.MAX_VALUE; // Plenty of space
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ Thread.sleep(3000L);
+ assertEquals(originalSize, Files.size(filePath));
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(90)
+ public void testTruncateClaimDeferredThenExecutedWhenPressureStarts()
throws IOException, InterruptedException {
+ // Create a repository where disk pressure can be toggled
+ shutdown();
+
+ final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return usableSpace.get();
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for at least one run of the background task with NO
pressure.
+ // File should NOT be truncated.
+ Thread.sleep(3_000);
+ assertEquals(originalSize, Files.size(filePath));
+
+ // Now turn on disk pressure
+ usableSpace.set(0);
+
+ // Wait for the next background task run to truncate the file
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ assertNotNull(in);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
private byte[] readFully(final InputStream inStream, final int size)
throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+ final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(size);
int len;
final byte[] buffer = new byte[size];
while ((len = inStream.read(buffer)) >= 0) {
- baos.write(buffer, 0, len);
+ outputStream.write(buffer, 0, len);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Test
+ @Timeout(60)
+ public void
testTruncateClaimsSkipsClaimWithPositiveTruncationReferenceCount() throws
IOException, InterruptedException {
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0;
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Simulate a second FlowFile (e.g., a clone) still referencing
this claim
+ localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // markTruncatable should have been rejected because the ref count
is positive.
+ // But even if it had been enqueued (e.g. a future code path
change), TruncateClaims
+ // must also skip it. Directly enqueue to simulate that scenario.
+ final List<ContentClaim> forceEnqueued = new ArrayList<>();
+ localClaimManager.drainTruncatableClaims(forceEnqueued, 100);
+ assertTrue(forceEnqueued.isEmpty(), "markTruncatable should have
rejected the claim due to positive ref count");
+
+ // Force-enqueue by decrementing to zero, marking truncatable,
then re-incrementing
+ // before the background task can run. This simulates the Layer 3
scenario.
+ localClaimManager.decrementTruncationReferenceCount(largeClaim);
+ localClaimManager.markTruncatable(largeClaim);
+ localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+ // Wait long enough for the TruncateClaims background task to have
executed at least once
+ Thread.sleep(3_000);
+
+ // File should NOT have been truncated because TruncateClaims
checks ref count > 0
Review Comment:
The comment and the assertion message say the same thing, so one of them (most simply the comment) can be removed.
##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
}
}
+ @Test
+ public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim()
throws IOException {
+ // Create a small claim at offset 0. Write less data than
maxAppendableClaimLength so the ResourceClaim
+ // is recycled back to the writable queue.
+ final ContentClaim smallClaim = repository.create(false);
+ final byte[] smallData = new byte[100];
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(smallData);
+ }
+ assertFalse(smallClaim.isTruncationCandidate());
+
+ // Now create a large claim on potentially the same ResourceClaim,
writing more than maxAppendableClaimLength
+ // to freeze the ResourceClaim. Because smallClaim was small and
recycled, largeClaim will be at a non-zero
+ // offset on the same ResourceClaim.
+ final ContentClaim largeClaim = repository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(largeData);
+ }
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Negative case: create a standalone large claim at offset 0 (fresh
ResourceClaim)
+ // To ensure a fresh ResourceClaim, write large data to all writable
claims to exhaust them,
+ // then create a new claim that starts at offset 0.
+ // The simplest approach: create claims until we get one at offset 0.
+ ContentClaim offsetZeroClaim = null;
+ for (int i = 0; i < 20; i++) {
+ final ContentClaim candidate = repository.create(false);
+ if (candidate instanceof StandardContentClaim standardContentClaim
&& standardContentClaim.getOffset() == 0) {
+ // Write large data that exceeds maxAppendableClaimLength
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ offsetZeroClaim = candidate;
+ break;
+ } else {
+ // Write large data to exhaust this claim's ResourceClaim
+ try (final OutputStream out = repository.write(candidate)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+ }
+ }
+
+ assertNotNull(offsetZeroClaim);
+ assertFalse(offsetZeroClaim.isTruncationCandidate());
+ }
+
+ @Test
+ public void testIncrementClaimantCountPreservesTruncationCandidate()
throws IOException {
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = repository.create(false);
+ try (final OutputStream out = repository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 1024]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Simulating a clone by incrementing claimant count should not clear
the truncation
+ // candidate flag. ContentClaim reference counting in the FlowFile
repository prevents
+ // premature truncation of shared claims; the flag must remain so that
truncation can
+ // proceed once all references are removed.
+ repository.incrementClaimaintCount(largeClaim);
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateClaimReducesFileSizeAndPreservesEarlierData()
throws IOException, InterruptedException {
+ // We need to create our own repository that overrides
getContainerUsableSpace to simulate disk pressure
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0; // Extreme disk pressure
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Hello World - small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ // Both claims should share the same resource claim
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ // Get the file path
+ final Path filePath = getPath(localRepository, smallClaim);
+ assertNotNull(filePath);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Decrement claimant count for the large claim to 0 (small claim
still holds a reference)
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+ // Mark the large claim as truncatable
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for the TruncateClaims background task to truncate the
file. Poll the file size until it shrinks.
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still fully readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(60)
+ public void testTruncateNotActiveWhenDiskNotPressured() throws
IOException, InterruptedException {
+ // Create repository with ample disk space
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return Long.MAX_VALUE; // Plenty of space
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ Thread.sleep(3000L);
+ assertEquals(originalSize, Files.size(filePath));
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ @Timeout(90)
+ public void testTruncateClaimDeferredThenExecutedWhenPressureStarts()
throws IOException, InterruptedException {
+ // Create a repository where disk pressure can be toggled
+ shutdown();
+
+ final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return usableSpace.get();
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ // Create a small claim then a large claim on the same
ResourceClaim
+ final ContentClaim smallClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(new byte[100]);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(new byte[(int) maxClaimLength + 4096]);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Wait for at least one run of the background task with NO
pressure.
+ // File should NOT be truncated.
+ Thread.sleep(3_000);
+ assertEquals(originalSize, Files.size(filePath));
+
+ // Now turn on disk pressure
+ usableSpace.set(0);
+
+ // Wait for the next background task run to truncate the file
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ // Verify the small claim's data is still readable
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ assertNotNull(in);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
private byte[] readFully(final InputStream inStream, final int size)
throws IOException {
- final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+ final ByteArrayOutputStream outputStream = new
ByteArrayOutputStream(size);
int len;
final byte[] buffer = new byte[size];
while ((len = inStream.read(buffer)) >= 0) {
- baos.write(buffer, 0, len);
+ outputStream.write(buffer, 0, len);
+ }
+
+ return outputStream.toByteArray();
+ }
+
+ @Test
+ @Timeout(60)
+ public void
testTruncateClaimsSkipsClaimWithPositiveTruncationReferenceCount() throws
IOException, InterruptedException {
+ shutdown();
+
+ final FileSystemRepository localRepository = new
FileSystemRepository(nifiProperties) {
+ @Override
+ public long getContainerUsableSpace(final String containerName) {
+ return 0;
+ }
+
+ @Override
+ protected boolean isArchiveClearedOnLastRun(final String
containerName) {
+ return true;
+ }
+ };
+
+ try {
+ final StandardResourceClaimManager localClaimManager = new
StandardResourceClaimManager();
+ localRepository.initialize(new
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+ localRepository.purge();
+
+ final ContentClaim smallClaim = localRepository.create(false);
+ final byte[] smallData = "Small claim
data".getBytes(StandardCharsets.UTF_8);
+ try (final OutputStream out = localRepository.write(smallClaim)) {
+ out.write(smallData);
+ }
+
+ final ContentClaim largeClaim = localRepository.create(false);
+ final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+ new Random().nextBytes(largeData);
+ try (final OutputStream out = localRepository.write(largeClaim)) {
+ out.write(largeData);
+ }
+
+ assertTrue(largeClaim.isTruncationCandidate());
+ assertEquals(smallClaim.getResourceClaim(),
largeClaim.getResourceClaim());
+
+ final Path filePath = getPath(localRepository, smallClaim);
+ final long originalSize = Files.size(filePath);
+ assertTrue(originalSize > maxClaimLength);
+
+ // Simulate a second FlowFile (e.g., a clone) still referencing
this claim
+ localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+ localClaimManager.markTruncatable(largeClaim);
+
+ // markTruncatable should have been rejected because the ref count
is positive.
+ // But even if it had been enqueued (e.g. a future code path
change), TruncateClaims
+ // must also skip it. Directly enqueue to simulate that scenario.
+ final List<ContentClaim> forceEnqueued = new ArrayList<>();
+ localClaimManager.drainTruncatableClaims(forceEnqueued, 100);
+ assertTrue(forceEnqueued.isEmpty(), "markTruncatable should have
rejected the claim due to positive ref count");
+
+ // Force-enqueue by decrementing to zero, marking truncatable,
then re-incrementing
+ // before the background task can run. This simulates the Layer 3
scenario.
+ localClaimManager.decrementTruncationReferenceCount(largeClaim);
+ localClaimManager.markTruncatable(largeClaim);
+ localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+ // Wait long enough for the TruncateClaims background task to have
executed at least once
+ Thread.sleep(3_000);
+
+ // File should NOT have been truncated because TruncateClaims
checks ref count > 0
+ assertEquals(originalSize, Files.size(filePath),
+ "File should not be truncated because TruncateClaims
should skip claims with positive truncation reference count");
+
+ // Clean up: decrement the ref count and re-mark truncatable
+ localClaimManager.decrementTruncationReferenceCount(largeClaim);
+ localClaimManager.markTruncatable(largeClaim);
+
+ // Now TruncateClaims should proceed
+ final long expectedTruncatedSize = largeClaim.getOffset();
+ while (Files.size(filePath) != expectedTruncatedSize) {
+ Thread.sleep(100L);
+ }
+
+ try (final InputStream in = localRepository.read(smallClaim)) {
+ final byte[] readData = readFully(in, smallData.length);
+ assertArrayEquals(smallData, readData);
+ }
+ } finally {
+ localRepository.shutdown();
+ }
+ }
+
+ @Test
+ public void testTruncationDisabledPreventsSettingTruncationCandidate()
throws IOException {
+ recreateRepositoryWithPropertyOverrides(Map.of(
+ NiFiProperties.REPOSITORY_CONTENT_PREFIX + "default",
rootFile.toString(),
+ NiFiProperties.CONTENT_CLAIM_TRUNCATION_ENABLED, "false"));
+
+ final ContentClaim smallClaim = repository.create(false);
+ try (final OutputStream out = repository.write(smallClaim)) {
+ out.write(new byte[100]);
Review Comment:
This `100` value is used in multiple places; consider declaring it once as a named constant and reusing it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]