This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 60cd8a0a696 NIFI-15570 Track Content and Truncate large Resource 
Claims in FileSystemRepository (#10874)
60cd8a0a696 is described below

commit 60cd8a0a69613ce6fbe1aab8a7f64177670fd881
Author: Mark Payne <[email protected]>
AuthorDate: Mon Mar 16 09:36:02 2026 -0400

    NIFI-15570 Track Content and Truncate large Resource Claims in 
FileSystemRepository (#10874)
    
    Keep track of Content Claims where the last Claim in a Resource Claim can 
be truncated if it is large. Whenever FlowFile Repository is checkpointed, 
truncate any large Resource Claims when possible and necessary to avoid having 
a situation where a small FlowFile in a given Resource Claim prevents a large 
Content Claim from being cleaned up.
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../controller/repository/claim/ContentClaim.java  |   8 +
 .../repository/claim/ResourceClaimManager.java     |  21 ++
 .../repository/FileSystemRepository.java           | 227 +++++++++++++++-
 .../repository/WriteAheadFlowFileRepository.java   | 137 +++++++---
 .../nifi/controller/TestFileSystemSwapManager.java |   9 +
 .../repository/TestFileSystemRepository.java       | 287 +++++++++++++++++++--
 .../TestWriteAheadFlowFileRepository.java          | 283 ++++++++++++++++++++
 .../repository/claim/StandardContentClaim.java     |  10 +
 .../claim/StandardResourceClaimManager.java        |  31 +++
 .../claim/TestStandardResourceClaimManager.java    |  51 ++++
 .../repository/ByteArrayContentRepository.java     |   5 +
 .../tests/system/GenerateTruncatableFlowFiles.java | 115 +++++++++
 .../services/org.apache.nifi.processor.Processor   |   1 +
 .../ContentClaimTruncationAfterRestartIT.java      | 163 ++++++++++++
 .../repositories/ContentClaimTruncationIT.java     | 153 +++++++++++
 15 files changed, 1442 insertions(+), 59 deletions(-)

diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
index 5c1d76bebbe..54745f9d28f 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaim.java
@@ -44,4 +44,12 @@ public interface ContentClaim extends 
Comparable<ContentClaim> {
      * @return the length of this ContentClaim
      */
     long getLength();
+
+    /**
+     * Indicates whether or not this ContentClaim is a candidate for 
truncation.
+     * @return true if this ContentClaim is a candidate for truncation, false 
otherwise
+     */
+    default boolean isTruncationCandidate() {
+        return false;
+    }
 }
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
index 4c68383d86b..4a54d371a1f 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/claim/ResourceClaimManager.java
@@ -112,6 +112,17 @@ public interface ResourceClaimManager {
      */
     void markDestructable(ResourceClaim claim);
 
+    /**
+     * Indicates that the Resource Claim associated with the given Content 
Claim can now be
+     * truncated to the start of the ContentClaim. This should only ever be 
called after it is
+     * guaranteed that the FlowFile Repository has been synchronized with its 
underlying
+     * storage component for the same reason as described in the {@link 
#markDestructable(ResourceClaim)}
+     * method.
+     *
+     * @param claim the ContentClaim that should be used for truncation
+     */
+    void markTruncatable(ContentClaim claim);
+
     /**
      * Drains up to {@code maxElements} Content Claims from the internal queue
      * of destructable content claims to the given {@code destination} so that
@@ -138,6 +149,16 @@ public interface ResourceClaimManager {
      */
     void drainDestructableClaims(Collection<ResourceClaim> destination, int 
maxElements, long timeout, TimeUnit unit);
 
+    /**
+     * Drains up to {@code maxElements} Content Claims from the internal queue
+     * of truncatable content claims to the given {@code destination} so that
+     * they can be truncated.
+     *
+     * @param destination to drain to
+     * @param maxElements max items to drain
+     */
+    void drainTruncatableClaims(Collection<ContentClaim> destination, int 
maxElements);
+
     /**
      * Clears the manager's memory of any and all ResourceClaims that it knows
      * about
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index 15ec9e78cdd..8987b6464f2 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -47,8 +47,10 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.channels.FileChannel;
 import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
 import java.nio.file.Path;
 import java.nio.file.SimpleFileVisitor;
 import java.nio.file.StandardOpenOption;
@@ -99,9 +101,12 @@ public class FileSystemRepository implements 
ContentRepository {
     private final List<String> containerNames;
     private final AtomicLong index;
 
-    private final ScheduledExecutorService executor = new FlowEngine(4, 
"FileSystemRepository Workers", true);
+    // Executor handles: BinDestructableClaims, one 
ArchiveOrDestroyDestructableClaims per content repository container,
+    // TruncateClaims, and archive directory scanning tasks submitted during 
initialization.
+    private final ScheduledExecutorService executor = new FlowEngine(6, 
"FileSystemRepository Workers", true);
     private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> 
reclaimable = new ConcurrentHashMap<>();
     private final Map<String, ContainerState> containerStateMap = new 
HashMap<>();
+    private final TruncationClaimManager truncationClaimManager = new 
TruncationClaimManager();
 
     // Queue for claims that are kept open for writing. Ideally, this will be 
at
     // least as large as the number of threads that will be updating the 
repository simultaneously but we don't want
@@ -170,12 +175,13 @@ public class FileSystemRepository implements 
ContentRepository {
             archiveData = true;
 
             if (maxArchiveSize == null) {
-                throw new RuntimeException("No value specified for property '"
-                                           + 
NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is 
enabled. You must configure the max disk usage in order to enable archiving.");
+                throw new RuntimeException("No value specified for property 
'%s' but archiving is enabled. You must configure the max disk usage in order 
to enable archiving.".formatted(
+                    NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE));
             }
 
             if 
(!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
-                throw new RuntimeException("Invalid value specified for the '" 
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' property. Value must 
be in format: <XX>%");
+                throw new RuntimeException("Invalid value specified for the 
'%s' property. Value must be in format: <XX>%%".formatted(
+                    NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE));
             }
         } else if ("false".equalsIgnoreCase(enableArchiving)) {
             archiveData = false;
@@ -238,14 +244,15 @@ public class FileSystemRepository implements 
ContentRepository {
         this.resourceClaimManager = context.getResourceClaimManager();
         this.eventReporter = context.getEventReporter();
 
-        final Map<String, Path> fileRespositoryPaths = 
nifiProperties.getContentRepositoryPaths();
+        final Map<String, Path> fileRepositoryPaths = 
nifiProperties.getContentRepositoryPaths();
 
         executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1, 
TimeUnit.SECONDS);
-        for (int i = 0; i < fileRespositoryPaths.size(); i++) {
+        for (int i = 0; i < fileRepositoryPaths.size(); i++) {
             executor.scheduleWithFixedDelay(new 
ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
         }
 
         final long cleanupMillis = 
this.determineCleanupInterval(nifiProperties);
+        executor.scheduleWithFixedDelay(new TruncateClaims(), cleanupMillis, 
cleanupMillis, TimeUnit.MILLISECONDS);
 
         for (final Map.Entry<String, Path> containerEntry : 
containers.entrySet()) {
             final String containerName = containerEntry.getKey();
@@ -689,7 +696,16 @@ public class FileSystemRepository implements 
ContentRepository {
 
     @Override
     public int incrementClaimaintCount(final ContentClaim claim) {
-        return incrementClaimantCount(claim == null ? null : 
claim.getResourceClaim(), false);
+        if (claim == null) {
+            return 0;
+        }
+
+        if (claim.isTruncationCandidate() && claim instanceof final 
StandardContentClaim scc) {
+            LOG.debug("{} is a truncation candidate, but is being claimed 
again. Setting truncation candidate to false", claim);
+            scc.setTruncationCandidate(false);
+        }
+
+        return incrementClaimantCount(claim.getResourceClaim(), false);
     }
 
     protected int incrementClaimantCount(final ResourceClaim resourceClaim, 
final boolean newClaim) {
@@ -741,6 +757,7 @@ public class FileSystemRepository implements 
ContentRepository {
             }
         }
 
+        truncationClaimManager.removeTruncationClaims(claim);
         return true;
     }
 
@@ -1032,6 +1049,122 @@ public class FileSystemRepository implements 
ContentRepository {
         resourceClaimManager.purge();
     }
 
+    private class TruncateClaims implements Runnable {
+
+        @Override
+        public void run() {
+            final Map<String, Boolean> truncationActivationCache = new 
HashMap<>();
+
+            // Go through any known truncation claims and truncate them now if 
truncation is enabled for their container.
+            for (final String container : containerNames) {
+                if (isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    final List<ContentClaim> toTruncate = 
truncationClaimManager.removeTruncationClaims(container);
+                    if (toTruncate.isEmpty()) {
+                        continue;
+                    }
+
+                    truncateClaims(toTruncate, truncationActivationCache);
+                }
+            }
+
+            // Drain any Truncation Claims from the Resource Claim Manager.
+            // If able, truncate those claims. Otherwise, save those claims in 
the Truncation Claim Manager to be truncated on the next run.
+            // This prevents us from having a case where we could truncate a 
big claim but we don't because we're not yet running out of disk space,
+            // but then we later start to run out of disk space and lost the 
opportunity to truncate that big claim.
+            // Loop to drain the entire queue in a single invocation rather 
than waiting for the next scheduled run. Because the default
+            // interval is 1 minute, waiting for the next run could delay 
truncation on a disk that is already under pressure and increases
+            // the risk of having too many claims that the queue overflows (in 
which case we would lose some optimization).
+            while (true) {
+                final List<ContentClaim> toTruncate = new ArrayList<>();
+                resourceClaimManager.drainTruncatableClaims(toTruncate, 
10_000);
+                if (toTruncate.isEmpty()) {
+                    return;
+                }
+
+                truncateClaims(toTruncate, truncationActivationCache);
+            }
+        }
+
+        private void truncateClaims(final List<ContentClaim> toTruncate, final 
Map<String, Boolean> truncationActivationCache) {
+            final Map<String, List<ContentClaim>> claimsSkipped = new 
HashMap<>();
+
+            for (final ContentClaim claim : toTruncate) {
+                final String container = 
claim.getResourceClaim().getContainer();
+                if (!isTruncationActiveForContainer(container, 
truncationActivationCache)) {
+                    LOG.debug("Will not truncate {} because truncation is not 
active for container {}; will save for later truncation", claim, container);
+                    claimsSkipped.computeIfAbsent(container, key -> new 
ArrayList<>()).add(claim);
+                    continue;
+                }
+
+                if (claim.isTruncationCandidate()) {
+                    truncate(claim);
+                }
+            }
+
+            claimsSkipped.forEach(truncationClaimManager::addTruncationClaims);
+        }
+
+        private boolean isTruncationActiveForContainer(final String container, 
final Map<String, Boolean> activationCache) {
+            // If not archiving data, we consider truncation always active.
+            if (!archiveData) {
+                return true;
+            }
+
+            final Boolean cachedValue = activationCache.get(container);
+            if (cachedValue != null) {
+                return cachedValue;
+            }
+
+            if (!isArchiveClearedOnLastRun(container)) {
+                LOG.debug("Truncation is not active for container {} because 
the archive was not cleared on the last run", container);
+                activationCache.put(container, false);
+                return false;
+            }
+
+            final long usableSpace;
+            try {
+                usableSpace = getContainerUsableSpace(container);
+            } catch (final IOException ioe) {
+                LOG.warn("Failed to determine usable space for container {}. 
Will not truncate claims for this container", container, ioe);
+                return false;
+            }
+
+            final Long minUsableSpace = 
minUsableContainerBytesForArchive.get(container);
+            if (minUsableSpace != null && usableSpace < minUsableSpace) {
+                LOG.debug("Truncate is active for Container {} because usable 
space of {} bytes is below the desired threshold of {} bytes.",
+                    container, usableSpace, minUsableSpace);
+
+                activationCache.put(container, true);
+                return true;
+            }
+
+            activationCache.put(container, false);
+            return false;
+        }
+
+        private void truncate(final ContentClaim claim) {
+            LOG.info("Truncating {} to {} bytes because the FlowFile occupying 
the last {} bytes has been removed",
+                claim.getResourceClaim(), claim.getOffset(), 
claim.getLength());
+
+            final Path path = getPath(claim);
+            if (path == null) {
+                LOG.warn("Cannot truncate {} because the file cannot be 
found", claim);
+                return;
+            }
+
+            try (final FileChannel fileChannel = FileChannel.open(path, 
StandardOpenOption.WRITE)) {
+                fileChannel.truncate(claim.getOffset());
+            } catch (final NoSuchFileException nsfe) {
+                // This is unlikely but can occur if the claim was truncatable 
and the underlying Resource Claim becomes
+                // destructable. In this case, we may archive or delete the 
entire ResourceClaim. This is safe to ignore,
+                // since it means the data is cleaned up anyway.
+                LOG.debug("Failed to truncate {} because file [{}] does not 
exist", claim, path, nsfe);
+            } catch (final IOException e) {
+                LOG.warn("Failed to truncate {} to {} bytes", claim, 
claim.getOffset(), e);
+            }
+        }
+    }
+
     private class BinDestructableClaims implements Runnable {
 
         @Override
@@ -1120,6 +1253,11 @@ public class FileSystemRepository implements 
ContentRepository {
 
         final boolean archived = archive(curPath);
         LOG.debug("Successfully moved {} to archive", claim);
+
+        if (archived) {
+            truncationClaimManager.removeTruncationClaims(claim);
+        }
+
         return archived;
     }
 
@@ -1392,7 +1530,7 @@ public class FileSystemRepository implements 
ContentRepository {
         if (notYetExceedingThreshold.isEmpty()) {
             oldestContainerArchive = System.currentTimeMillis();
         } else {
-            oldestContainerArchive = 
notYetExceedingThreshold.get(0).getLastModTime();
+            oldestContainerArchive = 
notYetExceedingThreshold.getFirst().getLastModTime();
         }
 
         // Queue up the files in the order that they should be destroyed so 
that we don't have to scan the directories for a while.
@@ -1400,10 +1538,11 @@ public class FileSystemRepository implements 
ContentRepository {
             fileQueue.offer(toEnqueue);
         }
 
+        
containerState.setArchiveClearedOnLastRun(notYetExceedingThreshold.isEmpty());
+
         final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) 
- deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
         LOG.debug("Oldest Archive Date for Container {} is {}; delete expired 
= {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
                 containerName, new Date(oldestContainerArchive), 
deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
-        return;
     }
 
     private class ArchiveOrDestroyDestructableClaims implements Runnable {
@@ -1543,6 +1682,7 @@ public class FileSystemRepository implements 
ContentRepository {
 
         private volatile long bytesUsed = 0L;
         private volatile long checkUsedCutoffTimestamp = 0L;
+        private volatile boolean archiveClearedOnLastRun = false;
 
         public ContainerState(final String containerName, final boolean 
archiveEnabled, final long backPressureBytes, final long capacity) {
             this.containerName = containerName;
@@ -1661,6 +1801,24 @@ public class FileSystemRepository implements 
ContentRepository {
         public void decrementArchiveCount() {
             archivedFileCount.decrementAndGet();
         }
+
+        public void setArchiveClearedOnLastRun(final boolean 
archiveClearedOnLastRun) {
+            this.archiveClearedOnLastRun = archiveClearedOnLastRun;
+        }
+
+        public boolean isArchiveClearedOnLastRun() {
+            return archiveClearedOnLastRun;
+        }
+    }
+
+    // Visible for testing
+    protected boolean isArchiveClearedOnLastRun(final String containerName) {
+        final ContainerState containerState = 
containerStateMap.get(containerName);
+        if (containerState == null) {
+            return false;
+        }
+
+        return containerState.isArchiveClearedOnLastRun();
     }
 
     protected static class ClaimLengthPair {
@@ -1882,19 +2040,27 @@ public class FileSystemRepository implements 
ContentRepository {
                 // Mark the claim as no longer being able to be written to
                 resourceClaimManager.freeze(scc.getResourceClaim());
 
+                // If the content claim length is large (> 1 MB or the max 
appendable claim length),
+                // mark the claim as a truncation candidate
+                final boolean largeClaim = scc.getLength() > 
Math.min(1_000_000, maxAppendableClaimLength);
+                final boolean nonStartClaim = scc.getOffset() > 0;
+                if (largeClaim && nonStartClaim) {
+                    scc.setTruncationCandidate(true);
+                }
+
                 // ensure that the claim is no longer on the queue
                 writableClaimQueue.remove(new 
ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
 
-                bcos.close();
-                LOG.debug("Claim lenth >= max; Closing {}", this);
+                LOG.debug("Claim length >= max; Closing {}", this);
                 if (LOG.isTraceEnabled()) {
                     LOG.trace("Stack trace: ", new RuntimeException("Stack 
Trace for closing " + this));
                 }
+                bcos.close();
             }
         }
 
         @Override
-        public synchronized ContentClaim newContentClaim() throws IOException {
+        public synchronized ContentClaim newContentClaim() {
             scc = new StandardContentClaim(scc.getResourceClaim(), 
scc.getOffset() + Math.max(0, scc.getLength()));
             initialLength = 0;
             bytesWritten = 0L;
@@ -1903,4 +2069,41 @@ public class FileSystemRepository implements 
ContentRepository {
         }
     }
 
+    private static class TruncationClaimManager {
+        private static final int MAX_THRESHOLD = 100_000;
+        private final Map<String, List<ContentClaim>> truncationClaims = new 
HashMap<>();
+
+        synchronized void addTruncationClaims(final String container, final 
List<ContentClaim> claim) {
+            final List<ContentClaim> contentClaims = 
truncationClaims.computeIfAbsent(container, c -> new ArrayList<>());
+            contentClaims.addAll(claim);
+
+            // If we have too many claims, remove the smallest ones so that we 
only have the largest MAX_THRESHOLD claims.
+            if (contentClaims.size() > MAX_THRESHOLD) {
+                
contentClaims.sort(Comparator.comparingLong(ContentClaim::getLength).reversed());
+                final List<ContentClaim> discardableClaims = 
contentClaims.subList(MAX_THRESHOLD, contentClaims.size());
+                LOG.debug("Truncation Claim Manager has more than {} claims 
for container {}; discarding {} claims: {}",
+                    MAX_THRESHOLD, container, discardableClaims.size(), 
discardableClaims);
+                discardableClaims.clear();
+            }
+        }
+
+        synchronized List<ContentClaim> removeTruncationClaims(final String 
container) {
+            final List<ContentClaim> removed = 
truncationClaims.remove(container);
+            return removed == null ? Collections.emptyList() : removed;
+        }
+
+        synchronized List<ContentClaim> removeTruncationClaims(final 
ResourceClaim resourceClaim) {
+            final List<ContentClaim> contentClaims = 
truncationClaims.get(resourceClaim.getContainer());
+            if (contentClaims == null) {
+                return Collections.emptyList();
+            }
+
+            final List<ContentClaim> claimsToRemove = contentClaims.stream()
+                .filter(cc -> cc.getResourceClaim().equals(resourceClaim))
+                .toList();
+
+            contentClaims.removeAll(claimsToRemove);
+            return claimsToRemove;
+        }
+    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index a04b7527917..b886c062b2e 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -21,7 +21,9 @@ import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
+import org.apache.nifi.controller.repository.claim.StandardContentClaim;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.repository.schema.FieldCache;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -98,6 +100,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     private final List<File> flowFileRepositoryPaths = new ArrayList<>();
     private final ScheduledExecutorService checkpointExecutor;
     private final int maxCharactersToCache;
+    private final long truncationThreshold;
 
     private volatile Collection<SerializedRepositoryRecord> recoveredRecords = 
null;
     private final Set<ResourceClaim> orphanedResourceClaims = 
Collections.synchronizedSet(new HashSet<>());
@@ -132,6 +135,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     // before the data is destroyed, it's okay because the data will be 
unknown to the Content Repository, so it will be destroyed
     // on restart.
     private final ConcurrentMap<Integer, BlockingQueue<ResourceClaim>> 
claimsAwaitingDestruction = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, BlockingQueue<ContentClaim>> 
claimsAwaitingTruncation = new ConcurrentHashMap<>();
 
     /**
      * default no args constructor for service loading only.
@@ -143,6 +147,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         nifiProperties = null;
         retainOrphanedFlowFiles = true;
         maxCharactersToCache = 0;
+        truncationThreshold = Long.MAX_VALUE;
     }
 
     public WriteAheadFlowFileRepository(final NiFiProperties nifiProperties) {
@@ -153,6 +158,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         retainOrphanedFlowFiles = orphanedFlowFileProperty == null || 
Boolean.parseBoolean(orphanedFlowFileProperty);
 
         this.maxCharactersToCache = 
nifiProperties.getIntegerProperty(FLOWFILE_REPO_CACHE_SIZE, DEFAULT_CACHE_SIZE);
+        final long maxAppendableClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+        // Cap the truncation threshold at 1 MB so that claims larger than 1 
MB are always eligible
+        // for truncation regardless of how large maxAppendableClaimSize is 
configured.
+        truncationThreshold = Math.min(1_000_000, maxAppendableClaimLength);
 
         final String directoryName = 
nifiProperties.getProperty(FLOWFILE_REPOSITORY_DIRECTORY_PREFIX);
         flowFileRepositoryPaths.add(new File(directoryName));
@@ -445,12 +454,13 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // The below code is not entirely thread-safe, but we are OK with that 
because the results aren't really harmful.
         // Specifically, if two different threads call updateRepository with 
DELETE records for the same Content Claim,
         // it's quite possible for claimant count to be 0 below, which results 
in two different threads adding the Content
-        // Claim to the 'claimsAwaitDestruction' map. As a result, we can call 
#markDestructable with the same ContentClaim
+        // Claim to the 'claimsAwaitingDestruction' map. As a result, we can 
call #markDestructable with the same ContentClaim
         // multiple times, and the #markDestructable method is not necessarily 
idempotent.
         // However, the result of this is that the FileSystem Repository may 
end up trying to remove the content multiple times.
         // This does not, however, cause problems, as ContentRepository should 
handle this
         // This does indicate that some refactoring should probably be 
performed, though, as this is not a very clean interface.
-        final Set<ResourceClaim> claimsToAdd = new HashSet<>();
+        final Set<ResourceClaim> destructableClaims = new HashSet<>();
+        final Set<ContentClaim> truncatableClaims = new HashSet<>();
 
         final Set<String> swapLocationsAdded = new HashSet<>();
         final Set<String> swapLocationsRemoved = new HashSet<>();
@@ -458,20 +468,34 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         for (final RepositoryRecord record : repositoryRecords) {
             updateClaimCounts(record);
 
+            final ContentClaim contentClaim = record.getCurrentClaim();
+            final boolean truncationCandidate = contentClaim != null && 
contentClaim.isTruncationCandidate();
+            final boolean claimChanged = 
!Objects.equals(record.getOriginalClaim(), contentClaim);
             if (record.getType() == RepositoryRecordType.DELETE) {
-                // For any DELETE record that we have, if claim is 
destructible, mark it so
-                if (record.getCurrentClaim() != null && 
isDestructable(record.getCurrentClaim())) {
-                    
claimsToAdd.add(record.getCurrentClaim().getResourceClaim());
+                // For any DELETE record that we have, if claim is 
destructible or truncatable, mark it so
+                if (isDestructable(contentClaim)) {
+                    destructableClaims.add(contentClaim.getResourceClaim());
+                } else if (truncationCandidate) {
+                    truncatableClaims.add(contentClaim);
                 }
 
-                // If the original claim is different than the current claim 
and the original claim is destructible, mark it so
-                if (record.getOriginalClaim() != null && 
!record.getOriginalClaim().equals(record.getCurrentClaim()) && 
isDestructable(record.getOriginalClaim())) {
-                    
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
+                // If the original claim is different than the current claim 
and the original claim is destructible
+                // or truncatable, mark it so
+                if (claimChanged) {
+                    if (isDestructable(record.getOriginalClaim())) {
+                        
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
+                    } else if (record.getOriginalClaim() != null && 
record.getOriginalClaim().isTruncationCandidate()) {
+                        truncatableClaims.add(record.getOriginalClaim());
+                    }
                 }
             } else if (record.getType() == RepositoryRecordType.UPDATE) {
                 // if we have an update, and the original is no longer needed, 
mark original as destructible
-                if (record.getOriginalClaim() != null && 
record.getCurrentClaim() != record.getOriginalClaim() && 
isDestructable(record.getOriginalClaim())) {
-                    
claimsToAdd.add(record.getOriginalClaim().getResourceClaim());
+                if (claimChanged) {
+                    if (isDestructable(record.getOriginalClaim())) {
+                        
destructableClaims.add(record.getOriginalClaim().getResourceClaim());
+                    } else if (record.getOriginalClaim() != null && 
record.getOriginalClaim().isTruncationCandidate()) {
+                        truncatableClaims.add(record.getOriginalClaim());
+                    }
                 }
             } else if (record.getType() == RepositoryRecordType.SWAP_OUT) {
                 final String swapLocation = record.getSwapLocation();
@@ -484,13 +508,16 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             }
         }
 
-        // Once the content claim counts have been updated for all records, 
collect any transient claims that are eligible for destruction
+        // Once the content claim counts have been updated for all records, 
collect any transient
+        // claims that are eligible for destruction or truncation
         for (final RepositoryRecord record : repositoryRecords) {
             final List<ContentClaim> transientClaims = 
record.getTransientClaims();
             if (transientClaims != null) {
                 for (final ContentClaim transientClaim : transientClaims) {
                     if (isDestructable(transientClaim)) {
-                        claimsToAdd.add(transientClaim.getResourceClaim());
+                        
destructableClaims.add(transientClaim.getResourceClaim());
+                    } else if (transientClaim.isTruncationCandidate()) {
+                        truncatableClaims.add(transientClaim);
                     }
                 }
             }
@@ -504,19 +531,15 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             }
         }
 
-        if (!claimsToAdd.isEmpty()) {
-            // Get / Register a Set<ContentClaim> for the given Partition Index
-            final Integer partitionKey = Integer.valueOf(partitionIndex);
-            BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionKey);
-            if (claimQueue == null) {
-                claimQueue = new LinkedBlockingQueue<>();
-                final BlockingQueue<ResourceClaim> existingClaimQueue = 
claimsAwaitingDestruction.putIfAbsent(partitionKey, claimQueue);
-                if (existingClaimQueue != null) {
-                    claimQueue = existingClaimQueue;
-                }
-            }
+        if (!destructableClaims.isEmpty()) {
+            // Get / Register a Set<ResourceClaim> for the given Partition 
Index
+            final BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.computeIfAbsent(partitionIndex, key -> new 
LinkedBlockingQueue<>());
+            claimQueue.addAll(destructableClaims);
+        }
 
-            claimQueue.addAll(claimsToAdd);
+        if (!truncatableClaims.isEmpty()) {
+            final BlockingQueue<ContentClaim> claimQueue = 
claimsAwaitingTruncation.computeIfAbsent(partitionIndex, key -> new 
LinkedBlockingQueue<>());
+            claimQueue.addAll(truncatableClaims);
         }
     }
 
@@ -566,16 +589,24 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     @Override
     public void onSync(final int partitionIndex) {
-        final BlockingQueue<ResourceClaim> claimQueue = 
claimsAwaitingDestruction.get(partitionIndex);
-        if (claimQueue == null) {
-            return;
+        final BlockingQueue<ResourceClaim> destructionClaimQueue = 
claimsAwaitingDestruction.get(partitionIndex);
+        if (destructionClaimQueue != null) {
+            final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
+            destructionClaimQueue.drainTo(claimsToDestroy);
+
+            for (final ResourceClaim claim : claimsToDestroy) {
+                markDestructable(claim);
+            }
         }
 
-        final Set<ResourceClaim> claimsToDestroy = new HashSet<>();
-        claimQueue.drainTo(claimsToDestroy);
+        final BlockingQueue<ContentClaim> truncationClaimQueue = 
claimsAwaitingTruncation.get(partitionIndex);
+        if (truncationClaimQueue != null) {
+            final Set<ContentClaim> claimsToTruncate = new HashSet<>();
+            truncationClaimQueue.drainTo(claimsToTruncate);
 
-        for (final ResourceClaim claim : claimsToDestroy) {
-            markDestructable(claim);
+            for (final ContentClaim claim : claimsToTruncate) {
+                claimManager.markTruncatable(claim);
+            }
         }
     }
 
@@ -589,6 +620,15 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
                 markDestructable(claim);
             }
         }
+
+        for (final BlockingQueue<ContentClaim> claimQueue : 
claimsAwaitingTruncation.values()) {
+            final Set<ContentClaim> claimsToTruncate = new HashSet<>();
+            claimQueue.drainTo(claimsToTruncate);
+
+            for (final ContentClaim claim : claimsToTruncate) {
+                claimManager.markTruncatable(claim);
+            }
+        }
     }
 
     /**
@@ -723,6 +763,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             queueMap.put(queue.getIdentifier(), queue);
         }
 
+        final Set<StandardContentClaim> truncationEligibleClaims = new 
HashSet<>();
+        final Set<ContentClaim> forbiddenTruncationClaims = new HashSet<>();
+        final Map<ResourceClaim, ContentClaim> 
latestContentClaimByResourceClaim = new HashMap<>();
+
         final List<SerializedRepositoryRecord> dropRecords = new ArrayList<>();
         int numFlowFilesMissingQueue = 0;
         long maxId = 0;
@@ -748,6 +792,15 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
             }
 
             final ContentClaim claim = record.getContentClaim();
+
+            // Track the latest Content Claim for each Resource Claim so that 
we can determine which claims are eligible for truncation.
+            if (claim != null) {
+                final ContentClaim latestContentClaim = 
latestContentClaimByResourceClaim.get(claim.getResourceClaim());
+                if (latestContentClaim == null || claim.getOffset() > 
latestContentClaim.getOffset()) {
+                    
latestContentClaimByResourceClaim.put(claim.getResourceClaim(), claim);
+                }
+            }
+
             final FlowFileQueue flowFileQueue = queueMap.get(queueId);
             final boolean orphaned = flowFileQueue == null;
             if (orphaned) {
@@ -777,6 +830,18 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
                 continue;
             } else if (claim != null) {
+                // If the claim exceeds the max appendable claim length on its 
own and doesn't start the Resource Claim,
+                // we will consider it to be eligible for truncation. However, 
if there are multiple FlowFiles sharing the
+                // same claim, we cannot truncate it because doing so would 
affect the other FlowFiles.
+                if (claim.getOffset() > 0 && claim.getLength() > 
truncationThreshold && claim instanceof final StandardContentClaim scc) {
+                    if (forbiddenTruncationClaims.contains(claim) || 
truncationEligibleClaims.contains(scc)) {
+                        truncationEligibleClaims.remove(scc);
+                        forbiddenTruncationClaims.add(scc);
+                    } else {
+                        truncationEligibleClaims.add(scc);
+                    }
+                }
+
                 claimManager.incrementClaimantCount(claim.getResourceClaim());
             }
 
@@ -786,6 +851,14 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // If recoveredRecords has been populated it need to be nulled out now 
because it is no longer useful and can be garbage collected.
         recoveredRecords = null;
 
+        // If any Content Claim was determined to be truncatable, mark it as 
such now.
+        for (final StandardContentClaim eligible : truncationEligibleClaims) {
+            final ContentClaim latestForResource = 
latestContentClaimByResourceClaim.get(eligible.getResourceClaim());
+            if (Objects.equals(eligible, latestForResource)) {
+                eligible.setTruncationCandidate(true);
+            }
+        }
+
         // Set the AtomicLong to 1 more than the max ID so that calls to 
#getNextFlowFileSequence() will
         // return the appropriate number.
         flowFileSequenceGenerator.set(maxId + 1);
@@ -852,7 +925,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
     }
 
     @Override
-    public long getMaxFlowFileIdentifier() throws IOException {
+    public long getMaxFlowFileIdentifier() {
         // flowFileSequenceGenerator is 1 more than the MAX so that we can 
call #getAndIncrement on the AtomicLong
         return flowFileSequenceGenerator.get() - 1;
     }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
index 9e0a1324a48..5a6b9d89bf6 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/TestFileSystemSwapManager.java
@@ -21,6 +21,7 @@ import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.FlowFileRepository;
 import org.apache.nifi.controller.repository.SwapContents;
 import org.apache.nifi.controller.repository.SwapManagerInitializationContext;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.events.EventReporter;
@@ -223,6 +224,10 @@ public class TestFileSystemSwapManager {
         public void markDestructable(ResourceClaim claim) {
         }
 
+        @Override
+        public void markTruncatable(final ContentClaim claim) {
+        }
+
         @Override
         public void drainDestructableClaims(Collection<ResourceClaim> 
destination, int maxElements) {
         }
@@ -231,6 +236,10 @@ public class TestFileSystemSwapManager {
         public void drainDestructableClaims(Collection<ResourceClaim> 
destination, int maxElements, long timeout, TimeUnit unit) {
         }
 
+        @Override
+        public void drainTruncatableClaims(final Collection<ContentClaim> 
destination, final int maxElements) {
+        }
+
         @Override
         public void purge() {
         }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
index 7f0e2a7a9d7..102b927af3e 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestFileSystemRepository.java
@@ -59,6 +59,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -81,14 +82,19 @@ public class TestFileSystemRepository {
     private Path originalNifiPropertiesFile;
     private Path rootFile;
     private NiFiProperties nifiProperties;
+    private long maxClaimLength;
 
     @BeforeEach
     public void setup() throws IOException {
         originalNifiPropertiesFile = 
Paths.get("src/test/resources/conf/nifi.properties");
         rootFile = tempDir.resolve("content_repository");
         final String contentRepositoryDirectory = 
NiFiProperties.REPOSITORY_CONTENT_PREFIX.concat("default");
-        final Map<String, String> additionalProperties = 
Map.of(contentRepositoryDirectory, rootFile.toString());
+        final Map<String, String> additionalProperties = Map.of(
+            contentRepositoryDirectory, rootFile.toString(),
+            NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY, "1 sec"
+        );
         nifiProperties = 
NiFiProperties.createBasicNiFiProperties(originalNifiPropertiesFile.toString(), 
additionalProperties);
+        maxClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
         repository = new FileSystemRepository(nifiProperties);
         claimManager = new StandardResourceClaimManager();
         repository.initialize(new 
StandardContentRepositoryContext(claimManager, EventReporter.NO_OP));
@@ -145,7 +151,6 @@ public class TestFileSystemRepository {
     @Timeout(30)
     public void testClaimsArchivedWhenMarkedDestructable() throws IOException, 
InterruptedException {
         final ContentClaim contentClaim = repository.create(false);
-        final long configuredAppendableClaimLength = 
DataUnit.parseDataSize(nifiProperties.getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
         final Map<String, Path> containerPaths = 
nifiProperties.getContentRepositoryPaths();
         assertEquals(1, containerPaths.size());
         final String containerName = containerPaths.keySet().iterator().next();
@@ -154,7 +159,7 @@ public class TestFileSystemRepository {
             long bytesWritten = 0L;
             final byte[] bytes = "Hello 
World".getBytes(StandardCharsets.UTF_8);
 
-            while (bytesWritten <= configuredAppendableClaimLength) {
+            while (bytesWritten <= maxClaimLength) {
                 out.write(bytes);
                 bytesWritten += bytes.length;
             }
@@ -480,12 +485,9 @@ public class TestFileSystemRepository {
         repository.incrementClaimaintCount(claim);
 
         final Path claimPath = getPath(claim);
-        final String maxAppendableClaimLength = 
nifiProperties.getMaxAppendableClaimSize();
-        final int maxClaimLength = 
DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
-
         // Create the file.
         try (final OutputStream out = repository.write(claim)) {
-            out.write(new byte[maxClaimLength]);
+            out.write(new byte[(int) maxClaimLength]);
         }
 
         int count = repository.decrementClaimantCount(claim);
@@ -502,10 +504,14 @@ public class TestFileSystemRepository {
     }
 
     private Path getPath(final ContentClaim claim) {
+        return getPath(repository, claim);
+    }
+
+    private Path getPath(final FileSystemRepository repo, final ContentClaim 
claim) {
         try {
-            final Method m = 
repository.getClass().getDeclaredMethod("getPath", ContentClaim.class);
+            final Method m = 
FileSystemRepository.class.getDeclaredMethod("getPath", ContentClaim.class);
             m.setAccessible(true);
-            return (Path) m.invoke(repository, claim);
+            return (Path) m.invoke(repo, claim);
         } catch (final Exception e) {
             throw new RuntimeException("Could not invoke #getPath on 
FileSystemRepository due to " + e);
         }
@@ -694,9 +700,7 @@ public class TestFileSystemRepository {
 
         // write at least 1 MB to the output stream so that when we close the 
output stream
         // the repo won't keep the stream open.
-        final String maxAppendableClaimLength = 
nifiProperties.getMaxAppendableClaimSize();
-        final int maxClaimLength = 
DataUnit.parseDataSize(maxAppendableClaimLength, DataUnit.B).intValue();
-        final byte[] buff = new byte[maxClaimLength];
+        final byte[] buff = new byte[(int) maxClaimLength];
         out.write(buff);
         out.write(buff);
 
@@ -897,14 +901,267 @@ public class TestFileSystemRepository {
         }
     }
 
+    @Test
+    public void testTruncationCandidateMarkedOnlyForLargeNonStartClaim() 
throws IOException {
+        // Create a small claim at offset 0. Write less data than 
maxAppendableClaimLength so the ResourceClaim
+        // is recycled back to the writable queue.
+        final ContentClaim smallClaim = repository.create(false);
+        final byte[] smallData = new byte[100];
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(smallData);
+        }
+        assertFalse(smallClaim.isTruncationCandidate());
+
+        // Now create a large claim on potentially the same ResourceClaim, 
writing more than maxAppendableClaimLength
+        // to freeze the ResourceClaim. Because smallClaim was small and 
recycled, largeClaim will be at a non-zero
+        // offset on the same ResourceClaim.
+        final ContentClaim largeClaim = repository.create(false);
+        final byte[] largeData = new byte[(int) maxClaimLength + 1024];
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(largeData);
+        }
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Negative case: create a standalone large claim at offset 0 (fresh 
ResourceClaim)
+        // To ensure a fresh ResourceClaim, write large data to all writable 
claims to exhaust them,
+        // then create a new claim that starts at offset 0.
+        // The simplest approach: create claims until we get one at offset 0.
+        ContentClaim offsetZeroClaim = null;
+        for (int i = 0; i < 20; i++) {
+            final ContentClaim candidate = repository.create(false);
+            if (candidate instanceof StandardContentClaim standardContentClaim 
&& standardContentClaim.getOffset() == 0) {
+                // Write large data that exceeds maxAppendableClaimLength
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+                offsetZeroClaim = candidate;
+                break;
+            } else {
+                // Write large data to exhaust this claim's ResourceClaim
+                try (final OutputStream out = repository.write(candidate)) {
+                    out.write(new byte[(int) maxClaimLength + 1024]);
+                }
+            }
+        }
+
+        assertNotNull(offsetZeroClaim);
+        assertFalse(offsetZeroClaim.isTruncationCandidate());
+    }
+
+    @Test
+    public void testIncrementClaimantCountClearsTruncationCandidate() throws 
IOException {
+        // Create a small claim to start a ResourceClaim, then a large claim 
to freeze it
+        final ContentClaim smallClaim = repository.create(false);
+        try (final OutputStream out = repository.write(smallClaim)) {
+            out.write(new byte[100]);
+        }
+
+        final ContentClaim largeClaim = repository.create(false);
+        try (final OutputStream out = repository.write(largeClaim)) {
+            out.write(new byte[(int) maxClaimLength + 1024]);
+        }
+
+        assertTrue(largeClaim.isTruncationCandidate());
+
+        // Simulate a clone by incrementing claimant count
+        repository.incrementClaimaintCount(largeClaim);
+
+        assertFalse(largeClaim.isTruncationCandidate());
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateClaimReducesFileSizeAndPreservesEarlierData() 
throws IOException, InterruptedException {
+        // We need to create our own repository that overrides 
getContainerUsableSpace to simulate disk pressure
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return 0; // Extreme disk pressure
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            final byte[] smallData = "Hello World - small claim 
data".getBytes(StandardCharsets.UTF_8);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(smallData);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            final byte[] largeData = new byte[(int) maxClaimLength + 4096];
+            new Random().nextBytes(largeData);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(largeData);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            // Both claims should share the same resource claim
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            // Get the file path
+            final Path filePath = getPath(localRepository, smallClaim);
+            assertNotNull(filePath);
+            final long originalSize = Files.size(filePath);
+            assertTrue(originalSize > maxClaimLength);
+
+            // Decrement claimant count for the large claim to 0 (small claim 
still holds a reference)
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+
+            // Mark the large claim as truncatable
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for the TruncateClaims background task to truncate the 
file. Poll the file size until it shrinks.
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still fully readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                final byte[] readData = readFully(in, smallData.length);
+                assertArrayEquals(smallData, readData);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTruncateNotActiveWhenDiskNotPressured() throws 
IOException, InterruptedException {
+        // Create repository with ample disk space
+        shutdown();
+
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return Long.MAX_VALUE; // Plenty of space
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            Thread.sleep(3000L);
+            assertEquals(originalSize, Files.size(filePath));
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
+    @Test
+    @Timeout(90)
+    public void testTruncateClaimDeferredThenExecutedWhenPressureStarts() 
throws IOException, InterruptedException {
+        // Create a repository where disk pressure can be toggled
+        shutdown();
+
+        final AtomicLong usableSpace = new AtomicLong(Long.MAX_VALUE);
+        final FileSystemRepository localRepository = new 
FileSystemRepository(nifiProperties) {
+            @Override
+            public long getContainerUsableSpace(final String containerName) {
+                return usableSpace.get();
+            }
+
+            @Override
+            protected boolean isArchiveClearedOnLastRun(final String 
containerName) {
+                return true;
+            }
+        };
+
+        try {
+            final StandardResourceClaimManager localClaimManager = new 
StandardResourceClaimManager();
+            localRepository.initialize(new 
StandardContentRepositoryContext(localClaimManager, EventReporter.NO_OP));
+            localRepository.purge();
+
+            // Create a small claim then a large claim on the same 
ResourceClaim
+            final ContentClaim smallClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(smallClaim)) {
+                out.write(new byte[100]);
+            }
+
+            final ContentClaim largeClaim = localRepository.create(false);
+            try (final OutputStream out = localRepository.write(largeClaim)) {
+                out.write(new byte[(int) maxClaimLength + 4096]);
+            }
+
+            assertTrue(largeClaim.isTruncationCandidate());
+            assertEquals(smallClaim.getResourceClaim(), 
largeClaim.getResourceClaim());
+
+            final Path filePath = getPath(localRepository, smallClaim);
+            final long originalSize = Files.size(filePath);
+
+            
localClaimManager.decrementClaimantCount(largeClaim.getResourceClaim());
+            localClaimManager.markTruncatable(largeClaim);
+
+            // Wait for at least one run of the background task with NO 
pressure.
+            // File should NOT be truncated.
+            Thread.sleep(3_000);
+            assertEquals(originalSize, Files.size(filePath));
+
+            // Now turn on disk pressure
+            usableSpace.set(0);
+
+            // Wait for the next background task run to truncate the file
+            final long expectedTruncatedSize = largeClaim.getOffset();
+            while (Files.size(filePath) != expectedTruncatedSize) {
+                Thread.sleep(100L);
+            }
+
+            // Verify the small claim's data is still readable
+            try (final InputStream in = localRepository.read(smallClaim)) {
+                assertNotNull(in);
+            }
+        } finally {
+            localRepository.shutdown();
+        }
+    }
+
     private byte[] readFully(final InputStream inStream, final int size) 
throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream(size);
+        final ByteArrayOutputStream outputStream = new 
ByteArrayOutputStream(size);
         int len;
         final byte[] buffer = new byte[size];
         while ((len = inStream.read(buffer)) >= 0) {
-            baos.write(buffer, 0, len);
+            outputStream.write(buffer, 0, len);
         }
 
-        return baos.toByteArray();
+        return outputStream.toByteArray();
     }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
index 9d6118e8ebe..3a6453e3124 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestWriteAheadFlowFileRepository.java
@@ -810,4 +810,287 @@ public class TestWriteAheadFlowFileRepository {
             return swapLocation;
         }
     }
+
+    // 
=========================================================================
+    // Truncation Feature: Helpers
+    // 
=========================================================================
+
    /**
     * Holder for the collaborators shared by the runtime truncation tests: the resource claim
     * manager, the queue provider (pre-wired by {@link #createRuntimeRepoContext()} with a mock
     * Connection), and the mock FlowFileQueue returned by that connection.
     */
    private record RuntimeRepoContext(StandardResourceClaimManager claimManager, TestQueueProvider queueProvider, FlowFileQueue queue) {
    }
+
+    private RuntimeRepoContext createRuntimeRepoContext() {
+        final StandardResourceClaimManager claimManager = new 
StandardResourceClaimManager();
+        final TestQueueProvider queueProvider = new TestQueueProvider();
+        final Connection connection = Mockito.mock(Connection.class);
+        when(connection.getIdentifier()).thenReturn("1234");
+        
when(connection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
+        final FlowFileQueue queue = Mockito.mock(FlowFileQueue.class);
+        when(queue.getIdentifier()).thenReturn("1234");
+        when(connection.getFlowFileQueue()).thenReturn(queue);
+        queueProvider.addConnection(connection);
+        return new RuntimeRepoContext(claimManager, queueProvider, queue);
+    }
+
+    private StandardContentClaim createClaim(final ResourceClaim rc, final 
long offset, final long length, final boolean truncationCandidate) {
+        final StandardContentClaim claim = new StandardContentClaim(rc, 
offset);
+        claim.setLength(length);
+        if (truncationCandidate) {
+            claim.setTruncationCandidate(true);
+        }
+        return claim;
+    }
+
    /**
     * Persists a FlowFile referencing the given claim and then persists its deletion, driving
     * the claim through the full create/delete lifecycle in two separate repository updates.
     */
    private void createAndDeleteFlowFile(final WriteAheadFlowFileRepository repo, final FlowFileQueue queue,
                                         final ContentClaim claim) throws IOException {
        final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
                .id(1L)
                .addAttribute("uuid", UUID.randomUUID().toString())
                .contentClaim(claim)
                .build();

        // First update: record the FlowFile's creation in the repository.
        final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(queue);
        createRecord.setWorking(flowFile, false);
        createRecord.setDestination(queue);
        repo.updateRepository(List.of(createRecord));

        // Second update: mark the same FlowFile for deletion.
        final StandardRepositoryRecord deleteRecord = new StandardRepositoryRecord(queue, flowFile);
        deleteRecord.markForDelete();
        repo.updateRepository(List.of(deleteRecord));
    }
+
    /**
     * Writes FlowFiles (one per claim) to a new repo, closes it, then recovers into a fresh repo
     * and returns the recovered FlowFileRecords. Used by the recovery tests to observe which
     * recovered claims get flagged as truncation candidates.
     */
    private List<FlowFileRecord> writeAndRecover(final ContentClaim... claims) throws IOException {
        // Write-phase wiring: a real StandardFlowFileQueue (backed by a mock swap manager) so
        // that records can actually be enqueued and persisted.
        final ResourceClaimManager writeClaimManager = new StandardResourceClaimManager();
        final TestQueueProvider writeQueueProvider = new TestQueueProvider();
        final Connection writeConnection = Mockito.mock(Connection.class);
        when(writeConnection.getIdentifier()).thenReturn("1234");
        when(writeConnection.getDestination()).thenReturn(Mockito.mock(Connectable.class));
        final FlowFileSwapManager swapMgr = new MockFlowFileSwapManager();
        final FlowFileQueue writeQueue = new StandardFlowFileQueue("1234", null, null, null, swapMgr, null, 10000, "0 sec", 0L, "0 B");
        when(writeConnection.getFlowFileQueue()).thenReturn(writeQueue);
        writeQueueProvider.addConnection(writeConnection);

        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) {
            repo.initialize(writeClaimManager);
            repo.loadFlowFiles(writeQueueProvider);

            // One FlowFile per claim, with deterministic sequential UUIDs so records are
            // distinguishable after recovery.
            final List<RepositoryRecord> records = new ArrayList<>();
            for (int i = 0; i < claims.length; i++) {
                final FlowFileRecord ff = new StandardFlowFileRecord.Builder()
                        .id(i + 1L)
                        .addAttribute("uuid", "11111111-1111-1111-1111-" + String.format("%012d", i + 1))
                        .contentClaim(claims[i])
                        .build();
                final StandardRepositoryRecord rec = new StandardRepositoryRecord(writeQueue);
                rec.setWorking(ff, false);
                rec.setDestination(writeQueue);
                records.add(rec);
            }
            repo.updateRepository(records);
        }

        // Recover: the mock queue captures every FlowFileRecord that recovery puts back.
        final List<FlowFileRecord> recovered = new ArrayList<>();
        final FlowFileQueue recoveryQueue = Mockito.mock(FlowFileQueue.class);
        when(recoveryQueue.getIdentifier()).thenReturn("1234");
        doAnswer(invocation -> {
            recovered.add((FlowFileRecord) invocation.getArguments()[0]);
            return null;
        }).when(recoveryQueue).put(any(FlowFileRecord.class));

        final Connection recoveryConnection = Mockito.mock(Connection.class);
        when(recoveryConnection.getIdentifier()).thenReturn("1234");
        when(recoveryConnection.getFlowFileQueue()).thenReturn(recoveryQueue);
        final TestQueueProvider recoveryQueueProvider = new TestQueueProvider();
        recoveryQueueProvider.addConnection(recoveryConnection);

        // A fresh claim manager simulates a restart; loadFlowFiles drives the recovery path.
        try (final WriteAheadFlowFileRepository repo2 = new WriteAheadFlowFileRepository(niFiProperties)) {
            repo2.initialize(new StandardResourceClaimManager());
            repo2.loadFlowFiles(recoveryQueueProvider);
        }

        return recovered;
    }
+
+    private FlowFileRecord findRecoveredByOffset(final List<FlowFileRecord> 
recovered, final long offset) {
+        return recovered.stream()
+                .filter(ff -> ff.getContentClaim() != null && 
ff.getContentClaim().getOffset() == offset)
+                .findFirst()
+                .orElse(null);
+    }
+
+    // 
=========================================================================
+    // Truncation Feature: Runtime Tests
+    // 
=========================================================================
+
    /**
     * Verifies that deleting a FlowFile whose claim is a truncation candidate -- while the
     * resource claim is still referenced elsewhere -- routes the claim to the truncation queue.
     */
    @Test
    public void testDeleteRecordRoutesTruncatableClaimToTruncationQueue() throws IOException {
        final RuntimeRepoContext context = createRuntimeRepoContext();
        final ResourceClaim resourceClaim = context.claimManager().newResourceClaim("container", "section", "1", false, false);
        context.claimManager().incrementClaimantCount(resourceClaim);
        context.claimManager().incrementClaimantCount(resourceClaim); // count = 2 so that after delete decrement it stays > 0 (not destructable)
        final StandardContentClaim contentClaim = createClaim(resourceClaim, 1024L, 5_000_000L, true);

        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) {
            repo.initialize(context.claimManager());
            repo.loadFlowFiles(context.queueProvider());
            createAndDeleteFlowFile(repo, context.queue(), contentClaim);
            repo.checkpoint();
        }

        // The resource claim is still referenced, so the deleted claim must show up in the
        // truncatable queue rather than being destroyed outright.
        final List<ContentClaim> truncated = new ArrayList<>();
        context.claimManager().drainTruncatableClaims(truncated, 100);
        assertTrue(truncated.contains(contentClaim));
    }
+
    /**
     * Verifies that when the delete drops the resource claim's claimant count to zero, the
     * resource claim is marked destructable and the content claim is NOT queued for truncation
     * (destroying the whole resource supersedes truncating it).
     */
    @Test
    public void testDestructableClaimTakesPriorityOverTruncatable() throws IOException {
        final RuntimeRepoContext context = createRuntimeRepoContext();
        final ResourceClaim resourceClaim = context.claimManager().newResourceClaim("container", "section", "1", false, false);
        context.claimManager().incrementClaimantCount(resourceClaim); // count = 1 -- will reach 0 after delete
        final StandardContentClaim contentClaim = createClaim(resourceClaim, 1024L, 5_000_000L, true);

        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) {
            repo.initialize(context.claimManager());
            repo.loadFlowFiles(context.queueProvider());
            createAndDeleteFlowFile(repo, context.queue(), contentClaim);
            repo.checkpoint();
        }

        // The resource claim itself should be queued for destruction...
        final List<ResourceClaim> destructed = new ArrayList<>();
        context.claimManager().drainDestructableClaims(destructed, 100);
        assertTrue(destructed.contains(resourceClaim));

        // ...and the content claim must therefore not also be queued for truncation.
        final List<ContentClaim> truncated = new ArrayList<>();
        context.claimManager().drainTruncatableClaims(truncated, 100);
        assertFalse(truncated.contains(contentClaim));
    }
+
    /**
     * Verifies that when a FlowFile's content is replaced (update record with a new claim),
     * the superseded original claim -- still truncatable and still referenced -- is queued
     * for truncation at checkpoint time.
     */
    @Test
    public void testUpdateRecordOriginalClaimQueuedForTruncation() throws IOException {
        final RuntimeRepoContext context = createRuntimeRepoContext();

        final ResourceClaim originalResourceClaim = context.claimManager().newResourceClaim("container", "section", "1", false, false);
        context.claimManager().incrementClaimantCount(originalResourceClaim);
        context.claimManager().incrementClaimantCount(originalResourceClaim); // count = 2 so it stays > 0 after decrement
        final StandardContentClaim originalClaim = createClaim(originalResourceClaim, 2048L, 5_000_000L, true);

        // The replacement claim lives in a different resource claim and is not truncatable.
        final ResourceClaim newResourceClaim = context.claimManager().newResourceClaim("container", "section", "2", false, false);
        context.claimManager().incrementClaimantCount(newResourceClaim);
        final StandardContentClaim newClaim = createClaim(newResourceClaim, 0L, 100L, false);

        final FlowFileRecord originalFlowFile = new StandardFlowFileRecord.Builder()
                .id(1L)
                .addAttribute("uuid", UUID.randomUUID().toString())
                .contentClaim(originalClaim)
                .build();

        try (final WriteAheadFlowFileRepository repo = new WriteAheadFlowFileRepository(niFiProperties)) {
            repo.initialize(context.claimManager());
            repo.loadFlowFiles(context.queueProvider());

            // First update: create the FlowFile with the original claim.
            final StandardRepositoryRecord createRecord = new StandardRepositoryRecord(context.queue());
            createRecord.setWorking(originalFlowFile, false);
            createRecord.setDestination(context.queue());
            repo.updateRepository(List.of(createRecord));

            // Second update: swap the FlowFile's content over to the new claim.
            final FlowFileRecord updatedFlowFile = new StandardFlowFileRecord.Builder()
                    .fromFlowFile(originalFlowFile)
                    .contentClaim(newClaim)
                    .build();
            final StandardRepositoryRecord updateRecord = new StandardRepositoryRecord(context.queue(), originalFlowFile);
            updateRecord.setWorking(updatedFlowFile, true);
            updateRecord.setDestination(context.queue());
            repo.updateRepository(List.of(updateRecord));
            repo.checkpoint();
        }

        // The displaced original claim should now be waiting in the truncation queue.
        final List<ContentClaim> truncated = new ArrayList<>();
        context.claimManager().drainTruncatableClaims(truncated, 100);
        assertTrue(truncated.contains(originalClaim));
    }
+
+    // 
=========================================================================
+    // Truncation Feature: Recovery Tests
+    // 
=========================================================================
+
    /**
     * Verifies that recovery flags the last (tail) claim of a resource claim as a truncation
     * candidate when it is large, while the small preceding claim is left unflagged.
     * (The 2,000,000-byte claim exceeds the repository's size threshold; the exact threshold
     * is configured in the repository implementation, not visible here.)
     */
    @Test
    public void testRecoveryMarksTruncationCandidateForLargeTailClaim() throws IOException {
        final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false);
        final StandardContentClaim smallClaim = createClaim(resourceClaim, 0L, 100L, false);
        final StandardContentClaim largeClaim = createClaim(resourceClaim, 100L, 2_000_000L, false);

        final List<FlowFileRecord> recovered = writeAndRecover(smallClaim, largeClaim);

        // The large tail claim should come back flagged...
        final FlowFileRecord recoveredLargeFlowFile = findRecoveredByOffset(recovered, 100L);
        assertNotNull(recoveredLargeFlowFile);
        assertTrue(recoveredLargeFlowFile.getContentClaim().isTruncationCandidate());

        // ...while the small leading claim should not be.
        final FlowFileRecord recoveredSmallFlowFile = findRecoveredByOffset(recovered, 0L);
        assertNotNull(recoveredSmallFlowFile);
        assertFalse(recoveredSmallFlowFile.getContentClaim().isTruncationCandidate());
    }
