exceptionfactory commented on code in PR #11034:
URL: https://github.com/apache/nifi/pull/11034#discussion_r3097568049


##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java:
##########
@@ -786,6 +910,38 @@ public long loadFlowFiles(final QueueProvider 
queueProvider) throws IOException
         // If recoveredRecords has been populated it need to be nulled out now 
because it is no longer useful and can be garbage collected.
         recoveredRecords = null;
 
+        // Mark truncation candidates only when truncation is enabled and 
there are no recovered
+        // swap files. Swapped-out FlowFiles are excluded from 
wal.recoverRecords(), so their
+        // content claim references are not reflected in 
truncationEligibleClaimReferenceCounts.
+        // Marking candidates here could lead to premature truncation of 
claims still referenced
+        // by swapped FlowFiles.
+        if (truncationEnabled && recoveredSwapLocations.isEmpty()) {
+            final Set<ContentClaim> markedCandidates = new HashSet<>();
+            for (final StandardContentClaim eligible : 
truncationEligibleClaims) {
+                final ContentClaim latestForResource = 
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
+                if (Objects.equals(eligible, latestForResource)) {
+                    eligible.setTruncationCandidate(true);
+                    markedCandidates.add(eligible);
+                }
+            }
+
+            // During recovery, multiple FlowFiles sharing the same logical 
ContentClaim may hold
+            // different object instances. The loop above only marks the 
instance that was added to
+            // truncationEligibleClaims. Propagate the flag to all other 
matching instances so that
+            // truncation works regardless of which FlowFile is deleted last.
+            if (!markedCandidates.isEmpty()) {
+                for (final SerializedRepositoryRecord record : recordList) {
+                    final ContentClaim claim = record.getContentClaim();
+                    if (claim instanceof final StandardContentClaim 
standardContentClaim && !standardContentClaim.isTruncationCandidate() && 
markedCandidates.contains(claim)) {
+                        standardContentClaim.setTruncationCandidate(true);
+                    }
+                }
+            }
+        } else {
+            logger.info("Skipping truncation candidate marking because {} swap 
files were recovered; " +
+                "content claims referenced by swapped FlowFiles are not 
counted", recoveredSwapLocations.size());

Review Comment:
   This message seems rather verbose given the string concatenation; can it be 
shortened?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1048,128 @@ public void purge() {
         resourceClaimManager.purge();
     }
 
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new 
HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lost the 
opportunity to truncate that big claim.
+            // Loop to drain the entire queue in a single invocation rather 
than waiting for the next scheduled run. Because the default
+            // interval is 1 minute, waiting for the next run could delay 
truncation on a disk that is already under pressure and increases
+            // the risk of having too many claims that the queue overflows (in 
which case we would lose some optimization).
+            while (true) {
+                final List<ContentClaim> toTruncate = new ArrayList<>();
+                resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+                if (toTruncate.isEmpty()) {
+                    return;
+                }
+
+                truncateClaims(toTruncate, truncationActivationCache);
+            }
+        }
+
+        private void truncateClaims(final List<ContentClaim> toTruncate, final 
Map<String, Boolean> truncationActivationCache) {
+            final Map<String, List<ContentClaim>> claimsSkipped = new 
HashMap<>();
+
+            for (final ContentClaim claim : toTruncate) {
+                final String container = 
claim.getResourceClaim().getContainer();
+                if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation", claim, container);
+                    claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+                    continue;
+                }
+
+                if (claim.isTruncationCandidate()) {
+                    final int truncationReferenceCount = 
resourceClaimManager.getTruncationReferenceCount(claim);
+                    if (truncationReferenceCount > 0) {
+                        LOG.debug("Skipping truncation of {} because 
truncation reference count is {}", claim, truncationReferenceCount);
+                        continue;
+                    }
+
+                    truncate(claim);
+                }
+            }
+
+            claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+        }
+
+        private boolean isTruncationActiveForContainer(final String container, 
final Map<String, Boolean> activationCache) {
+            // If not archiving data, we consider truncation always active.
+            if (!archiveData) {
+                return true;
+            }
+
+            final Boolean cachedValue = activationCache.get(container);
+            if (cachedValue != null) {
+                return cachedValue;
+            }
+
+            if (!isArchiveClearedOnLastRun(container)) {
+                LOG.debug("Truncation is not active for container {} because 
the archive was not cleared on the last run", container);
+                activationCache.put(container, false);
+                return false;
+            }
+
+            final long usableSpace;
+            try {
+                usableSpace = getContainerUsableSpace(container);
+            } catch (final IOException ioe) {
+                LOG.warn("Failed to determine usable space for container {}. 
Will not truncate claims for this container", container, ioe);

Review Comment:
   ```suggestion
                   LOG.warn("Failed to determine usable space for container 
[{}] Will not truncate claims for this container", container, ioe);
   ```



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1882,19 +2045,23 @@ public synchronized void close() throws IOException {
                 // Mark the claim as no longer being able to be written to
                 resourceClaimManager.freeze(scc.getResourceClaim());
 
+                final boolean largeClaim = scc.getLength() > 
Math.min(1_000_000, maxAppendableClaimLength);

Review Comment:
   It looks like the claim length value being checked can be computed once with 
`Math.min()` on instantiation, instead of checking on every method invocation 
for the lesser of the configured value and 1 MB.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java:
##########
@@ -810,4 +810,751 @@ public String changePartitionName(String swapLocation, 
String newPartitionName)
             return swapLocation;
         }
     }
+
+    // 
=========================================================================
+    // Truncation Feature: Helpers
+    // 
=========================================================================
+
+    /**
+     * Creates a mock queue + connection + queueProvider wired together, 
suitable for runtime truncation tests.
+     * Returns [claimManager, queueProvider, queue].
+     */
+    private record RuntimeRepoContext(StandardResourceClaimManager 
claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
+    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
+    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository 
repo, final FlowFileQueue queue,
+                                         final ContentClaim claim) throws 
IOException {
+        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+                .id(1L)
+                .addAttribute("uuid", UUID.randomUUID().toString())
+                .contentClaim(claim)
+                .build();
+
+        final StandardRepositoryRecord createRecord = new 
StandardRepositoryRecord(queue);
+        createRecord.setWorking(flowFile, false);
+        createRecord.setDestination(queue);
+        repo.updateRepository(List.of(createRecord));
+
+        final StandardRepositoryRecord deleteRecord = new 
StandardRepositoryRecord(queue, flowFile);
+        deleteRecord.markForDelete();
+        repo.updateRepository(List.of(deleteRecord));
+    }
+
+    /**
+     * Writes FlowFiles (one per claim) to a new repo, closes it, then 
recovers into a fresh repo
+     * and returns the recovered FlowFileRecords.
+     */
+    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) 
throws IOException {
+        final ResourceClaimManager writeClaimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
+        final Connection writeConnection = Mockito.mock(Connection.class);
+        when(writeConnection.getIdentifier()).thenReturn("1234");
+        
when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
+        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", 
null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
+        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
+        writeQueueProvider.addConnection(writeConnection);
+
+        try (final WriteAheadFlowFileRepository repo = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo.initialize(writeClaimManager);
+            repo.loadFlowFiles(writeQueueProvider);
+
+            final List<RepositoryRecord> records = new ArrayList<>();
+            for (int i = 0; i < claims.length; i++) {
+                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
+                        .id(i + 1L)
+                        .addAttribute("uuid", "11111111-1111-1111-1111-" + 
String.format("%012d", i + 1))
+                        .contentClaim(claims[i])
+                        .build();
+                final StandardRepositoryRecord rec = new 
StandardRepositoryRecord(writeQueue);
+                rec.setWorking(ff, false);
+                rec.setDestination(writeQueue);
+                records.add(rec);
+            }
+            repo.updateRepository(records);
+        }
+
+        // Recover
+        final List<FlowFileRecord> recovered = new ArrayList<>();
+        final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
+        when(recoveryQueue.getIdentifier()).thenReturn("1234");
+        doAnswer(invocation -> {
+            recovered.add((FlowFileRecord) invocation.getArguments()[0]);
+            return null;
+        }).when(recoveryQueue).put(any(FlowFileRecord.class));
+
+        final Connection recoveryConnection = Mockito.mock(Connection.class);
+        when(recoveryConnection.getIdentifier()).thenReturn("1234");
+        when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
+        final TestQueueProvider recoveryQueueProvider = new 
TestQueueProvider();
+        recoveryQueueProvider.addConnection(recoveryConnection);
+
+        try (final WriteAheadFlowFileRepository repo2 = new 
WriteAheadFlowFileRepository(niFiProperties)) {
+            repo2.initialize(new StandardResourceClaimManager());
+            repo2.loadFlowFiles(recoveryQueueProvider);
+        }
+
+        return recovered;
+    }
+
+    private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord> 
recovered, final long offset) {
+        return recovered.stream()
+                .filter(ff -> ff.getContentClaim() != null && 
ff.getContentClaim().getOffset() == offset)
+                .findFirst()
+                .orElse(null);
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Runtime Tests
+    // 
=========================================================================
+
+    @Test
+    public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() 
throws IOException {
+        final RuntimeRepoContext context = createRuntimeRepoContext();
+        final ResourceClaim resourceClaim = 
context.claimManager().newResourceClaim("container", "section", "1", false, 
false);
+        context.claimManager().incrementClaimantCount(resourceClaim);
+        context.claimManager().incrementClaimantCount(resourceClaim); // count 
= 2 so that after delete decrement it stays > 0 (not destructable)
+        final StandardContentClaim contentClaim = createClaim(resourceClaim, 
1024L, 5_000_000L, true);

Review Comment:
   All of these new test methods have numerous repeated values that should be 
declared as constants and reused.



##########
nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java:
##########
@@ -29,6 +29,7 @@ public final class StandardContentClaim implements 
ContentClaim, Comparable<Cont
     private final ResourceClaim resourceClaim;
     private final long offset;
     private volatile long length;
+    private volatile boolean truncationCandidate = false;

Review Comment:
   The explicit `false` initializer is unnecessary; a `boolean` field already defaults to `false`.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountPreservesTruncationCandidate() 
throws IOException {
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulating a clone by incrementing claimant count should not clear 
the truncation
+        // candidate flag. ContentClaim reference counting in the FlowFile 
repository prevents
+        // premature truncation of shared claims; the flag must remain so that 
truncation can
+        // proceed once all references are removed.
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertTrue(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return Long.MAX_VALUE; // Plenty of space
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            Thread.sleep(3000L);
+            assertEquals(originalSize, Files.size(filePath));
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() 
throws IOException, InterruptedException {
+        // Create a repository where disk pressure can be toggled

Review Comment:
   Similar to the comment above, this seems misplaced.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountPreservesTruncationCandidate() 
throws IOException {
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulating a clone by incrementing claimant count should not clear 
the truncation
+        // candidate flag. ContentClaim reference counting in the FlowFile 
repository prevents
+        // premature truncation of shared claims; the flag must remain so that 
truncation can
+        // proceed once all references are removed.
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertTrue(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return Long.MAX_VALUE; // Plenty of space
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            Thread.sleep(3000L);

Review Comment:
   Is there a reason for this 3-second sleep? Is there any way to make it shorter or 
less time-dependent?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountPreservesTruncationCandidate() 
throws IOException {
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulating a clone by incrementing claimant count should not clear 
the truncation
+        // candidate flag. ContentClaim reference counting in the FlowFile 
repository prevents
+        // premature truncation of shared claims; the flag must remain so that 
truncation can
+        // proceed once all references are removed.
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertTrue(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return Long.MAX_VALUE; // Plenty of space
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            Thread.sleep(3000L);
+            assertEquals(originalSize, Files.size(filePath));
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() 
throws IOException, InterruptedException {
+        // Create a repository where disk pressure can be toggled
+        shutdown();
+
+        final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return usableSpace.get();
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for at least one run of the background task with NO 
pressure.
+            // File should NOT be truncated.
+            Thread.sleep(3_000);
+            assertEquals(originalSize, Files.size(filePath));
+
+            // Now turn on disk pressure
+            usableSpace.set(0);
+
+            // Wait for the next background task run to truncate the file
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                assertNotNull(in);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
     private byte[] readFully(final InputStream inStream, final int size) 
throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+        final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(size);
         int len;
         final byte[] buffer = new byte[size];
         while ((len = inStream.read(buffer)) >= 0) {
-            baos.write(buffer, 0, len);
+            outputStream.write(buffer, 0, len);
+        }
+
+        return outputStream.toByteArray();
+    }
+
+    @Test
+    @Timeout(60)
+    public void 
testTruncateClaimsSkipsClaimWithPositiveTruncationReferenceCount() throws 
IOException, InterruptedException {
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0;
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Simulate a second FlowFile (e.g., a clone) still referencing 
this claim
+            localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // markTruncatable should have been rejected because the ref count 
is positive.
+            // But even if it had been enqueued (e.g. a future code path 
change), TruncateClaims
+            // must also skip it. Directly enqueue to simulate that scenario.
+            final List<ContentClaim> forceEnqueued = new ArrayList<>();
+            localClaimManager.drainTruncatableClaims(forceEnqueued, 100);
+            assertTrue(forceEnqueued.isEmpty(), "markTruncatable should have 
rejected the claim due to positive ref count");
+
+            // Force-enqueue by decrementing to zero, marking truncatable, 
then re-incrementing
+            // before the background task can run. This simulates the Layer 3 
scenario.
+            localClaimManager.decrementTruncationReferenceCount(largeClaim);
+            localClaimManager.markTruncatable(largeClaim);
+            localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+            // Wait long enough for the TruncateClaims background task to have 
executed at least once
+            Thread.sleep(3_000);

Review Comment:
   If 3 seconds is the desired amount across tests, it should be declared once 
and reused.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1882,19 +2045,23 @@ public synchronized void close() throws IOException {
                 // Mark the claim as no longer being able to be written to
                 resourceClaimManager.freeze(scc.getResourceClaim());
 
+                final boolean largeClaim = scc.getLength() > 
Math.min(1_000_000, maxAppendableClaimLength);
+                final boolean nonStartClaim = scc.getOffset() > 0;
+                scc.setTruncationCandidate(truncationEnabled && largeClaim && 
nonStartClaim);

Review Comment:
   For readability and debugging, it seems useful to define `boolean 
truncationCandidate` instead of the inline logical AND expression.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java:
##########
@@ -1032,6 +1048,128 @@ public void purge() {
         resourceClaimManager.purge();
     }
 
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new 
HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lost the 
opportunity to truncate that big claim.
+            // Loop to drain the entire queue in a single invocation rather 
than waiting for the next scheduled run. Because the default
+            // interval is 1 minute, waiting for the next run could delay 
truncation on a disk that is already under pressure and increases
+            // the risk of having too many claims that the queue overflows (in 
which case we would lose some optimization).
+            while (true) {
+                final List<ContentClaim> toTruncate = new ArrayList<>();
+                resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+                if (toTruncate.isEmpty()) {
+                    return;
+                }
+
+                truncateClaims(toTruncate, truncationActivationCache);
+            }
+        }
+
+        private void truncateClaims(final List<ContentClaim> toTruncate, final 
Map<String, Boolean> truncationActivationCache) {
+            final Map<String, List<ContentClaim>> claimsSkipped = new 
HashMap<>();
+
+            for (final ContentClaim claim : toTruncate) {
+                final String container = 
claim.getResourceClaim().getContainer();
+                if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation", claim, container);
+                    claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+                    continue;
+                }
+
+                if (claim.isTruncationCandidate()) {
+                    final int truncationReferenceCount = 
resourceClaimManager.getTruncationReferenceCount(claim);
+                    if (truncationReferenceCount > 0) {
+                        LOG.debug("Skipping truncation of {} because 
truncation reference count is {}", claim, truncationReferenceCount);
+                        continue;
+                    }
+
+                    truncate(claim);
+                }
+            }
+
+            claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+        }
+
+        private boolean isTruncationActiveForContainer(final String container, 
final Map<String, Boolean> activationCache) {
+            // If not archiving data, we consider truncation always active.
+            if (!archiveData) {
+                return true;

Review Comment:
   This method has quite a number of `return` statements. What do you think 
about refactoring to a single return, or breaking it up in some other way?



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountPreservesTruncationCandidate() 
throws IOException {
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulating a clone by incrementing claimant count should not clear 
the truncation
+        // candidate flag. ContentClaim reference counting in the FlowFile 
repository prevents
+        // premature truncation of shared claims; the flag must remain so that 
truncation can
+        // proceed once all references are removed.
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertTrue(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space

Review Comment:
   Is this comment intended for the repository? It doesn't fit above the 
`shutdown()` call.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountPreservesTruncationCandidate() 
throws IOException {
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulating a clone by incrementing claimant count should not clear 
the truncation
+        // candidate flag. ContentClaim reference counting in the FlowFile 
repository prevents
+        // premature truncation of shared claims; the flag must remain so that 
truncation can
+        // proceed once all references are removed.
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertTrue(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return Long.MAX_VALUE; // Plenty of space
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            Thread.sleep(3000L);
+            assertEquals(originalSize, Files.size(filePath));
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() 
throws IOException, InterruptedException {
+        // Create a repository where disk pressure can be toggled
+        shutdown();
+
+        final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return usableSpace.get();
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for at least one run of the background task with NO 
pressure.
+            // File should NOT be truncated.
+            Thread.sleep(3_000);
+            assertEquals(originalSize, Files.size(filePath));
+
+            // Now turn on disk pressure
+            usableSpace.set(0);
+
+            // Wait for the next background task run to truncate the file
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                assertNotNull(in);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
     private byte[] readFully(final InputStream inStream, final int size) 
throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+        final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(size);
         int len;
         final byte[] buffer = new byte[size];
         while ((len = inStream.read(buffer)) >= 0) {
-            baos.write(buffer, 0, len);
+            outputStream.write(buffer, 0, len);
+        }
+
+        return outputStream.toByteArray();
+    }
+
+    @Test
+    @Timeout(60)
+    public void 
testTruncateClaimsSkipsClaimWithPositiveTruncationReferenceCount() throws 
IOException, InterruptedException {
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0;
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Simulate a second FlowFile (e.g., a clone) still referencing 
this claim
+            localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // markTruncatable should have been rejected because the ref count 
is positive.
+            // But even if it had been enqueued (e.g. a future code path 
change), TruncateClaims
+            // must also skip it. Directly enqueue to simulate that scenario.
+            final List<ContentClaim> forceEnqueued = new ArrayList<>();
+            localClaimManager.drainTruncatableClaims(forceEnqueued, 100);
+            assertTrue(forceEnqueued.isEmpty(), "markTruncatable should have 
rejected the claim due to positive ref count");
+
+            // Force-enqueue by decrementing to zero, marking truncatable, 
then re-incrementing
+            // before the background task can run. This simulates the Layer 3 
scenario.
+            localClaimManager.decrementTruncationReferenceCount(largeClaim);
+            localClaimManager.markTruncatable(largeClaim);
+            localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+            // Wait long enough for the TruncateClaims background task to have 
executed at least once
+            Thread.sleep(3_000);
+
+            // File should NOT have been truncated because TruncateClaims 
checks ref count > 0

Review Comment:
   The code comment and the assertion message say the same thing, so they are 
duplicative.



##########
nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java:
##########
@@ -897,14 +901,376 @@ protected boolean archive(Path curPath) {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountPreservesTruncationCandidate() 
throws IOException {
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulating a clone by incrementing claimant count should not clear 
the truncation
+        // candidate flag. ContentClaim reference counting in the FlowFile 
repository prevents
+        // premature truncation of shared claims; the flag must remain so that 
truncation can
+        // proceed once all references are removed.
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertTrue(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return Long.MAX_VALUE; // Plenty of space
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            Thread.sleep(3000L);
+            assertEquals(originalSize, Files.size(filePath));
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() 
throws IOException, InterruptedException {
+        // Create a repository where disk pressure can be toggled
+        shutdown();
+
+        final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return usableSpace.get();
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for at least one run of the background task with NO 
pressure.
+            // File should NOT be truncated.
+            Thread.sleep(3_000);
+            assertEquals(originalSize, Files.size(filePath));
+
+            // Now turn on disk pressure
+            usableSpace.set(0);
+
+            // Wait for the next background task run to truncate the file
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                assertNotNull(in);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
     private byte[] readFully(final InputStream inStream, final int size) 
throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+        final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(size);
         int len;
         final byte[] buffer = new byte[size];
         while ((len = inStream.read(buffer)) >= 0) {
-            baos.write(buffer, 0, len);
+            outputStream.write(buffer, 0, len);
+        }
+
+        return outputStream.toByteArray();
+    }
+
+    @Test
+    @Timeout(60)
+    public void 
testTruncateClaimsSkipsClaimWithPositiveTruncationReferenceCount() throws 
IOException, InterruptedException {
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0;
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Simulate a second FlowFile (e.g., a clone) still referencing 
this claim
+            localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // markTruncatable should have been rejected because the ref count 
is positive.
+            // But even if it had been enqueued (e.g. a future code path 
change), TruncateClaims
+            // must also skip it. Directly enqueue to simulate that scenario.
+            final List<ContentClaim> forceEnqueued = new ArrayList<>();
+            localClaimManager.drainTruncatableClaims(forceEnqueued, 100);
+            assertTrue(forceEnqueued.isEmpty(), "markTruncatable should have 
rejected the claim due to positive ref count");
+
+            // Force-enqueue by decrementing to zero, marking truncatable, 
then re-incrementing
+            // before the background task can run. This simulates the Layer 3 
scenario.
+            localClaimManager.decrementTruncationReferenceCount(largeClaim);
+            localClaimManager.markTruncatable(largeClaim);
+            localClaimManager.incrementTruncationReferenceCount(largeClaim);
+
+            // Wait long enough for the TruncateClaims background task to have 
executed at least once
+            Thread.sleep(3_000);
+
+            // File should NOT have been truncated because TruncateClaims 
checks ref count > 0
+            assertEquals(originalSize, Files.size(filePath),
+                    "File should not be truncated because TruncateClaims 
should skip claims with positive truncation reference count");
+
+            // Clean up: decrement the ref count and re-mark truncatable
+            localClaimManager.decrementTruncationReferenceCount(largeClaim);
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Now TruncateClaims should proceed
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    public void testTruncationDisabledPreventsSettingTruncationCandidate() 
throws IOException {
+        recreateRepositoryWithPropertyOverrides(Map.of(
+                NiFiProperties.REPOSITORY_CONTENT_PREFIX + "default", 
rootFile.toString(),
+                NiFiProperties.CONTENT_CLAIM_TRUNCATION_ENABLED, "false"));
+
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);

Review Comment:
   This `100` value is repeated in multiple places in these tests; consider 
declaring it once as a shared constant and reusing it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to