+
    /**
     * Verifies that recovery never flags a claim shared by multiple FlowFiles (clone scenario)
     * as a truncation candidate: truncating shared content would corrupt the other FlowFile.
     */
    @Test
    public void testRecoveryDoesNotMarkClonedClaim() throws IOException {
        final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false);
        final StandardContentClaim sharedClaim = createClaim(resourceClaim, 100L, 2_000_000L, false);

        // Two FlowFiles sharing the same claim (clone scenario)
        final List<FlowFileRecord> recovered = writeAndRecover(sharedClaim, sharedClaim);

        for (final FlowFileRecord flowFile : recovered) {
            if (flowFile.getContentClaim() != null) {
                assertFalse(flowFile.getContentClaim().isTruncationCandidate());
            }
        }
    }
+
    /**
     * Verifies that only the final (tail) claim of a resource claim can be flagged for
     * truncation: a large claim in the middle cannot be truncated without destroying the
     * data that follows it in the same file.
     */
    @Test
    public void testRecoveryOnlyMarksTailClaim() throws IOException {
        final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false);
        final StandardContentClaim nonTailClaim = createClaim(resourceClaim, 100L, 2_000_000L, false);
        final StandardContentClaim tailClaim = createClaim(resourceClaim, 2_000_100L, 3_000_000L, false);

        final List<FlowFileRecord> recovered = writeAndRecover(nonTailClaim, tailClaim);

        // Only the claim at the end of the resource claim should be flagged.
        final FlowFileRecord tailFlowFile = findRecoveredByOffset(recovered, 2_000_100L);
        assertNotNull(tailFlowFile);
        assertTrue(tailFlowFile.getContentClaim().isTruncationCandidate());

        final FlowFileRecord nonTailFlowFile = findRecoveredByOffset(recovered, 100L);
        assertNotNull(nonTailFlowFile);
        assertFalse(nonTailFlowFile.getContentClaim().isTruncationCandidate());
    }
+
    /**
     * Verifies that a large claim followed by a small claim in the same resource claim is NOT
     * flagged: the large claim is no longer the tail, and the small tail claim is below the
     * size threshold, so nothing in the resource claim qualifies for truncation.
     */
    @Test
    public void testRecoverySmallClaimAfterLargeDoesNotMarkLarge() throws IOException {
        final StandardResourceClaimManager claimManager = new StandardResourceClaimManager();
        final ResourceClaim resourceClaim = claimManager.newResourceClaim("container", "section", "1", false, false);
        final StandardContentClaim firstSmallClaim = createClaim(resourceClaim, 0L, 100L, false);
        final StandardContentClaim largeClaim = createClaim(resourceClaim, 100L, 2_000_000L, false);
        final StandardContentClaim secondSmallClaim = createClaim(resourceClaim, 2_000_100L, 50L, false);

        final List<FlowFileRecord> recovered = writeAndRecover(firstSmallClaim, largeClaim, secondSmallClaim);

        final List<FlowFileRecord> flowFilesWithClaims = recovered.stream()
                .filter(flowFile -> flowFile.getContentClaim() != null)
                .toList();

        assertFalse(flowFilesWithClaims.isEmpty());
        for (final FlowFileRecord flowFile : flowFilesWithClaims) {
            assertFalse(flowFile.getContentClaim().isTruncationCandidate(),
                    "No claim should be a truncation candidate because the large claim is not the tail; claim offset=" + flowFile.getContentClaim().getOffset());
        }
    }
 }
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
index 814d3e81642..780a44f3699 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaim.java
@@ -29,6 +29,7 @@ public final class StandardContentClaim implements 
ContentClaim, Comparable<Cont
     private final ResourceClaim resourceClaim;
     private final long offset;
     private volatile long length;
+    private volatile boolean truncationCandidate = false;
 
     public StandardContentClaim(final ResourceClaim resourceClaim, final long 
offset) {
         this.resourceClaim = resourceClaim;
@@ -40,6 +41,15 @@ public final class StandardContentClaim implements 
ContentClaim, Comparable<Cont
         this.length = length;
     }
 
    /**
     * Flags this Content Claim as a candidate for truncating its Resource Claim's backing
     * storage. Set by the framework (repository recovery / bookkeeping) -- presumably only for
     * the last claim within a Resource Claim; confirm against the repository implementation.
     */
    public void setTruncationCandidate(final boolean candidate) {
        this.truncationCandidate = candidate;
    }

    /**
     * @return true if this claim has been flagged as a truncation candidate
     */
    @Override
    public boolean isTruncationCandidate() {
        return truncationCandidate;
    }
+
     @Override
     public int hashCode() {
         final int prime = 31;
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
index 95687747487..1e483fc25f3 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/claim/StandardResourceClaimManager.java
@@ -32,6 +32,7 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
     private static final Logger logger = 
LoggerFactory.getLogger(StandardResourceClaimManager.class);
     private final ConcurrentMap<ResourceClaim, ClaimCount> claimantCounts = 
new ConcurrentHashMap<>();
     private final BlockingQueue<ResourceClaim> destructableClaims = new 
LinkedBlockingQueue<>(50000);
+    private final BlockingQueue<ContentClaim> truncatableClaims = new 
LinkedBlockingQueue<>(100000);
 
     @Override
     public ResourceClaim newResourceClaim(final String container, final String 
section, final String id, final boolean lossTolerant, final boolean writable) {
@@ -161,6 +162,30 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
         }
     }
 
+    @Override
+    public void markTruncatable(final ContentClaim contentClaim) {
+        if (contentClaim == null) {
+            return;
+        }
+
+        final ResourceClaim resourceClaim = contentClaim.getResourceClaim();
+        synchronized (resourceClaim) {
+            if (isDestructable(resourceClaim)) {
+                return;
+            }
+
+            logger.debug("Marking {} as truncatable", contentClaim);
+            try {
+                if (!truncatableClaims.offer(contentClaim, 1, 
TimeUnit.MINUTES)) {
+                    logger.info("Unable to mark {} as truncatable because 
maximum queue size [{}] reached", truncatableClaims.size(), contentClaim);
+                }
+            } catch (final InterruptedException ie) {
+                Thread.currentThread().interrupt();
+                logger.debug("Interrupted while marking {} as truncatable", 
contentClaim, ie);
+            }
+        }
+    }
+
     @Override
     public void drainDestructableClaims(final Collection<ResourceClaim> 
destination, final int maxElements) {
         final int drainedCount = destructableClaims.drainTo(destination, 
maxElements);
@@ -179,6 +204,12 @@ public class StandardResourceClaimManager implements 
ResourceClaimManager {
         }
     }
 
    /**
     * Transfers up to {@code maxElements} queued truncatable Content Claims into the given
     * collection, removing them from the internal queue.
     */
    @Override
    public void drainTruncatableClaims(final Collection<ContentClaim> destination, final int maxElements) {
        final int drainedCount = truncatableClaims.drainTo(destination, maxElements);
        logger.debug("Drained {} truncatable claims to {}", drainedCount, destination);
    }
+
     @Override
     public void purge() {
         claimantCounts.clear();
diff --git 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
index 7fb77b2739c..dd36bdd230a 100644
--- 
a/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
+++ 
b/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/test/java/org/apache/nifi/controller/repository/claim/TestStandardResourceClaimManager.java
@@ -21,12 +21,14 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestStandardResourceClaimManager {
 
@@ -56,4 +58,53 @@ public class TestStandardResourceClaimManager {
         manager.drainDestructableClaims(new ArrayList<>(), 1);
         assertSame(completedObject, future.get());
     }
+
+    @Test
+    public void testMarkTruncatableSkipsDestructableResourceClaim() {
+        final StandardResourceClaimManager manager = new 
StandardResourceClaimManager();
+
+        // Create a resource claim with claimant count 0 and mark it 
destructable
+        final ResourceClaim rc = manager.newResourceClaim("container", 
"section", "id1", false, false);
+        manager.markDestructable(rc);
+
+        // Create a content claim on that resource claim
+        final StandardContentClaim contentClaim = new StandardContentClaim(rc, 
0);
+        contentClaim.setLength(1024);
+        contentClaim.setTruncationCandidate(true);
+
+        // markTruncatable should skip this because the resource claim is 
already destructable
+        manager.markTruncatable(contentClaim);
+
+        // Drain truncatable claims - should be empty
+        final List<ContentClaim> truncated = new ArrayList<>();
+        manager.drainTruncatableClaims(truncated, 10);
+        assertTrue(truncated.isEmpty(), "Truncatable claims should be empty 
because the resource claim is destructable");
+    }
+
+    @Test
+    public void testMarkTruncatableAndDrainRespectsMaxElements() {
+        final StandardResourceClaimManager manager = new 
StandardResourceClaimManager();
+
+        // Create 5 truncatable claims, each on a distinct resource claim with 
a positive claimant count
+        for (int i = 0; i < 5; i++) {
+            final ResourceClaim rc = manager.newResourceClaim("container", 
"section", "id-" + i, false, false);
+            // Give each resource claim a positive claimant count so it's not 
destructable
+            manager.incrementClaimantCount(rc);
+
+            final StandardContentClaim cc = new StandardContentClaim(rc, 0);
+            cc.setLength(1024);
+            cc.setTruncationCandidate(true);
+            manager.markTruncatable(cc);
+        }
+
+        // Drain with maxElements=3
+        final List<ContentClaim> batch1 = new ArrayList<>();
+        manager.drainTruncatableClaims(batch1, 3);
+        assertEquals(3, batch1.size(), "First drain should return exactly 3 
claims");
+
+        // Drain again - should get remaining 2
+        final List<ContentClaim> batch2 = new ArrayList<>();
+        manager.drainTruncatableClaims(batch2, 10);
+        assertEquals(2, batch2.size(), "Second drain should return the 
remaining 2 claims");
+    }
 }
diff --git 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
index 08c9fd2eca7..4b24f3af045 100644
--- 
a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
+++ 
b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/ByteArrayContentRepository.java
@@ -260,6 +260,11 @@ public class ByteArrayContentRepository implements 
ContentRepository {
             return resourceClaim.getLength();
         }
 
        @Override
        public boolean isTruncationCandidate() {
            // This repository holds content in byte arrays; truncating file-backed storage
            // never applies, so no claim is ever a truncation candidate.
            return false;
        }
+
         @Override
         public int compareTo(final ContentClaim o) {
             return resourceClaim.compareTo(o.getResourceClaim());
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
new file mode 100644
index 00000000000..c00ba93c6ce
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/processors/tests/system/GenerateTruncatableFlowFiles.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.tests.system;
+
+import org.apache.nifi.annotation.configuration.DefaultSchedule;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+@DefaultSchedule(period = "10 mins")
+public class GenerateTruncatableFlowFiles extends AbstractProcessor {
+
+    static final PropertyDescriptor BATCH_COUNT = new 
PropertyDescriptor.Builder()
+        .name("Batch Count")
+        .description("""
+                The maximum number of batches to generate. Each batch produces 
10 FlowFiles (9 small + 1 large). \
+                Once this many batches have been generated, no more FlowFiles 
will be produced until the processor is stopped and restarted.""")
+        .required(true)
+        .defaultValue("10")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor SMALL_FILE_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Small File Size")
+        .description("Size of each small FlowFile in bytes")
+        .required(true)
+        .defaultValue("1 KB")
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor LARGE_FILE_SIZE = new 
PropertyDescriptor.Builder()
+        .name("Large File Size")
+        .description("Size of each large FlowFile in bytes")
+        .required(true)
+        .defaultValue("10 MB")
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor SMALL_FILES_PER_BATCH = new 
PropertyDescriptor.Builder()
+        .name("Small Files Per Batch")
+        .description("Number of small FlowFiles to generate per batch")
+        .required(true)
+        .defaultValue("9")
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return List.of(BATCH_COUNT,
+            SMALL_FILE_SIZE,
+            LARGE_FILE_SIZE,
+            SMALL_FILES_PER_BATCH);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return Set.of(REL_SUCCESS);
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        final int batchCount = context.getProperty(BATCH_COUNT).asInteger();
+        final Random random = new Random();
+        final int smallFileSize = 
context.getProperty(SMALL_FILE_SIZE).asDataSize(DataUnit.B).intValue();
+        final int largeFileSize = 
context.getProperty(LARGE_FILE_SIZE).asDataSize(DataUnit.B).intValue();
+        final int smallFilesPerBatch = 
context.getProperty(SMALL_FILES_PER_BATCH).asInteger();
+
+        for (int batch = 0; batch < batchCount; batch++) {
+            // Generate small FlowFiles with priority = 10 (low priority, 
processed last by PriorityAttributePrioritizer)
+            for (int i = 0; i < smallFilesPerBatch; i++) {
+                createFlowFile(session, random, smallFileSize, "10");
+            }
+
+            // Generate one large FlowFile with priority = 1 (high priority, 
processed first by PriorityAttributePrioritizer)
+            createFlowFile(session, random, largeFileSize, "1");
+        }
+    }
+
+    private void createFlowFile(final ProcessSession session, final Random 
random, final int fileSize, final String priority) {
+        FlowFile flowFile = session.create();
+        flowFile = session.putAttribute(flowFile, "priority", priority);
+        final byte[] data = new byte[fileSize];
+        random.nextBytes(data);
+        flowFile = session.write(flowFile, out -> out.write(data));
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index a12f954cb84..d4a9b4c81cf 100644
--- 
a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -29,6 +29,7 @@ org.apache.nifi.processors.tests.system.FakeProcessor
 org.apache.nifi.processors.tests.system.FakeDynamicPropertiesProcessor
 org.apache.nifi.processors.tests.system.GenerateAndCountCallbacks
 org.apache.nifi.processors.tests.system.GenerateFlowFile
+org.apache.nifi.processors.tests.system.GenerateTruncatableFlowFiles
 org.apache.nifi.processors.tests.system.HoldInput
 org.apache.nifi.processors.tests.system.IngestFile
 org.apache.nifi.processors.tests.system.LoopFlowFile
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
new file mode 100644
index 00000000000..012739ca872
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationAfterRestartIT.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiInstance;
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that verifies the truncation feature works correctly after a 
NiFi restart.
+ * <p>
+ * During the first run, NiFi is configured with very conservative truncation 
settings (99% archive
+ * usage threshold), so truncation never activates. FlowFiles are generated 
but not deleted.
+ * </p>
+ * <p>
+ * NiFi is then stopped, reconfigured with aggressive truncation settings (1% 
archive usage threshold),
+ * and restarted. On recovery, {@code 
WriteAheadFlowFileRepository.restoreFlowFiles()} re-derives
+ * truncation candidates by analyzing the recovered FlowFiles' ContentClaims. 
After restart, the large
+ * FlowFiles are deleted, and the test verifies that the content repository 
files are truncated on disk.
+ * </p>
+ */
+public class ContentClaimTruncationAfterRestartIT extends NiFiSystemIT {
+
+    @Override
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        // Phase 1: Conservative settings — truncation should NOT occur
+        final Map<String, String> overrides = new HashMap<>();
+        overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec");
+        overrides.put("nifi.content.claim.max.appendable.size", "50 KB");
+        // Very high archive threshold means no disk pressure, so truncation 
never activates
+        overrides.put("nifi.content.repository.archive.max.usage.percentage", 
"99%");
+        overrides.put("nifi.content.repository.archive.cleanup.frequency", "1 
sec");
+        return overrides;
+    }
+
+    @Override
+    protected boolean isAllowFactoryReuse() {
+        return false;
+    }
+
+    @Test
+    public void testTruncationOccursAfterRestartWithRecoveredCandidates() 
throws NiFiClientException, IOException, InterruptedException {
+        // === Phase 1: Generate FlowFiles with conservative settings (no 
truncation) ===
+
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateTruncatableFlowFiles");
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        final Map<String, String> generateProps = Map.of(
+            "Batch Count", "10",
+            "Small File Size", "1 KB",
+            "Large File Size", "10 MB",
+            "Small Files Per Batch", "9"
+        );
+        getClientUtil().updateProcessorProperties(generate, generateProps);
+
+        ConnectionEntity connection = 
getClientUtil().createConnection(generate, terminate, "success");
+        connection = getClientUtil().updateConnectionPrioritizer(connection, 
"PriorityAttributePrioritizer");
+        connection = getClientUtil().updateConnectionBackpressure(connection, 
10000, 100L * 1024 * 1024);
+
+        // Generate all 100 FlowFiles (90 small @ 1 KB + 10 large @ 10 MB)
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(connection.getId(), 100);
+        getClientUtil().stopProcessor(generate);
+        getClientUtil().waitForStoppedProcessor(generate.getId());
+
+        // Verify the content repository is large — the 10 MB FlowFiles are on 
disk
+        final File contentRepoDir = new 
File(getNiFiInstance().getInstanceDirectory(), "content_repository");
+        final long thresholdBytes = 1024 * 1024; // 1 MB
+        final long sizeBeforeRestart = getContentRepoSize(contentRepoDir);
+        assertTrue(sizeBeforeRestart > thresholdBytes,
+            "Content repository should be large before restart, but was " + 
sizeBeforeRestart + " bytes");
+
+        // === Phase 2: Stop NiFi, reconfigure for aggressive truncation, and 
restart ===
+
+        final NiFiInstance nifiInstance = getNiFiInstance();
+        nifiInstance.stop();
+
+        // Switch archive threshold to 1% so truncation activates under disk 
pressure
+        nifiInstance.setProperties(Map.of(
+            "nifi.content.repository.archive.max.usage.percentage", "1%"
+        ));
+
+        nifiInstance.start(true);
+
+        // After restart, WriteAheadFlowFileRepository.restoreFlowFiles() 
should have re-derived
+        // that the 10 large tail claims are truncation candidates.
+
+        // Run TerminateFlowFile 10 times. Due to 
PriorityAttributePrioritizer, the 10 large
+        // FlowFiles (priority=1) are dequeued first.
+        for (int i = 0; i < 10; i++) {
+            final ProcessorEntity terminateAfterRestart = 
getNifiClient().getProcessorClient().getProcessor(terminate.getId());
+            
getNifiClient().getProcessorClient().runProcessorOnce(terminateAfterRestart);
+            
getClientUtil().waitForStoppedProcessor(terminateAfterRestart.getId());
+        }
+
+        waitForQueueCount(connection.getId(), 90);
+
+        // Wait for the content repository files to be truncated.
+        // Before truncation: ~10 files of ~10 MB each = ~100 MB total.
+        // After truncation: ~10 files of ~9 KB each = ~90 KB total.
+        waitFor(() -> {
+            try {
+                return getContentRepoSize(contentRepoDir) < thresholdBytes;
+            } catch (final Exception e) {
+                return false;
+            }
+        });
+
+        final long finalSize = getContentRepoSize(contentRepoDir);
+        assertTrue(finalSize < thresholdBytes,
+            "Content repository total size should be below " + thresholdBytes 
+ " bytes after truncation, but was " + finalSize);
+    }
+
+    private long getContentRepoSize(final File dir) {
+        if (dir == null || !dir.exists()) {
+            return 0;
+        }
+
+        final File[] children = dir.listFiles();
+        if (children == null) {
+            return 0L;
+        }
+
+        long total = 0;
+        for (final File child : children) {
+            if (child.isDirectory()) {
+                if (child.getName().equals("archive")) {
+                    continue; // Skip archive directories
+                }
+
+                total += getContentRepoSize(child);
+            } else {
+                total += child.length();
+            }
+        }
+
+        return total;
+    }
+}
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
new file mode 100644
index 00000000000..c8ae97087ff
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentClaimTruncationIT.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.tests.system.repositories;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.client.NiFiClientException;
+import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ProcessorEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * System test that verifies the end-to-end truncation feature. It generates 
FlowFiles with a pattern
+ * of 9 small (1 KB) + 1 large (10 MB) per batch, removes only the large 
FlowFiles via priority-based
+ * ordering, and then verifies that the content repository files are truncated 
on disk.
+ */
+public class ContentClaimTruncationIT extends NiFiSystemIT {
+
+    @Override
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        final Map<String, String> overrides = new HashMap<>();
+        // Use a short checkpoint interval so truncatable claims are flushed 
to the ResourceClaimManager promptly
+        overrides.put("nifi.flowfile.repository.checkpoint.interval", "1 sec");
+        overrides.put("nifi.content.repository.archive.cleanup.frequency", "1 
sec");
+        // Explicitly set the max appendable claim size (same as system test 
default, but explicit for clarity)
+        overrides.put("nifi.content.claim.max.appendable.size", "50 KB");
+        // Set archive threshold extremely low so that truncation occurs 
quickly
+        overrides.put("nifi.content.repository.archive.max.usage.percentage", 
"1%");
+        return overrides;
+    }
+
+    @Override
+    protected boolean isAllowFactoryReuse() {
+        // Don't reuse the NiFi instance since we override checkpoint interval
+        return false;
+    }
+
+    @Test
+    public void testLargeFlowFileTruncation() throws NiFiClientException, 
IOException, InterruptedException {
+        // Create the processors
+        final ProcessorEntity generate = 
getClientUtil().createProcessor("GenerateTruncatableFlowFiles");
+        final ProcessorEntity terminate = 
getClientUtil().createProcessor("TerminateFlowFile");
+
+        // Configure GenerateTruncatableFlowFiles with 10 batches (100 
FlowFiles total)
+        final Map<String, String> generateProps = Map.of(
+            "Batch Count", "10",
+            "Small File Size", "1 KB",
+            "Large File Size", "10 MB",
+            "Small Files Per Batch", "9"
+        );
+        getClientUtil().updateProcessorProperties(generate, generateProps);
+
+        // Create connection with PriorityAttributePrioritizer and 100 MB 
backpressure
+        ConnectionEntity connection = 
getClientUtil().createConnection(generate, terminate, "success");
+        connection = getClientUtil().updateConnectionPrioritizer(connection, 
"PriorityAttributePrioritizer");
+        connection = getClientUtil().updateConnectionBackpressure(connection, 
10000, 100L * 1024 * 1024);
+
+        // Start the generator and wait for 100 FlowFiles to be queued
+        getClientUtil().startProcessor(generate);
+        waitForQueueCount(connection.getId(), 100);
+
+        // Stop the generator
+        getClientUtil().stopProcessor(generate);
+        getClientUtil().waitForStoppedProcessor(generate.getId());
+
+        // Run TerminateFlowFile 10 times. Due to PriorityAttributePrioritizer,
+        // the 10 large FlowFiles (priority=1) will be dequeued first.
+        for (int i = 0; i < 10; i++) {
+            getNifiClient().getProcessorClient().runProcessorOnce(terminate);
+            getClientUtil().waitForStoppedProcessor(terminate.getId());
+        }
+
+        // Wait for 90 FlowFiles remaining (the 10 large ones have been 
removed)
+        waitForQueueCount(connection.getId(), 90);
+
+        // Wait for the content repository files to be truncated.
+        // Before truncation: ~10 files of ~10 MB each = ~100 MB total.
+        // After truncation: ~10 files of ~9 KB each = ~90 KB total.
+        // We set a generous threshold of 1 MB.
+        final File contentRepoDir = new 
File(getNiFiInstance().getInstanceDirectory(), "content_repository");
+        final long thresholdBytes = 1024 * 1024; // 1 MB
+
+        waitFor(() -> {
+            try {
+                final long totalSize = 
getContentRepoSize(contentRepoDir.toPath());
+                return totalSize < thresholdBytes;
+            } catch (final IOException e) {
+                return false;
+            }
+        });
+
+        // Final assertion
+        final long finalSize = getContentRepoSize(contentRepoDir.toPath());
+        assertTrue(finalSize < thresholdBytes,
+                "Content repository total size should be below " + 
thresholdBytes + " bytes after truncation, but was " + finalSize);
+    }
+
+    /**
+     * Walks the content repository directory (excluding any "archive" 
subdirectories)
+     * and returns the total size of all regular files.
+     */
+    private long getContentRepoSize(final Path contentRepoPath) throws 
IOException {
+        if (!Files.exists(contentRepoPath)) {
+            return 0;
+        }
+
+        final AtomicLong totalSize = new AtomicLong(0);
+        Files.walkFileTree(contentRepoPath, new SimpleFileVisitor<>() {
+            @Override
+            public FileVisitResult preVisitDirectory(final Path dir, final 
BasicFileAttributes attrs) {
+                // Skip archive directories
+                if (dir.getFileName() != null && 
"archive".equals(dir.getFileName().toString())) {
+                    return FileVisitResult.SKIP_SUBTREE;
+                }
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(final Path file, final 
BasicFileAttributes attrs) {
+                totalSize.addAndGet(attrs.size());
+                return FileVisitResult.CONTINUE;
+            }
+        });
+
+        return totalSize.get();
+    }
+}

Reply via email to